PostgreSQL Source Code  git master
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "commands/defrem.h"
21 #include "commands/explain.h"
22 #include "commands/vacuum.h"
23 #include "foreign/fdwapi.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/cost.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/pathnode.h"
31 #include "optimizer/paths.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "optimizer/tlist.h"
36 #include "parser/parsetree.h"
37 #include "utils/builtins.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/sampling.h"
43 #include "utils/selfuncs.h"
44 
46 
/*
 * Default cost-model constants. The two *_COST values are additive costs;
 * DEFAULT_FDW_SORT_MULTIPLIER is a multiplicative factor used when no remote
 * estimates are available (1.2 == "20% extra", per its comment below).
 */
47 /* Default CPU cost to start up a foreign query. */
48 #define DEFAULT_FDW_STARTUP_COST 100.0
49 
50 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
51 #define DEFAULT_FDW_TUPLE_COST 0.01
52 
53 /* If no remote estimates, assume a sort costs 20% extra */
54 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
55 
56 /*
57  * Indexes of FDW-private information stored in fdw_private lists.
58  *
59  * These items are indexed with the enum FdwScanPrivateIndex, so an item
60  * can be fetched with list_nth(). For example, to get the SELECT statement:
61  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
62  */
/*
 * NOTE(review): this copy is missing source line 63 (presumably the
 * "enum FdwScanPrivateIndex" tag line) and lines 66, 68, 70 and 76
 * (presumably the enumerators that the member comments below describe).
 * Only FdwScanPrivateSelectSql is named in the visible text; restore the
 * rest from upstream before relying on this copy.
 */
64 {
65  /* SQL statement to execute remotely (as a String node) */
67  /* Integer list of attribute numbers retrieved by the SELECT */
69  /* Integer representing the desired fetch_size */
71 
72  /*
73  * String describing join i.e. names of relations being joined and types
74  * of join, added when the scan is join
75  */
77 };
78 
79 /*
80  * Similarly, this enum describes what's kept in the fdw_private list for
81  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
82  *
83  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
84  * 2) Integer list of target attribute numbers for INSERT/UPDATE
85  * (NIL for a DELETE)
86  * 3) Boolean flag showing if the remote query has a RETURNING clause
87  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
88  */
/*
 * NOTE(review): source line 89 (presumably the enum tag line) and lines
 * 92, 94, 96 and 98 (presumably the four enumerators matching items 1-4
 * above) are missing from this copy.
 */
90 {
91  /* SQL statement to execute remotely (as a String node) */
93  /* Integer list of target attribute numbers for INSERT/UPDATE */
95  /* has-returning flag (as an integer Value node) */
97  /* Integer list of attribute numbers retrieved by RETURNING */
99 };
100 
101 /*
102  * Similarly, this enum describes what's kept in the fdw_private list for
103  * a ForeignScan node that modifies a foreign table directly. We store:
104  *
105  * 1) UPDATE/DELETE statement text to be sent to the remote server
106  * 2) Boolean flag showing if the remote query has a RETURNING clause
107  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
108  * 4) Boolean flag showing if we set the command es_processed
109  */
/*
 * NOTE(review): source line 110 (presumably the enum tag line) and lines
 * 113, 115, 117 and 119 (presumably the four enumerators matching items
 * 1-4 above) are missing from this copy.
 */
111 {
112  /* SQL statement to execute remotely (as a String node) */
114  /* has-returning flag (as an integer Value node) */
116  /* Integer list of attribute numbers retrieved by RETURNING */
118  /* set-processed flag (as an integer Value node) */
120 };
121 
122 /*
123  * Execution state of a foreign scan using postgres_fdw. The same struct
 * serves both single-table scans and pushed-down join scans; the latter are
 * recognizable by a NULL rel (see the rel field below).
124  */
125 typedef struct PgFdwScanState
126 {
127  Relation rel; /* relcache entry for the foreign table. NULL
128  * for a foreign join scan. */
129  TupleDesc tupdesc; /* tuple descriptor of scan */
130  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
131 
132  /* extracted fdw_private data */
133  char *query; /* text of SELECT command */
134  List *retrieved_attrs; /* list of retrieved attribute numbers */
135 
136  /* for remote query execution */
137  PGconn *conn; /* connection for the scan */
138  unsigned int cursor_number; /* quasi-unique ID for my cursor */
139  bool cursor_exists; /* have we created the cursor? */
140  int numParams; /* number of parameters passed to query */
141  FmgrInfo *param_flinfo; /* output conversion functions for them */
142  List *param_exprs; /* executable expressions for param values */
143  const char **param_values; /* textual values of query parameters */
144 
145  /* for storing result tuples */
146  HeapTuple *tuples; /* array of currently-retrieved tuples */
147  int num_tuples; /* # of tuples in array */
148  int next_tuple; /* index of next one to return */
149 
150  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
151  int fetch_ct_2; /* Min(# of fetches done, 2) */
152  bool eof_reached; /* true if last fetch reached EOF */
153 
154  /* working memory contexts */
155  MemoryContext batch_cxt; /* context holding current batch of tuples */
156  MemoryContext temp_cxt; /* context for per-tuple temporary data */
157 
158  int fetch_size; /* number of tuples per fetch */
/*
 * NOTE(review): source line 159 (presumably the closing
 * "} PgFdwScanState;") is missing from this copy.
 */
160 
161 /*
162  * Execution state of a foreign insert/update/delete operation.
163  */
164 typedef struct PgFdwModifyState
165 {
166  Relation rel; /* relcache entry for the foreign table */
167  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
168 
169  /* for remote query execution */
170  PGconn *conn; /* connection for the remote modify operation */
171  char *p_name; /* name of prepared statement, if created */
172 
173  /* extracted fdw_private data */
174  char *query; /* text of INSERT/UPDATE/DELETE command */
175  List *target_attrs; /* list of target attribute numbers */
176  bool has_returning; /* is there a RETURNING clause? */
177  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
178 
179  /* info about parameters for prepared statement */
180  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
181  int p_nums; /* number of parameters to transmit */
182  FmgrInfo *p_flinfo; /* output conversion functions for them */
183 
184  /* working memory context */
185  MemoryContext temp_cxt; /* context for per-tuple temporary data */
/*
 * NOTE(review): source line 186 (presumably the closing
 * "} PgFdwModifyState;") is missing from this copy.
 */
187 
188 /*
189  * Execution state of a foreign scan that modifies a foreign table directly.
190  */
/*
 * NOTE(review): source line 191 (presumably the "typedef struct <name>"
 * line) is missing from this copy.
 */
192 {
193  Relation rel; /* relcache entry for the foreign table */
194  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
195 
196  /* extracted fdw_private data */
197  char *query; /* text of UPDATE/DELETE command */
198  bool has_returning; /* is there a RETURNING clause? */
199  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
200  bool set_processed; /* do we set the command es_processed? */
201 
202  /* for remote query execution */
203  PGconn *conn; /* connection for the direct UPDATE/DELETE */
204  int numParams; /* number of parameters passed to query */
205  FmgrInfo *param_flinfo; /* output conversion functions for them */
206  List *param_exprs; /* executable expressions for param values */
207  const char **param_values; /* textual values of query parameters */
208 
209  /* for storing result tuples */
210  PGresult *result; /* result for query */
211  int num_tuples; /* # of result tuples */
212  int next_tuple; /* index of next one to return */
213 
214  /* working memory context */
215  MemoryContext temp_cxt; /* context for per-tuple temporary data */
/*
 * NOTE(review): source line 216 (presumably the closing "} <typedef name>;")
 * is missing from this copy.
 */
217 
218 /*
219  * Workspace for analyzing a foreign table.
220  */
221 typedef struct PgFdwAnalyzeState
222 {
223  Relation rel; /* relcache entry for the foreign table */
224  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
225  List *retrieved_attrs; /* attr numbers retrieved by query */
226 
227  /* collected sample rows */
228  HeapTuple *rows; /* array of size targrows */
229  int targrows; /* target # of sample rows */
230  int numrows; /* # of sample rows collected */
231 
232  /* for random sampling */
233  double samplerows; /* # of rows fetched */
234  double rowstoskip; /* # of rows to skip before next sample */
235  ReservoirStateData rstate; /* state for reservoir sampling */
236 
237  /* working memory contexts */
238  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
239  MemoryContext temp_cxt; /* context for per-tuple temporary data */
/*
 * NOTE(review): source line 240 (presumably the closing
 * "} PgFdwAnalyzeState;") is missing from this copy.
 */
241 
242 /*
243  * Identify the attribute where data conversion fails.
244  */
245 typedef struct ConversionLocation
246 {
247  Relation rel; /* foreign table's relcache entry. */
248  AttrNumber cur_attno; /* attribute number being processed, or 0 */
249 
250  /*
251  * In case of foreign join push down, fdw_scan_tlist is used to identify
252  * the Var node corresponding to the error location and
253  * fsstate->ss.ps.state gives access to the RTEs of corresponding relation
254  * to get the relation name and attribute name.
255  */
/*
 * NOTE(review): source lines 256-257 are missing from this copy. They
 * presumably held the member that the comment above describes (something
 * reaching fsstate) and the closing "} ConversionLocation;" — confirm
 * against upstream.
 */
258 
259 /* Callback argument for ec_member_matches_foreign */
260 typedef struct
261 {
262  Expr *current; /* current expr, or NULL if not yet found */
263  List *already_used; /* expressions already dealt with */
/*
 * NOTE(review): source line 264 (presumably the closing "} <typedef name>;")
 * is missing from this copy.
 */
265 
266 /*
267  * SQL functions
268  */
/*
 * NOTE(review): source line 269, the only line under this "SQL functions"
 * heading, is missing from this copy.
 */
270 
/*
 * NOTE(review): source lines 280, 288, 289, 294, 303, 307, 311, 317, 323,
 * 325, 327, 332, 337 and 345 are missing from this copy. Judging by the
 * orphaned parameter lines that follow several of those gaps, they
 * presumably held prototype heads (return type and function name) of
 * further FDW callbacks. Restore them from upstream.
 */
271 /*
272  * FDW callback routines
273  */
274 static void postgresGetForeignRelSize(PlannerInfo *root,
275  RelOptInfo *baserel,
276  Oid foreigntableid);
277 static void postgresGetForeignPaths(PlannerInfo *root,
278  RelOptInfo *baserel,
279  Oid foreigntableid);
281  RelOptInfo *foreignrel,
282  Oid foreigntableid,
283  ForeignPath *best_path,
284  List *tlist,
285  List *scan_clauses,
286  Plan *outer_plan);
287 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
290 static void postgresEndForeignScan(ForeignScanState *node);
291 static void postgresAddForeignUpdateTargets(Query *parsetree,
292  RangeTblEntry *target_rte,
293  Relation target_relation);
295  ModifyTable *plan,
296  Index resultRelation,
297  int subplan_index);
298 static void postgresBeginForeignModify(ModifyTableState *mtstate,
299  ResultRelInfo *resultRelInfo,
300  List *fdw_private,
301  int subplan_index,
302  int eflags);
304  ResultRelInfo *resultRelInfo,
305  TupleTableSlot *slot,
306  TupleTableSlot *planSlot);
308  ResultRelInfo *resultRelInfo,
309  TupleTableSlot *slot,
310  TupleTableSlot *planSlot);
312  ResultRelInfo *resultRelInfo,
313  TupleTableSlot *slot,
314  TupleTableSlot *planSlot);
315 static void postgresEndForeignModify(EState *estate,
316  ResultRelInfo *resultRelInfo);
318 static bool postgresPlanDirectModify(PlannerInfo *root,
319  ModifyTable *plan,
320  Index resultRelation,
321  int subplan_index);
322 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
324 static void postgresEndDirectModify(ForeignScanState *node);
326  ExplainState *es);
328  ResultRelInfo *rinfo,
329  List *fdw_private,
330  int subplan_index,
331  ExplainState *es);
333  ExplainState *es);
334 static bool postgresAnalyzeForeignTable(Relation relation,
335  AcquireSampleRowsFunc *func,
336  BlockNumber *totalpages);
338  Oid serverOid);
339 static void postgresGetForeignJoinPaths(PlannerInfo *root,
340  RelOptInfo *joinrel,
341  RelOptInfo *outerrel,
342  RelOptInfo *innerrel,
343  JoinType jointype,
344  JoinPathExtraData *extra);
346  TupleTableSlot *slot);
347 static void postgresGetForeignUpperPaths(PlannerInfo *root,
348  UpperRelationKind stage,
349  RelOptInfo *input_rel,
350  RelOptInfo *output_rel);
351 
/*
 * NOTE(review): source lines 367, 368, 380, 384, 388, 397, 400, 401, 409,
 * 411 and 412 are missing from this copy. Several gaps leave orphaned
 * parameter lines (e.g. "void *arg);", "int row,"), so the missing lines
 * presumably held prototype heads or individual parameters. Restore them
 * from upstream.
 */
352 /*
353  * Helper functions
354  */
355 static void estimate_path_cost_size(PlannerInfo *root,
356  RelOptInfo *baserel,
357  List *join_conds,
358  List *pathkeys,
359  double *p_rows, int *p_width,
360  Cost *p_startup_cost, Cost *p_total_cost);
361 static void get_remote_estimate(const char *sql,
362  PGconn *conn,
363  double *rows,
364  int *width,
365  Cost *startup_cost,
366  Cost *total_cost);
369  void *arg);
370 static void create_cursor(ForeignScanState *node);
371 static void fetch_more_data(ForeignScanState *node);
372 static void close_cursor(PGconn *conn, unsigned int cursor_number);
373 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
374 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
375  ItemPointer tupleid,
376  TupleTableSlot *slot);
377 static void store_returning_result(PgFdwModifyState *fmstate,
378  TupleTableSlot *slot, PGresult *res);
379 static void execute_dml_stmt(ForeignScanState *node);
381 static void prepare_query_params(PlanState *node,
382  List *fdw_exprs,
383  int numParams,
385  List **param_exprs,
386  const char ***param_values);
387 static void process_query_params(ExprContext *econtext,
389  List *param_exprs,
390  const char **param_values);
391 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
392  HeapTuple *rows, int targrows,
393  double *totalrows,
394  double *totaldeadrows);
395 static void analyze_row_processor(PGresult *res, int row,
396  PgFdwAnalyzeState *astate);
398  int row,
399  Relation rel,
402  ForeignScanState *fsstate,
403  MemoryContext temp_context);
404 static void conversion_error_callback(void *arg);
405 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
406  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
407  JoinPathExtraData *extra);
408 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel);
410  RelOptInfo *rel);
413  Path *epq_path);
414 static void add_foreign_grouping_paths(PlannerInfo *root,
415  RelOptInfo *input_rel,
416  RelOptInfo *grouped_rel);
417 static void apply_server_options(PgFdwRelationInfo *fpinfo);
418 static void apply_table_options(PgFdwRelationInfo *fpinfo);
419 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
420  const PgFdwRelationInfo *fpinfo_o,
421  const PgFdwRelationInfo *fpinfo_i);
422 
423 
424 /*
425  * Foreign-data wrapper handler function: return a struct with pointers
426  * to my callback routines.
427  */
428 Datum
/*
 * NOTE(review): source line 429 (the function-name/parameter line) is
 * missing from this copy.
 */
430 {
431  FdwRoutine *routine = makeNode(FdwRoutine);
432 
/*
 * NOTE(review): every statement between the section comments below is
 * missing from this copy (source lines 434-440, 443-454, 457, 459-461,
 * 464, 467, 470 and 473). As shown, the function allocates "routine" and
 * returns it without filling it in. The missing lines presumably assigned
 * the callback pointers. Restore them from upstream.
 */
433  /* Functions for scanning foreign tables */
441 
442  /* Functions for updating foreign tables */
455 
456  /* Function for EvalPlanQual rechecks */
458  /* Support functions for EXPLAIN */
462 
463  /* Support functions for ANALYZE */
465 
466  /* Support functions for IMPORT FOREIGN SCHEMA */
468 
469  /* Support functions for join push-down */
471 
472  /* Support functions for upper relation push-down */
474 
475  PG_RETURN_POINTER(routine);
476 }
477 
478 /*
479  * postgresGetForeignRelSize
480  * Estimate # of rows and width of the result of the scan
481  *
482  * We should consider the effect of all baserestrictinfo clauses here, but
483  * not any join clauses.
484  */
485 static void
/*
 * NOTE(review): source line 486 (function name and first parameter) is
 * missing from this copy. The header comment and the earlier prototype
 * give the full signature as postgresGetForeignRelSize(PlannerInfo *root,
 * RelOptInfo *baserel, Oid foreigntableid).
 */
487  RelOptInfo *baserel,
488  Oid foreigntableid)
489 {
490  PgFdwRelationInfo *fpinfo;
491  ListCell *lc;
492  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
493  const char *namespace;
494  const char *relname;
495  const char *refname;
496 
497  /*
498  * We use PgFdwRelationInfo to pass various information to subsequent
499  * functions.
500  */
501  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
502  baserel->fdw_private = (void *) fpinfo;
503 
504  /* Base foreign tables need to be pushed down always. */
505  fpinfo->pushdown_safe = true;
506 
507  /* Look up foreign-table catalog info. */
508  fpinfo->table = GetForeignTable(foreigntableid);
509  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
510 
511  /*
512  * Extract user-settable option values. Note that per-table setting of
513  * use_remote_estimate overrides per-server setting.
514  */
515  fpinfo->use_remote_estimate = false;
/*
 * NOTE(review): source lines 516-517 are missing from this copy. They
 * presumably set further option defaults before the server/table options
 * are applied below — confirm against upstream.
 */
518  fpinfo->shippable_extensions = NIL;
519  fpinfo->fetch_size = 100;
520 
521  apply_server_options(fpinfo);
522  apply_table_options(fpinfo);
523 
524  /*
525  * If the table or the server is configured to use remote estimates,
526  * identify which user to do remote access as during planning. This
527  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
528  * permissions, the query would have failed at runtime anyway.
529  */
530  if (fpinfo->use_remote_estimate)
531  {
532  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
533 
534  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
535  }
536  else
537  fpinfo->user = NULL;
538 
539  /*
540  * Identify which baserestrictinfo clauses can be sent to the remote
541  * server and which can't.
542  */
543  classifyConditions(root, baserel, baserel->baserestrictinfo,
544  &fpinfo->remote_conds, &fpinfo->local_conds);
545 
546  /*
547  * Identify which attributes will need to be retrieved from the remote
548  * server. These include all attrs needed for joins or final output, plus
549  * all attrs used in the local_conds. (Note: if we end up using a
550  * parameterized scan, it's possible that some of the join clauses will be
551  * sent to the remote and thus we wouldn't really need to retrieve the
552  * columns used in them. Doesn't seem worth detecting that case though.)
553  */
554  fpinfo->attrs_used = NULL;
555  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
556  &fpinfo->attrs_used);
557  foreach(lc, fpinfo->local_conds)
558  {
559  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
560 
561  pull_varattnos((Node *) rinfo->clause, baserel->relid,
562  &fpinfo->attrs_used);
563  }
564 
565  /*
566  * Compute the selectivity and cost of the local_conds, so we don't have
567  * to do it over again for each path. The best we can do for these
568  * conditions is to estimate selectivity on the basis of local statistics.
569  */
/*
 * NOTE(review): source line 570 (the start of the statement whose trailing
 * arguments follow) is missing from this copy. Per the comment above, it
 * presumably stores a selectivity estimate for local_conds in fpinfo —
 * confirm against upstream.
 */
571  fpinfo->local_conds,
572  baserel->relid,
573  JOIN_INNER,
574  NULL);
575 
576  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
577 
578  /*
579  * Set cached relation costs to some negative value, so that we can detect
580  * when they are set to some sensible costs during one (usually the first)
581  * of the calls to estimate_path_cost_size().
582  */
583  fpinfo->rel_startup_cost = -1;
584  fpinfo->rel_total_cost = -1;
585 
586  /*
587  * If the table or the server is configured to use remote estimates,
588  * connect to the foreign server and execute EXPLAIN to estimate the
589  * number of rows selected by the restriction clauses, as well as the
590  * average row width. Otherwise, estimate using whatever statistics we
591  * have locally, in a way similar to ordinary tables.
592  */
593  if (fpinfo->use_remote_estimate)
594  {
595  /*
596  * Get cost/size estimates with help of remote server. Save the
597  * values in fpinfo so we don't need to do it again to generate the
598  * basic foreign path.
599  */
600  estimate_path_cost_size(root, baserel, NIL, NIL,
601  &fpinfo->rows, &fpinfo->width,
602  &fpinfo->startup_cost, &fpinfo->total_cost);
603 
604  /* Report estimated baserel size to planner. */
605  baserel->rows = fpinfo->rows;
606  baserel->reltarget->width = fpinfo->width;
607  }
608  else
609  {
610  /*
611  * If the foreign table has never been ANALYZEd, it will have relpages
612  * and reltuples equal to zero, which most likely has nothing to do
613  * with reality. We can't do a whole lot about that if we're not
614  * allowed to consult the remote server, but we can use a hack similar
615  * to plancat.c's treatment of empty relations: use a minimum size
616  * estimate of 10 pages, and divide by the column-datatype-based width
617  * estimate to get the corresponding number of tuples.
618  */
619  if (baserel->pages == 0 && baserel->tuples == 0)
620  {
621  baserel->pages = 10;
622  baserel->tuples =
623  (10 * BLCKSZ) / (baserel->reltarget->width +
/*
 * NOTE(review): source line 624 (the rest of the divisor and the end of
 * this assignment) is missing from this copy.
 */
625  }
626 
627  /* Estimate baserel size as best we can with local statistics. */
628  set_baserel_size_estimates(root, baserel);
629 
630  /* Fill in basically-bogus cost estimates for use later. */
631  estimate_path_cost_size(root, baserel, NIL, NIL,
632  &fpinfo->rows, &fpinfo->width,
633  &fpinfo->startup_cost, &fpinfo->total_cost);
634  }
635 
636  /*
637  * Set the name of relation in fpinfo, while we are constructing it here.
638  * It will be used to build the string describing the join relation in
639  * EXPLAIN output. We can't know whether VERBOSE option is specified or
640  * not, so always schema-qualify the foreign table name.
641  */
642  fpinfo->relation_name = makeStringInfo();
643  namespace = get_namespace_name(get_rel_namespace(foreigntableid));
644  relname = get_rel_name(foreigntableid);
645  refname = rte->eref->aliasname;
646  appendStringInfo(fpinfo->relation_name, "%s.%s",
647  quote_identifier(namespace),
648  quote_identifier(relname));
/* Append the alias only when it is non-empty and differs from the table name. */
649  if (*refname && strcmp(refname, relname) != 0)
650  appendStringInfo(fpinfo->relation_name, " %s",
/*
 * NOTE(review): source line 651 (the argument for " %s", presumably the
 * quoted refname) is missing from this copy.
 */
652 
653  /* No outer and inner relations. */
654  fpinfo->make_outerrel_subquery = false;
655  fpinfo->make_innerrel_subquery = false;
656  fpinfo->lower_subquery_rels = NULL;
657  /* Set the relation index. */
658  fpinfo->relation_index = baserel->relid;
659 }
660 
661 /*
662  * get_useful_ecs_for_relation
663  * Determine which EquivalenceClasses might be involved in useful
664  * orderings of this relation.
665  *
666  * This function is in some respects a mirror image of the core function
667  * pathkeys_useful_for_merging: for a regular table, we know what indexes
668  * we have and want to test whether any of them are useful. For a foreign
669  * table, we don't know what indexes are present on the remote side but
670  * want to speculate about which ones we'd like to use if they existed.
671  *
672  * This function returns a list of potentially-useful equivalence classes,
673  * but it does not guarantee that an EquivalenceMember exists which contains
674  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
675  * ft1.x + t1.x = 0, this function will say that the equivalence class
676  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
677  * t1 is local (or on a different server), it will turn out that no useful
678  * ORDER BY clause can be generated. It's not our job to figure that out
679  * here; we're only interested in identifying relevant ECs.
680  */
681 static List *
/*
 * NOTE(review): source line 682 (function name and parameters) is missing
 * from this copy. The body uses "root" and "rel", consistent with the call
 * get_useful_ecs_for_relation(root, rel) elsewhere in the file.
 */
683 {
684  List *useful_eclass_list = NIL;
685  ListCell *lc;
686  Relids relids;
687 
688  /*
689  * First, consider whether any active EC is potentially useful for a merge
690  * join against this relation.
691  */
692  if (rel->has_eclass_joins)
693  {
694  foreach(lc, root->eq_classes)
695  {
696  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
697 
698  if (eclass_useful_for_merging(root, cur_ec, rel))
699  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
700  }
701  }
702 
703  /*
704  * Next, consider whether there are any non-EC derivable join clauses that
705  * are merge-joinable. If the joininfo list is empty, we can exit
706  * quickly.
707  */
708  if (rel->joininfo == NIL)
709  return useful_eclass_list;
710 
711  /* If this is a child rel, we must use the topmost parent rel to search. */
712  if (IS_OTHER_REL(rel))
713  {
/*
 * NOTE(review): source line 714 (a statement inside this branch) is
 * missing from this copy.
 */
715  relids = rel->top_parent_relids;
716  }
717  else
718  relids = rel->relids;
719 
720  /* Check each join clause in turn. */
721  foreach(lc, rel->joininfo)
722  {
723  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
724 
725  /* Consider only mergejoinable clauses */
726  if (restrictinfo->mergeopfamilies == NIL)
727  continue;
728 
729  /* Make sure we've got canonical ECs. */
730  update_mergeclause_eclasses(root, restrictinfo);
731 
732  /*
733  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
734  * that left_ec and right_ec will be initialized, per comments in
735  * distribute_qual_to_rels.
736  *
737  * We want to identify which side of this merge-joinable clause
738  * contains columns from the relation produced by this RelOptInfo. We
739  * test for overlap, not containment, because there could be extra
740  * relations on either side. For example, suppose we've got something
741  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
742  * A.y = D.y. The input rel might be the joinrel between A and B, and
743  * we'll consider the join clause A.y = D.y. relids contains a
744  * relation not involved in the join clause (B) and the equivalence
745  * class for the left-hand side of the clause contains a relation not
746  * involved in the input rel (C). Despite the fact that we have only
747  * overlap and not containment in either direction, A.y is potentially
748  * useful as a sort column.
749  *
750  * Note that it's even possible that relids overlaps neither side of
751  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
752  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
753  * but overlaps neither side of B. In that case, we just skip this
754  * join clause, since it doesn't suggest a useful sort order for this
755  * relation.
756  */
/*
 * The right side is tested first, so a clause that overlaps both sides
 * contributes only its right_ec. list_append_unique_ptr keeps an EC that
 * was already collected (by the loop above or an earlier clause) from
 * being listed twice.
 */
757  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
758  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
759  restrictinfo->right_ec);
760  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
761  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
762  restrictinfo->left_ec);
763  }
764 
765  return useful_eclass_list;
766 }
767 
768 /*
769  * get_useful_pathkeys_for_relation
770  * Determine which orderings of a relation might be useful.
771  *
772  * Getting data in sorted order can be useful either because the requested
773  * order matches the final output ordering for the overall query we're
774  * planning, or because it enables an efficient merge join. Here, we try
775  * to figure out which pathkeys to consider.
 *
 * Returns a List of pathkey Lists. The first entry, when present, is a copy
 * of root->query_pathkeys. It is followed (only when use_remote_estimate is
 * set) by single-element pathkey lists, one per useful EquivalenceClass.
776  */
777 static List *
/*
 * NOTE(review): source line 778 (function name and parameters) is missing
 * from this copy; the body uses "root" and "rel".
 */
779 {
780  List *useful_pathkeys_list = NIL;
781  List *useful_eclass_list;
/*
 * NOTE(review): source line 782 is missing from this copy. "fpinfo" is
 * used below but never declared in the visible text, so line 782
 * presumably declared it (likely from rel->fdw_private) — confirm against
 * upstream.
 */
783  EquivalenceClass *query_ec = NULL;
784  ListCell *lc;
785 
786  /*
787  * Pushing the query_pathkeys to the remote server is always worth
788  * considering, because it might let us avoid a local sort.
789  */
790  if (root->query_pathkeys)
791  {
792  bool query_pathkeys_ok = true;
793 
794  foreach(lc, root->query_pathkeys)
795  {
796  PathKey *pathkey = (PathKey *) lfirst(lc);
797  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
798  Expr *em_expr;
799 
800  /*
801  * The planner and executor don't have any clever strategy for
802  * taking data sorted by a prefix of the query's pathkeys and
803  * getting it to be sorted by all of those pathkeys. We'll just
804  * end up resorting the entire data set. So, unless we can push
805  * down all of the query pathkeys, forget it.
806  *
807  * is_foreign_expr would detect volatile expressions as well, but
808  * checking ec_has_volatile here saves some cycles.
809  */
810  if (pathkey_ec->ec_has_volatile ||
811  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
812  !is_foreign_expr(root, rel, em_expr))
813  {
814  query_pathkeys_ok = false;
815  break;
816  }
817  }
818 
819  if (query_pathkeys_ok)
820  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
821  }
822 
823  /*
824  * Even if we're not using remote estimates, having the remote side do the
825  * sort generally won't be any worse than doing it locally, and it might
826  * be much better if the remote side can generate data in the right order
827  * without needing a sort at all. However, what we're going to do next is
828  * try to generate pathkeys that seem promising for possible merge joins,
829  * and that's more speculative. A wrong choice might hurt quite a bit, so
830  * bail out if we can't use remote estimates.
831  */
832  if (!fpinfo->use_remote_estimate)
833  return useful_pathkeys_list;
834 
835  /* Get the list of interesting EquivalenceClasses. */
836  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
837 
838  /* Extract unique EC for query, if any, so we don't consider it again. */
/*
 * query_ec is recorded whenever there is exactly one query pathkey, whether
 * or not that pathkey was found pushable above. The loop below skips it in
 * either case.
 */
839  if (list_length(root->query_pathkeys) == 1)
840  {
841  PathKey *query_pathkey = linitial(root->query_pathkeys);
842 
843  query_ec = query_pathkey->pk_eclass;
844  }
845 
846  /*
847  * As a heuristic, the only pathkeys we consider here are those of length
848  * one. It's surely possible to consider more, but since each one we
849  * choose to consider will generate a round-trip to the remote side, we
850  * need to be a bit cautious here. It would sure be nice to have a local
851  * cache of information about remote index definitions...
852  */
853  foreach(lc, useful_eclass_list)
854  {
855  EquivalenceClass *cur_ec = lfirst(lc);
856  Expr *em_expr;
857  PathKey *pathkey;
858 
859  /* If redundant with what we did above, skip it. */
860  if (cur_ec == query_ec)
861  continue;
862 
863  /* If no pushable expression for this rel, skip it. */
864  em_expr = find_em_expr_for_rel(cur_ec, rel);
865  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
866  continue;
867 
868  /* Looks like we can generate a pathkey, so let's do it. */
869  pathkey = make_canonical_pathkey(root, cur_ec,
870  linitial_oid(cur_ec->ec_opfamilies),
/*
 * NOTE(review): source line 871 (one argument of make_canonical_pathkey)
 * is missing from this copy.
 */
872  false);
873  useful_pathkeys_list = lappend(useful_pathkeys_list,
874  list_make1(pathkey));
875  }
876 
877  return useful_pathkeys_list;
878 }
879 
880 /*
881  * postgresGetForeignPaths
882  * Create possible scan paths for a scan on the foreign table
883  */
884 static void
886  RelOptInfo *baserel,
887  Oid foreigntableid)
888 {
889  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
890  ForeignPath *path;
891  List *ppi_list;
892  ListCell *lc;
893 
894  /*
895  * Create simplest ForeignScan path node and add it to baserel. This path
896  * corresponds to SeqScan path of regular tables (though depending on what
897  * baserestrict conditions we were able to send to remote, there might
898  * actually be an indexscan happening there). We already did all the work
899  * to estimate cost and size of this path.
900  */
901  path = create_foreignscan_path(root, baserel,
902  NULL, /* default pathtarget */
903  fpinfo->rows,
904  fpinfo->startup_cost,
905  fpinfo->total_cost,
906  NIL, /* no pathkeys */
907  NULL, /* no outer rel either */
908  NULL, /* no extra plan */
909  NIL); /* no fdw_private list */
910  add_path(baserel, (Path *) path);
911 
912  /* Add paths with pathkeys */
913  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
914 
915  /*
916  * If we're not using remote estimates, stop here. We have no way to
917  * estimate whether any join clauses would be worth sending across, so
918  * don't bother building parameterized paths.
919  */
920  if (!fpinfo->use_remote_estimate)
921  return;
922 
923  /*
924  * Thumb through all join clauses for the rel to identify which outer
925  * relations could supply one or more safe-to-send-to-remote join clauses.
926  * We'll build a parameterized path for each such outer relation.
927  *
928  * It's convenient to manage this by representing each candidate outer
929  * relation by the ParamPathInfo node for it. We can then use the
930  * ppi_clauses list in the ParamPathInfo node directly as a list of the
931  * interesting join clauses for that rel. This takes care of the
932  * possibility that there are multiple safe join clauses for such a rel,
933  * and also ensures that we account for unsafe join clauses that we'll
934  * still have to enforce locally (since the parameterized-path machinery
935  * insists that we handle all movable clauses).
936  */
937  ppi_list = NIL;
938  foreach(lc, baserel->joininfo)
939  {
940  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
941  Relids required_outer;
942  ParamPathInfo *param_info;
943 
944  /* Check if clause can be moved to this rel */
945  if (!join_clause_is_movable_to(rinfo, baserel))
946  continue;
947 
948  /* See if it is safe to send to remote */
949  if (!is_foreign_expr(root, baserel, rinfo->clause))
950  continue;
951 
952  /* Calculate required outer rels for the resulting path */
953  required_outer = bms_union(rinfo->clause_relids,
954  baserel->lateral_relids);
955  /* We do not want the foreign rel itself listed in required_outer */
956  required_outer = bms_del_member(required_outer, baserel->relid);
957 
958  /*
959  * required_outer probably can't be empty here, but if it were, we
960  * couldn't make a parameterized path.
961  */
962  if (bms_is_empty(required_outer))
963  continue;
964 
965  /* Get the ParamPathInfo */
966  param_info = get_baserel_parampathinfo(root, baserel,
967  required_outer);
968  Assert(param_info != NULL);
969 
970  /*
971  * Add it to list unless we already have it. Testing pointer equality
972  * is OK since get_baserel_parampathinfo won't make duplicates.
973  */
974  ppi_list = list_append_unique_ptr(ppi_list, param_info);
975  }
976 
977  /*
978  * The above scan examined only "generic" join clauses, not those that
979  * were absorbed into EquivalenceClauses. See if we can make anything out
980  * of EquivalenceClauses.
981  */
982  if (baserel->has_eclass_joins)
983  {
984  /*
985  * We repeatedly scan the eclass list looking for column references
986  * (or expressions) belonging to the foreign rel. Each time we find
987  * one, we generate a list of equivalence joinclauses for it, and then
988  * see if any are safe to send to the remote. Repeat till there are
989  * no more candidate EC members.
990  */
992 
993  arg.already_used = NIL;
994  for (;;)
995  {
996  List *clauses;
997 
998  /* Make clauses, skipping any that join to lateral_referencers */
999  arg.current = NULL;
1001  baserel,
1003  (void *) &arg,
1004  baserel->lateral_referencers);
1005 
1006  /* Done if there are no more expressions in the foreign rel */
1007  if (arg.current == NULL)
1008  {
1009  Assert(clauses == NIL);
1010  break;
1011  }
1012 
1013  /* Scan the extracted join clauses */
1014  foreach(lc, clauses)
1015  {
1016  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1017  Relids required_outer;
1018  ParamPathInfo *param_info;
1019 
1020  /* Check if clause can be moved to this rel */
1021  if (!join_clause_is_movable_to(rinfo, baserel))
1022  continue;
1023 
1024  /* See if it is safe to send to remote */
1025  if (!is_foreign_expr(root, baserel, rinfo->clause))
1026  continue;
1027 
1028  /* Calculate required outer rels for the resulting path */
1029  required_outer = bms_union(rinfo->clause_relids,
1030  baserel->lateral_relids);
1031  required_outer = bms_del_member(required_outer, baserel->relid);
1032  if (bms_is_empty(required_outer))
1033  continue;
1034 
1035  /* Get the ParamPathInfo */
1036  param_info = get_baserel_parampathinfo(root, baserel,
1037  required_outer);
1038  Assert(param_info != NULL);
1039 
1040  /* Add it to list unless we already have it */
1041  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1042  }
1043 
1044  /* Try again, now ignoring the expression we found this time */
1045  arg.already_used = lappend(arg.already_used, arg.current);
1046  }
1047  }
1048 
1049  /*
1050  * Now build a path for each useful outer relation.
1051  */
1052  foreach(lc, ppi_list)
1053  {
1054  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1055  double rows;
1056  int width;
1057  Cost startup_cost;
1058  Cost total_cost;
1059 
1060  /* Get a cost estimate from the remote */
1061  estimate_path_cost_size(root, baserel,
1062  param_info->ppi_clauses, NIL,
1063  &rows, &width,
1064  &startup_cost, &total_cost);
1065 
1066  /*
1067  * ppi_rows currently won't get looked at by anything, but still we
1068  * may as well ensure that it matches our idea of the rowcount.
1069  */
1070  param_info->ppi_rows = rows;
1071 
1072  /* Make the path */
1073  path = create_foreignscan_path(root, baserel,
1074  NULL, /* default pathtarget */
1075  rows,
1076  startup_cost,
1077  total_cost,
1078  NIL, /* no pathkeys */
1079  param_info->ppi_req_outer,
1080  NULL,
1081  NIL); /* no fdw_private list */
1082  add_path(baserel, (Path *) path);
1083  }
1084 }
1085 
1086 /*
1087  * postgresGetForeignPlan
1088  * Create ForeignScan plan node which implements selected best path
1089  */
1090 static ForeignScan *
1092  RelOptInfo *foreignrel,
1093  Oid foreigntableid,
1094  ForeignPath *best_path,
1095  List *tlist,
1096  List *scan_clauses,
1097  Plan *outer_plan)
1098 {
1099  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1100  Index scan_relid;
1101  List *fdw_private;
1102  List *remote_exprs = NIL;
1103  List *local_exprs = NIL;
1104  List *params_list = NIL;
1105  List *fdw_scan_tlist = NIL;
1106  List *fdw_recheck_quals = NIL;
1108  StringInfoData sql;
1109  ListCell *lc;
1110 
1111  if (IS_SIMPLE_REL(foreignrel))
1112  {
1113  /*
1114  * For base relations, set scan_relid as the relid of the relation.
1115  */
1116  scan_relid = foreignrel->relid;
1117 
1118  /*
1119  * In a base-relation scan, we must apply the given scan_clauses.
1120  *
1121  * Separate the scan_clauses into those that can be executed remotely
1122  * and those that can't. baserestrictinfo clauses that were
1123  * previously determined to be safe or unsafe by classifyConditions
1124  * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1125  * else in the scan_clauses list will be a join clause, which we have
1126  * to check for remote-safety.
1127  *
1128  * Note: the join clauses we see here should be the exact same ones
1129  * previously examined by postgresGetForeignPaths. Possibly it'd be
1130  * worth passing forward the classification work done then, rather
1131  * than repeating it here.
1132  *
1133  * This code must match "extract_actual_clauses(scan_clauses, false)"
1134  * except for the additional decision about remote versus local
1135  * execution.
1136  */
1137  foreach(lc, scan_clauses)
1138  {
1139  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1140 
1141  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1142  if (rinfo->pseudoconstant)
1143  continue;
1144 
1145  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1146  remote_exprs = lappend(remote_exprs, rinfo->clause);
1147  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1148  local_exprs = lappend(local_exprs, rinfo->clause);
1149  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1150  remote_exprs = lappend(remote_exprs, rinfo->clause);
1151  else
1152  local_exprs = lappend(local_exprs, rinfo->clause);
1153  }
1154 
1155  /*
1156  * For a base-relation scan, we have to support EPQ recheck, which
1157  * should recheck all the remote quals.
1158  */
1159  fdw_recheck_quals = remote_exprs;
1160  }
1161  else
1162  {
1163  /*
1164  * Join relation or upper relation - set scan_relid to 0.
1165  */
1166  scan_relid = 0;
1167 
1168  /*
1169  * For a join rel, baserestrictinfo is NIL and we are not considering
1170  * parameterization right now, so there should be no scan_clauses for
1171  * a joinrel or an upper rel either.
1172  */
1173  Assert(!scan_clauses);
1174 
1175  /*
1176  * Instead we get the conditions to apply from the fdw_private
1177  * structure.
1178  */
1179  remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1180  local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1181 
1182  /*
1183  * We leave fdw_recheck_quals empty in this case, since we never need
1184  * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1185  * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1186  * If we're planning an upperrel (ie, remote grouping or aggregation)
1187  * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1188  * allowed, and indeed we *can't* put the remote clauses into
1189  * fdw_recheck_quals because the unaggregated Vars won't be available
1190  * locally.
1191  */
1192 
1193  /* Build the list of columns to be fetched from the foreign server. */
1194  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1195 
1196  /*
1197  * Ensure that the outer plan produces a tuple whose descriptor
1198  * matches our scan tuple slot. This is safe because all scans and
1199  * joins support projection, so we never need to insert a Result node.
1200  * Also, remove the local conditions from outer plan's quals, lest
1201  * they will be evaluated twice, once by the local plan and once by
1202  * the scan.
1203  */
1204  if (outer_plan)
1205  {
1206  ListCell *lc;
1207 
1208  /*
1209  * Right now, we only consider grouping and aggregation beyond
1210  * joins. Queries involving aggregates or grouping do not require
1211  * EPQ mechanism, hence should not have an outer plan here.
1212  */
1213  Assert(!IS_UPPER_REL(foreignrel));
1214 
1215  outer_plan->targetlist = fdw_scan_tlist;
1216 
1217  foreach(lc, local_exprs)
1218  {
1219  Join *join_plan = (Join *) outer_plan;
1220  Node *qual = lfirst(lc);
1221 
1222  outer_plan->qual = list_delete(outer_plan->qual, qual);
1223 
1224  /*
1225  * For an inner join the local conditions of foreign scan plan
1226  * can be part of the joinquals as well.
1227  */
1228  if (join_plan->jointype == JOIN_INNER)
1229  join_plan->joinqual = list_delete(join_plan->joinqual,
1230  qual);
1231  }
1232  }
1233  }
1234 
1235  /*
1236  * Build the query string to be sent for execution, and identify
1237  * expressions to be sent as parameters.
1238  */
1239  initStringInfo(&sql);
1240  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1241  remote_exprs, best_path->path.pathkeys,
1242  false, &retrieved_attrs, &params_list);
1243 
1244  /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1245  fpinfo->final_remote_exprs = remote_exprs;
1246 
1247  /*
1248  * Build the fdw_private list that will be available to the executor.
1249  * Items in the list must match order in enum FdwScanPrivateIndex.
1250  */
1251  fdw_private = list_make3(makeString(sql.data),
1253  makeInteger(fpinfo->fetch_size));
1254  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1255  fdw_private = lappend(fdw_private,
1256  makeString(fpinfo->relation_name->data));
1257 
1258  /*
1259  * Create the ForeignScan node for the given relation.
1260  *
1261  * Note that the remote parameter expressions are stored in the fdw_exprs
1262  * field of the finished plan node; we can't keep them in private state
1263  * because then they wouldn't be subject to later planner processing.
1264  */
1265  return make_foreignscan(tlist,
1266  local_exprs,
1267  scan_relid,
1268  params_list,
1269  fdw_private,
1270  fdw_scan_tlist,
1271  fdw_recheck_quals,
1272  outer_plan);
1273 }
1274 
1275 /*
1276  * postgresBeginForeignScan
1277  * Initiate an executor scan of a foreign PostgreSQL table.
 *
 * In EXPLAIN-only mode this returns immediately and node->fdw_state stays
 * NULL.  Otherwise it allocates a PgFdwScanState, hangs it on
 * node->fdw_state and fills it in:
 *   - the remote connection for the user the RTE is checked as;
 *   - a cursor number (the cursor itself is not opened here; cursor_exists
 *     is left false, and postgresIterateForeignScan calls create_cursor
 *     when it sees that);
 *   - query text, retrieved_attrs and fetch_size unpacked from
 *     fsplan->fdw_private;
 *   - batch and temporary memory contexts, both children of es_query_cxt;
 *   - the tuple descriptor and attinmeta used to convert fetched rows;
 *   - parameter-evaluation state, when fsplan->fdw_exprs is non-empty.
 *
 * NOTE(review): this listing has gaps in its line numbering (1280, 1333,
 * 1335, 1337, 1342, 1345, 1370), so the signature line and several call
 * arguments are not shown here; restore them from the real source before
 * compiling.
1278  */
1279 static void
1281 {
1282  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1283  EState *estate = node->ss.ps.state;
1284  PgFdwScanState *fsstate;
1285  RangeTblEntry *rte;
1286  Oid userid;
1287  ForeignTable *table;
1288  UserMapping *user;
1289  int rtindex;
1290  int numParams;
1291 
1292  /*
1293  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1294  */
1295  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1296  return;
1297 
1298  /*
1299  * We'll save private state in node->fdw_state.
1300  */
1301  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1302  node->fdw_state = (void *) fsstate;
1303 
1304  /*
1305  * Identify which user to do the remote access as. This should match what
1306  * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1307  * lowest-numbered member RTE as a representative; we would get the same
1308  * result from any.
1309  */
1310  if (fsplan->scan.scanrelid > 0)
1311  rtindex = fsplan->scan.scanrelid;
1312  else
1313  rtindex = bms_next_member(fsplan->fs_relids, -1);
1314  rte = rt_fetch(rtindex, estate->es_range_table);
1315  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1316 
1317  /* Get info about foreign table. */
1318  table = GetForeignTable(rte->relid);
1319  user = GetUserMapping(userid, table->serverid);
1320 
1321  /*
1322  * Get connection to the foreign server. Connection manager will
1323  * establish new connection if necessary.
1324  */
 /*
  * The second argument is false here, whereas postgresBeginForeignModify
  * passes true to "report that we'll create a prepared statement";
  * presumably a plain scan announces no prepared statement -- confirm
  * against GetConnection().
  */
1325  fsstate->conn = GetConnection(user, false);
1326 
1327  /* Assign a unique ID for my cursor */
1328  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1329  fsstate->cursor_exists = false;
1330 
1331  /* Get private info created by planner functions. */
 /* Indexes follow enum FdwScanPrivateIndex (see postgresGetForeignPlan). */
1332  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1334  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1336  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1338 
1339  /* Create contexts for batches of tuples and per-tuple temp workspace. */
 /* Both are children of es_query_cxt, so they go away with the query. */
1340  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1341  "postgres_fdw tuple data",
1343  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1344  "postgres_fdw temporary data",
1346 
1347  /*
1348  * Get info we'll need for converting data fetched from the foreign server
1349  * into local representation and error reporting during that process.
1350  */
 /*
  * A base-relation scan (scanrelid > 0) uses the foreign table's own
  * descriptor; a join/upper-rel scan has no single relation, so rel is NULL
  * and the scan slot's descriptor is used instead.
  */
1351  if (fsplan->scan.scanrelid > 0)
1352  {
1353  fsstate->rel = node->ss.ss_currentRelation;
1354  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1355  }
1356  else
1357  {
1358  fsstate->rel = NULL;
1359  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1360  }
1361 
1362  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1363 
1364  /*
1365  * Prepare for processing of parameters used in remote query, if any.
1366  */
 /* fdw_exprs holds the remote parameter expressions set up at plan time. */
1367  numParams = list_length(fsplan->fdw_exprs);
1368  fsstate->numParams = numParams;
1369  if (numParams > 0)
1371  fsplan->fdw_exprs,
1372  numParams,
1373  &fsstate->param_flinfo,
1374  &fsstate->param_exprs,
1375  &fsstate->param_values);
1376 }
1377 
1378 /*
1379  * postgresIterateForeignScan
1380  * Retrieve next row from the result set, or clear tuple slot to indicate
1381  * EOF.
1382  */
1383 static TupleTableSlot *
1385 {
1386  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1387  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1388 
1389  /*
1390  * If this is the first call after Begin or ReScan, we need to create the
1391  * cursor on the remote side.
1392  */
1393  if (!fsstate->cursor_exists)
1394  create_cursor(node);
1395 
1396  /*
1397  * Get some more tuples, if we've run out.
1398  */
1399  if (fsstate->next_tuple >= fsstate->num_tuples)
1400  {
1401  /* No point in another fetch if we already detected EOF, though. */
1402  if (!fsstate->eof_reached)
1403  fetch_more_data(node);
1404  /* If we didn't get any tuples, must be end of data. */
1405  if (fsstate->next_tuple >= fsstate->num_tuples)
1406  return ExecClearTuple(slot);
1407  }
1408 
1409  /*
1410  * Return the next tuple.
1411  */
1412  ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1413  slot,
1414  InvalidBuffer,
1415  false);
1416 
1417  return slot;
1418 }
1419 
1420 /*
1421  * postgresReScanForeignScan
1422  * Restart the scan.
1423  */
1424 static void
1426 {
1427  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1428  char sql[64];
1429  PGresult *res;
1430 
1431  /* If we haven't created the cursor yet, nothing to do. */
1432  if (!fsstate->cursor_exists)
1433  return;
1434 
1435  /*
1436  * If any internal parameters affecting this node have changed, we'd
1437  * better destroy and recreate the cursor. Otherwise, rewinding it should
1438  * be good enough. If we've only fetched zero or one batch, we needn't
1439  * even rewind the cursor, just rescan what we have.
1440  */
1441  if (node->ss.ps.chgParam != NULL)
1442  {
1443  fsstate->cursor_exists = false;
1444  snprintf(sql, sizeof(sql), "CLOSE c%u",
1445  fsstate->cursor_number);
1446  }
1447  else if (fsstate->fetch_ct_2 > 1)
1448  {
1449  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1450  fsstate->cursor_number);
1451  }
1452  else
1453  {
1454  /* Easy: just rescan what we already have in memory, if anything */
1455  fsstate->next_tuple = 0;
1456  return;
1457  }
1458 
1459  /*
1460  * We don't use a PG_TRY block here, so be careful not to throw error
1461  * without releasing the PGresult.
1462  */
1463  res = pgfdw_exec_query(fsstate->conn, sql);
1464  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1465  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1466  PQclear(res);
1467 
1468  /* Now force a fresh FETCH. */
1469  fsstate->tuples = NULL;
1470  fsstate->num_tuples = 0;
1471  fsstate->next_tuple = 0;
1472  fsstate->fetch_ct_2 = 0;
1473  fsstate->eof_reached = false;
1474 }
1475 
1476 /*
1477  * postgresEndForeignScan
1478  * Finish scanning foreign table and dispose objects used for this scan
1479  */
1480 static void
1482 {
1483  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1484 
1485  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1486  if (fsstate == NULL)
1487  return;
1488 
1489  /* Close the cursor if open, to prevent accumulation of cursors */
1490  if (fsstate->cursor_exists)
1491  close_cursor(fsstate->conn, fsstate->cursor_number);
1492 
1493  /* Release remote connection */
1494  ReleaseConnection(fsstate->conn);
1495  fsstate->conn = NULL;
1496 
1497  /* MemoryContexts will be deleted automatically. */
1498 }
1499 
1500 /*
1501  * postgresAddForeignUpdateTargets
1502  * Add resjunk column(s) needed for update/delete on a foreign table
1503  */
1504 static void
1506  RangeTblEntry *target_rte,
1507  Relation target_relation)
1508 {
1509  Var *var;
1510  const char *attrname;
1511  TargetEntry *tle;
1512 
1513  /*
1514  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1515  */
1516 
1517  /* Make a Var representing the desired value */
1518  var = makeVar(parsetree->resultRelation,
1520  TIDOID,
1521  -1,
1522  InvalidOid,
1523  0);
1524 
1525  /* Wrap it in a resjunk TLE with the right name ... */
1526  attrname = "ctid";
1527 
1528  tle = makeTargetEntry((Expr *) var,
1529  list_length(parsetree->targetList) + 1,
1530  pstrdup(attrname),
1531  true);
1532 
1533  /* ... and add it to the query's targetlist */
1534  parsetree->targetList = lappend(parsetree->targetList, tle);
1535 }
1536 
1537 /*
1538  * postgresPlanForeignModify
1539  * Plan an insert/update/delete operation on a foreign table
 *
 * Deparses the remote INSERT/UPDATE/DELETE for result relation
 * "resultRelation" and returns the fdw_private list read back by
 * postgresBeginForeignModify: (SQL text, target attnums, Integer flag
 * "retrieved_attrs != NIL" -- read back as has_returning, retrieved
 * attnums), in FdwModifyPrivateIndex order.
 *
 * Column selection: INSERT sends every non-dropped column, UPDATE only the
 * columns set in rte->updatedCols, DELETE none.  ON CONFLICT DO NOTHING is
 * passed through to the deparsed INSERT; any other ON CONFLICT action
 * raises an error.
 *
 * NOTE(review): lines 1542, 1553, 1573 and 1592 are missing from this
 * listing; the signature start and the declarations of retrieved_attrs,
 * tupdesc and attno (all used below) are therefore not shown.  Restore them
 * from the real source before compiling.
1540  */
1541 static List *
1543  ModifyTable *plan,
1544  Index resultRelation,
1545  int subplan_index)
1546 {
1547  CmdType operation = plan->operation;
1548  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1549  Relation rel;
1550  StringInfoData sql;
1551  List *targetAttrs = NIL;
1552  List *returningList = NIL;
1554  bool doNothing = false;
1555 
1556  initStringInfo(&sql);
1557 
1558  /*
1559  * Core code already has some lock on each rel being planned, so we can
1560  * use NoLock here.
1561  */
1562  rel = heap_open(rte->relid, NoLock);
1563 
1564  /*
1565  * In an INSERT, we transmit all columns that are defined in the foreign
1566  * table. In an UPDATE, we transmit only columns that were explicitly
1567  * targets of the UPDATE, so as to avoid unnecessary data transmission.
1568  * (We can't do that for INSERT since we would miss sending default values
1569  * for columns not listed in the source statement.)
1570  */
1571  if (operation == CMD_INSERT)
1572  {
1574  int attnum;
1575 
1576  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1577  {
1578  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1579 
1580  if (!attr->attisdropped)
1581  targetAttrs = lappend_int(targetAttrs, attnum);
1582  }
1583  }
1584  else if (operation == CMD_UPDATE)
1585  {
1586  int col;
1587 
 /* Walk the bitmap of updated columns; -1 starts from the lowest member. */
1588  col = -1;
1589  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1590  {
1591  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1593 
1594  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1595  elog(ERROR, "system-column update is not supported");
1596  targetAttrs = lappend_int(targetAttrs, attno);
1597  }
1598  }
 /* CMD_DELETE transmits no column values, so targetAttrs stays NIL. */
1599 
1600  /*
1601  * Extract the relevant RETURNING list if any.
1602  */
1603  if (plan->returningLists)
1604  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1605 
1606  /*
1607  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1608  * should have already been rejected in the optimizer, as presently there
1609  * is no way to recognize an arbiter index on a foreign table. Only DO
1610  * NOTHING is supported without an inference specification.
1611  */
1612  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1613  doNothing = true;
1614  else if (plan->onConflictAction != ONCONFLICT_NONE)
1615  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1616  (int) plan->onConflictAction);
1617 
1618  /*
1619  * Construct the SQL command string.
1620  */
 /* Each deparse routine also fills retrieved_attrs from the RETURNING list. */
1621  switch (operation)
1622  {
1623  case CMD_INSERT:
1624  deparseInsertSql(&sql, root, resultRelation, rel,
1625  targetAttrs, doNothing, returningList,
1626  &retrieved_attrs);
1627  break;
1628  case CMD_UPDATE:
1629  deparseUpdateSql(&sql, root, resultRelation, rel,
1630  targetAttrs, returningList,
1631  &retrieved_attrs);
1632  break;
1633  case CMD_DELETE:
1634  deparseDeleteSql(&sql, root, resultRelation, rel,
1635  returningList,
1636  &retrieved_attrs);
1637  break;
1638  default:
1639  elog(ERROR, "unexpected operation: %d", (int) operation);
1640  break;
1641  }
1642 
 /* Pairs with the heap_open above; NoLock again, as explained there. */
1643  heap_close(rel, NoLock);
1644 
1645  /*
1646  * Build the fdw_private list that will be available to the executor.
1647  * Items in the list must match enum FdwModifyPrivateIndex, above.
1648  */
1649  return list_make4(makeString(sql.data),
1650  targetAttrs,
1651  makeInteger((retrieved_attrs != NIL)),
1652  retrieved_attrs);
1653 }
1654 
1655 /*
1656  * postgresBeginForeignModify
1657  * Begin an insert/update/delete operation on a foreign table
 *
 * In EXPLAIN-only mode this does nothing and resultRelInfo->ri_FdwState
 * stays NULL.  Otherwise it builds a PgFdwModifyState and stores it in
 * ri_FdwState:
 *   - opens the remote connection for the user the RTE is checked as;
 *   - unpacks fdw_private (query, target attnums, has_returning, retrieved
 *     attnums), in the order built by postgresPlanForeignModify;
 *   - creates a temporary memory context under es_query_cxt;
 *   - looks up the output functions used to turn parameters into text.
 *     For UPDATE/DELETE the first parameter is the row's ctid, located as
 *     the resjunk "ctid" column of the subplan's targetlist; INSERT/UPDATE
 *     then add one parameter per target attribute.
 * The remote prepared statement is not created here (p_name is left NULL).
 *
 * NOTE(review): this listing skips lines 1660, 1678, 1708, 1710, 1712,
 * 1714, 1719 and 1735 (including the signature start and the declaration
 * of tupdesc used below); restore them from the real source before
 * compiling.
1658  */
1659 static void
1661  ResultRelInfo *resultRelInfo,
1662  List *fdw_private,
1663  int subplan_index,
1664  int eflags)
1665 {
1666  PgFdwModifyState *fmstate;
1667  EState *estate = mtstate->ps.state;
1668  CmdType operation = mtstate->operation;
1669  Relation rel = resultRelInfo->ri_RelationDesc;
1670  RangeTblEntry *rte;
1671  Oid userid;
1672  ForeignTable *table;
1673  UserMapping *user;
1674  AttrNumber n_params;
1675  Oid typefnoid;
1676  bool isvarlena;
1677  ListCell *lc;
1679 
1680  /*
1681  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1682  * stays NULL.
1683  */
1684  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1685  return;
1686 
1687  /* Begin constructing PgFdwModifyState. */
1688  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1689  fmstate->rel = rel;
1690 
1691  /*
1692  * Identify which user to do the remote access as. This should match what
1693  * ExecCheckRTEPerms() does.
1694  */
1695  rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1696  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1697 
1698  /* Get info about foreign table. */
1699  table = GetForeignTable(RelationGetRelid(rel));
1700  user = GetUserMapping(userid, table->serverid);
1701 
1702  /* Open connection; report that we'll create a prepared statement. */
1703  fmstate->conn = GetConnection(user, true);
1704  fmstate->p_name = NULL; /* prepared statement not made yet */
1705 
1706  /* Deconstruct fdw_private data. */
1707  fmstate->query = strVal(list_nth(fdw_private,
1709  fmstate->target_attrs = (List *) list_nth(fdw_private,
1711  fmstate->has_returning = intVal(list_nth(fdw_private,
1713  fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1715 
1716  /* Create context for per-tuple temp workspace. */
1717  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1718  "postgres_fdw temporary data",
1720 
1721  /* Prepare for input conversion of RETURNING results. */
1722  if (fmstate->has_returning)
1723  fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1724 
1725  /* Prepare for output conversion of parameters used in prepared stmt. */
 /* The "+ 1" leaves room for the ctid parameter used by UPDATE/DELETE. */
1726  n_params = list_length(fmstate->target_attrs) + 1;
1727  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1728  fmstate->p_nums = 0;
1729 
1730  if (operation == CMD_UPDATE || operation == CMD_DELETE)
1731  {
1732  /* Find the ctid resjunk column in the subplan's result */
1733  Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1734 
1736  "ctid");
1737  if (!AttributeNumberIsValid(fmstate->ctidAttno))
1738  elog(ERROR, "could not find junk ctid column");
1739 
1740  /* First transmittable parameter will be ctid */
1741  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1742  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1743  fmstate->p_nums++;
1744  }
1745 
1746  if (operation == CMD_INSERT || operation == CMD_UPDATE)
1747  {
1748  /* Set up for remaining transmittable parameters */
1749  foreach(lc, fmstate->target_attrs)
1750  {
1751  int attnum = lfirst_int(lc);
1752  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1753 
 /* postgresPlanForeignModify never lists dropped columns. */
1754  Assert(!attr->attisdropped);
1755 
1756  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1757  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1758  fmstate->p_nums++;
1759  }
1760  }
1761 
1762  Assert(fmstate->p_nums <= n_params);
1763 
 /* Published only once fully initialized. */
1764  resultRelInfo->ri_FdwState = fmstate;
1765 }
1766 
1767 /*
1768  * postgresExecForeignInsert
1769  * Insert one row into a foreign table
1770  */
1771 static TupleTableSlot *
1773  ResultRelInfo *resultRelInfo,
1774  TupleTableSlot *slot,
1775  TupleTableSlot *planSlot)
1776 {
1777  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1778  const char **p_values;
1779  PGresult *res;
1780  int n_rows;
1781 
1782  /* Set up the prepared statement on the remote server, if we didn't yet */
1783  if (!fmstate->p_name)
1784  prepare_foreign_modify(fmstate);
1785 
1786  /* Convert parameters needed by prepared statement to text form */
1787  p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1788 
1789  /*
1790  * Execute the prepared statement.
1791  */
1792  if (!PQsendQueryPrepared(fmstate->conn,
1793  fmstate->p_name,
1794  fmstate->p_nums,
1795  p_values,
1796  NULL,
1797  NULL,
1798  0))
1799  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1800 
1801  /*
1802  * Get the result, and check for success.
1803  *
1804  * We don't use a PG_TRY block here, so be careful not to throw error
1805  * without releasing the PGresult.
1806  */
1807  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1808  if (PQresultStatus(res) !=
1810  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1811 
1812  /* Check number of rows affected, and fetch RETURNING tuple if any */
1813  if (fmstate->has_returning)
1814  {
1815  n_rows = PQntuples(res);
1816  if (n_rows > 0)
1817  store_returning_result(fmstate, slot, res);
1818  }
1819  else
1820  n_rows = atoi(PQcmdTuples(res));
1821 
1822  /* And clean up */
1823  PQclear(res);
1824 
1825  MemoryContextReset(fmstate->temp_cxt);
1826 
1827  /* Return NULL if nothing was inserted on the remote end */
1828  return (n_rows > 0) ? slot : NULL;
1829 }
1830 
1831 /*
1832  * postgresExecForeignUpdate
1833  * Update one row in a foreign table
1834  */
1835 static TupleTableSlot *
1837  ResultRelInfo *resultRelInfo,
1838  TupleTableSlot *slot,
1839  TupleTableSlot *planSlot)
1840 {
1841  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1842  Datum datum;
1843  bool isNull;
1844  const char **p_values;
1845  PGresult *res;
1846  int n_rows;
1847 
1848  /* Set up the prepared statement on the remote server, if we didn't yet */
1849  if (!fmstate->p_name)
1850  prepare_foreign_modify(fmstate);
1851 
1852  /* Get the ctid that was passed up as a resjunk column */
1853  datum = ExecGetJunkAttribute(planSlot,
1854  fmstate->ctidAttno,
1855  &isNull);
1856  /* shouldn't ever get a null result... */
1857  if (isNull)
1858  elog(ERROR, "ctid is NULL");
1859 
1860  /* Convert parameters needed by prepared statement to text form */
1861  p_values = convert_prep_stmt_params(fmstate,
1862  (ItemPointer) DatumGetPointer(datum),
1863  slot);
1864 
1865  /*
1866  * Execute the prepared statement.
1867  */
1868  if (!PQsendQueryPrepared(fmstate->conn,
1869  fmstate->p_name,
1870  fmstate->p_nums,
1871  p_values,
1872  NULL,
1873  NULL,
1874  0))
1875  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1876 
1877  /*
1878  * Get the result, and check for success.
1879  *
1880  * We don't use a PG_TRY block here, so be careful not to throw error
1881  * without releasing the PGresult.
1882  */
1883  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1884  if (PQresultStatus(res) !=
1886  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1887 
1888  /* Check number of rows affected, and fetch RETURNING tuple if any */
1889  if (fmstate->has_returning)
1890  {
1891  n_rows = PQntuples(res);
1892  if (n_rows > 0)
1893  store_returning_result(fmstate, slot, res);
1894  }
1895  else
1896  n_rows = atoi(PQcmdTuples(res));
1897 
1898  /* And clean up */
1899  PQclear(res);
1900 
1901  MemoryContextReset(fmstate->temp_cxt);
1902 
1903  /* Return NULL if nothing was updated on the remote end */
1904  return (n_rows > 0) ? slot : NULL;
1905 }
1906 
1907 /*
1908  * postgresExecForeignDelete
1909  * Delete one row from a foreign table
1910  */
1911 static TupleTableSlot *
1913  ResultRelInfo *resultRelInfo,
1914  TupleTableSlot *slot,
1915  TupleTableSlot *planSlot)
1916 {
1917  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1918  Datum datum;
1919  bool isNull;
1920  const char **p_values;
1921  PGresult *res;
1922  int n_rows;
1923 
1924  /* Set up the prepared statement on the remote server, if we didn't yet */
1925  if (!fmstate->p_name)
1926  prepare_foreign_modify(fmstate);
1927 
1928  /* Get the ctid that was passed up as a resjunk column */
1929  datum = ExecGetJunkAttribute(planSlot,
1930  fmstate->ctidAttno,
1931  &isNull);
1932  /* shouldn't ever get a null result... */
1933  if (isNull)
1934  elog(ERROR, "ctid is NULL");
1935 
1936  /* Convert parameters needed by prepared statement to text form */
1937  p_values = convert_prep_stmt_params(fmstate,
1938  (ItemPointer) DatumGetPointer(datum),
1939  NULL);
1940 
1941  /*
1942  * Execute the prepared statement.
1943  */
1944  if (!PQsendQueryPrepared(fmstate->conn,
1945  fmstate->p_name,
1946  fmstate->p_nums,
1947  p_values,
1948  NULL,
1949  NULL,
1950  0))
1951  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1952 
1953  /*
1954  * Get the result, and check for success.
1955  *
1956  * We don't use a PG_TRY block here, so be careful not to throw error
1957  * without releasing the PGresult.
1958  */
1959  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1960  if (PQresultStatus(res) !=
1962  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1963 
1964  /* Check number of rows affected, and fetch RETURNING tuple if any */
1965  if (fmstate->has_returning)
1966  {
1967  n_rows = PQntuples(res);
1968  if (n_rows > 0)
1969  store_returning_result(fmstate, slot, res);
1970  }
1971  else
1972  n_rows = atoi(PQcmdTuples(res));
1973 
1974  /* And clean up */
1975  PQclear(res);
1976 
1977  MemoryContextReset(fmstate->temp_cxt);
1978 
1979  /* Return NULL if nothing was deleted on the remote end */
1980  return (n_rows > 0) ? slot : NULL;
1981 }
1982 
1983 /*
1984  * postgresEndForeignModify
1985  * Finish an insert/update/delete operation on a foreign table
 *
 * If a remote prepared statement was created for this operation
 * (fmstate->p_name is set), it is DEALLOCATEd on the remote server and
 * p_name is cleared.  The remote connection is then handed back with
 * ReleaseConnection() and fmstate->conn is set to NULL.  Does nothing
 * when fmstate is NULL (EXPLAIN without execution).
1986  */
1987 static void
1989  ResultRelInfo *resultRelInfo)
1990 {
1991  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1992 
1993  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1994  if (fmstate == NULL)
1995  return;
1996 
1997  /* If we created a prepared statement, destroy it */
1998  if (fmstate->p_name)
1999  {
 /*
  * NOTE(review): sql is a fixed 64-byte buffer and snprintf() truncates
  * silently, so this assumes p_name is a short generated name -- confirm.
  */
2000  char sql[64];
2001  PGresult *res;
2002 
2003  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2004 
2005  /*
2006  * We don't use a PG_TRY block here, so be careful not to throw error
2007  * without releasing the PGresult.
2008  */
2009  res = pgfdw_exec_query(fmstate->conn, sql);
 /* presumably the "true" argument makes pgfdw_report_error release res before throwing -- confirm */
2010  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2011  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2012  PQclear(res);
2013  fmstate->p_name = NULL;
2014  }
2015 
2016  /* Release remote connection */
2017  ReleaseConnection(fmstate->conn);
2018  fmstate->conn = NULL;
2019 }
2020 
2021 /*
2022  * postgresIsForeignRelUpdatable
2023  * Determine whether a foreign table supports INSERT, UPDATE and/or
2024  * DELETE.
 *
 * Reads the "updatable" option first from the foreign server's options
 * and then from the foreign table's options, so a table-level setting
 * overrides a server-level one.  Returns a bitmask with the
 * CMD_INSERT, CMD_UPDATE and CMD_DELETE bits set when updatable,
 * otherwise 0.
2025  */
2026 static int
2028 {
2029  bool updatable;
2030  ForeignTable *table;
2031  ForeignServer *server;
2032  ListCell *lc;
2033 
2034  /*
2035  * By default, all postgres_fdw foreign tables are assumed updatable. This
2036  * can be overridden by a per-server setting, which in turn can be
2037  * overridden by a per-table setting.
2038  */
2039  updatable = true;
2040 
2041  table = GetForeignTable(RelationGetRelid(rel));
2042  server = GetForeignServer(table->serverid);
2043 
 /* Server-level setting (the last matching entry wins) */
2044  foreach(lc, server->options)
2045  {
2046  DefElem *def = (DefElem *) lfirst(lc);
2047 
2048  if (strcmp(def->defname, "updatable") == 0)
2049  updatable = defGetBoolean(def);
2050  }
 /* Table-level setting, scanned second so that it takes precedence */
2051  foreach(lc, table->options)
2052  {
2053  DefElem *def = (DefElem *) lfirst(lc);
2054 
2055  if (strcmp(def->defname, "updatable") == 0)
2056  updatable = defGetBoolean(def);
2057  }
2058 
2059  /*
2060  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2061  */
2062  return updatable ?
2063  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2064 }
2065 
2066 /*
2067  * postgresRecheckForeignScan
2068  * Execute a local join execution plan for a foreign join
 *
 * For a base relation scan (scanrelid > 0) the recheck is left to
 * fdw_recheck_quals and this simply returns true.  For a join
 * (scanrelid == 0) it runs the local outer plan once.  It returns false if
 * that plan yields no tuple; otherwise it copies the tuple into "slot" and
 * returns true.
2069  */
2070 static bool
2072 {
2073  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2075  TupleTableSlot *result;
2076 
2077  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2078  if (scanrelid > 0)
2079  return true;
2080 
 /* A foreign join must carry a local join plan as its outer subplan */
2081  Assert(outerPlan != NULL);
2082 
2083  /* Execute a local join execution plan */
2084  result = ExecProcNode(outerPlan);
2085  if (TupIsNull(result))
2086  return false;
2087 
2088  /* Store result in the given slot */
2089  ExecCopySlot(slot, result);
2090 
2091  return true;
2092 }
2093 
2094 /*
2095  * postgresPlanDirectModify
2096  * Consider a direct foreign table modification
2097  *
2098  * Decide whether it is safe to modify a foreign table directly, and if so,
2099  * rewrite subplan accordingly.
 *
 * Returns false (leaving the plan untouched) when any of these hold:
 * - the operation is not UPDATE or DELETE;
 * - the subplan is not a ForeignScan;
 * - the subplan has local quals;
 * - the scan is a foreign join (scanrelid == 0);
 * - any updated column's expression is not safe to evaluate remotely.
 *
 * Otherwise it deparses a remote UPDATE/DELETE statement and stores it in
 * fscan->fdw_private, together with the has-returning flag, the retrieved
 * attributes and canSetTag.  It also sets fscan->operation and
 * fscan->fdw_exprs, then returns true.
2100  */
2101 static bool
2103  ModifyTable *plan,
2104  Index resultRelation,
2105  int subplan_index)
2106 {
2107  CmdType operation = plan->operation;
2108  Plan *subplan;
2109  RelOptInfo *foreignrel;
2110  RangeTblEntry *rte;
2111  PgFdwRelationInfo *fpinfo;
2112  Relation rel;
2113  StringInfoData sql;
2114  ForeignScan *fscan;
2115  List *targetAttrs = NIL;
2116  List *remote_exprs;
2117  List *params_list = NIL;
2118  List *returningList = NIL;
2120 
2121  /*
2122  * Decide whether it is safe to modify a foreign table directly.
2123  */
2124 
2125  /*
2126  * The table modification must be an UPDATE or DELETE.
2127  */
2128  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2129  return false;
2130 
2131  /*
2132  * It's unsafe to modify a foreign table directly if there are any local
2133  * joins needed.
2134  */
2135  subplan = (Plan *) list_nth(plan->plans, subplan_index);
2136  if (!IsA(subplan, ForeignScan))
2137  return false;
2138  fscan = (ForeignScan *) subplan;
2139 
2140  /*
2141  * It's unsafe to modify a foreign table directly if there are any quals
2142  * that should be evaluated locally.
2143  */
2144  if (subplan->qual != NIL)
2145  return false;
2146 
2147  /*
2148  * We can't handle an UPDATE or DELETE on a foreign join for now.
2149  */
2150  if (fscan->scan.scanrelid == 0)
2151  return false;
2152 
2153  /* Safe to fetch data about the target foreign rel */
2154  foreignrel = root->simple_rel_array[resultRelation];
2155  rte = root->simple_rte_array[resultRelation];
2156  fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2157 
2158  /*
2159  * It's unsafe to update a foreign table directly, if any expressions to
2160  * assign to the target columns are unsafe to evaluate remotely.
2161  */
2162  if (operation == CMD_UPDATE)
2163  {
2164  int col;
2165 
2166  /*
2167  * We transmit only columns that were explicitly targets of the
2168  * UPDATE, so as to avoid unnecessary data transmission.
2169  */
2170  col = -1;
2171  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2172  {
2173  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2175  TargetEntry *tle;
2176 
2177  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2178  elog(ERROR, "system-column update is not supported");
2179 
2180  tle = get_tle_by_resno(subplan->targetlist, attno);
2181 
2182  if (!tle)
2183  elog(ERROR, "attribute number %d not found in subplan targetlist",
2184  attno);
2185 
2186  if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2187  return false;
2188 
2189  targetAttrs = lappend_int(targetAttrs, attno);
2190  }
2191  }
2192 
2193  /*
2194  * Ok, rewrite subplan so as to modify the foreign table directly.
2195  */
2196  initStringInfo(&sql);
2197 
2198  /*
2199  * Core code already has some lock on each rel being planned, so we can
2200  * use NoLock here.
2201  */
2202  rel = heap_open(rte->relid, NoLock);
2203 
2204  /*
2205  * Recall the qual clauses that must be evaluated remotely. (These are
2206  * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2207  * doesn't care.)
2208  */
2209  remote_exprs = fpinfo->final_remote_exprs;
2210 
2211  /*
2212  * Extract the relevant RETURNING list if any.
2213  */
2214  if (plan->returningLists)
2215  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2216 
2217  /*
2218  * Construct the SQL command string.
2219  */
2220  switch (operation)
2221  {
2222  case CMD_UPDATE:
2223  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2224  ((Plan *) fscan)->targetlist,
2225  targetAttrs,
2226  remote_exprs, &params_list,
2227  returningList, &retrieved_attrs);
2228  break;
2229  case CMD_DELETE:
2230  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2231  remote_exprs, &params_list,
2232  returningList, &retrieved_attrs);
2233  break;
2234  default:
2235  elog(ERROR, "unexpected operation: %d", (int) operation);
2236  break;
2237  }
2238 
2239  /*
2240  * Update the operation info.
2241  */
2242  fscan->operation = operation;
2243 
2244  /*
2245  * Update the fdw_exprs list that will be available to the executor.
2246  */
2247  fscan->fdw_exprs = params_list;
2248 
2249  /*
2250  * Update the fdw_private list that will be available to the executor.
2251  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2252  */
 /* has_returning is derived from whether any remote columns are retrieved */
2253  fscan->fdw_private = list_make4(makeString(sql.data),
2254  makeInteger((retrieved_attrs != NIL)),
2255  retrieved_attrs,
2256  makeInteger(plan->canSetTag));
2257 
2258  heap_close(rel, NoLock);
2259  return true;
2260 }
2261 
2262 /*
2263  * postgresBeginDirectModify
2264  * Prepare a direct foreign table modification
 *
 * Does nothing under EXEC_FLAG_EXPLAIN_ONLY, leaving node->fdw_state NULL.
 * Otherwise it allocates a PgFdwDirectModifyState and stores it in
 * node->fdw_state.  It then:
 * - opens a connection as the user ExecCheckRTEPerms() would check;
 * - unpacks the planner's fdw_private items;
 * - creates a per-tuple temp context;
 * - sets up RETURNING input conversion and remote query parameters.
 * num_tuples is set to -1 so that the statement is executed on the first
 * Iterate call.
2265  */
2266 static void
2268 {
2269  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2270  EState *estate = node->ss.ps.state;
2271  PgFdwDirectModifyState *dmstate;
2272  RangeTblEntry *rte;
2273  Oid userid;
2274  ForeignTable *table;
2275  UserMapping *user;
2276  int numParams;
2277 
2278  /*
2279  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2280  */
2281  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2282  return;
2283 
2284  /*
2285  * We'll save private state in node->fdw_state.
2286  */
2287  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2288  node->fdw_state = (void *) dmstate;
2289 
2290  /*
2291  * Identify which user to do the remote access as. This should match what
2292  * ExecCheckRTEPerms() does.
2293  */
2294  rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
2295  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2296 
2297  /* Get info about foreign table. */
2298  dmstate->rel = node->ss.ss_currentRelation;
2299  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2300  user = GetUserMapping(userid, table->serverid);
2301 
2302  /*
2303  * Get connection to the foreign server. Connection manager will
2304  * establish new connection if necessary.
2305  */
2306  dmstate->conn = GetConnection(user, false);
2307 
2308  /* Initialize state variable */
2309  dmstate->num_tuples = -1; /* -1 means not set yet */
2310 
2311  /* Get private info created by planner functions. */
 /* The item order here must match what postgresPlanDirectModify stored */
2312  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2314  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2316  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2318  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2320 
2321  /* Create context for per-tuple temp workspace. */
2322  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2323  "postgres_fdw temporary data",
2325 
2326  /* Prepare for input conversion of RETURNING results. */
2327  if (dmstate->has_returning)
2328  dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
2329 
2330  /*
2331  * Prepare for processing of parameters used in remote query, if any.
2332  */
2333  numParams = list_length(fsplan->fdw_exprs);
2334  dmstate->numParams = numParams;
 /* NOTE(review): the call on the elided line 2336 presumably fills param_flinfo/param_exprs/param_values -- confirm */
2335  if (numParams > 0)
2337  fsplan->fdw_exprs,
2338  numParams,
2339  &dmstate->param_flinfo,
2340  &dmstate->param_exprs,
2341  &dmstate->param_values);
2342 }
2343 
2344 /*
2345  * postgresIterateDirectModify
2346  * Execute a direct foreign table modification
 *
 * On the first call (num_tuples == -1) the remote statement is executed.
 * Without a local RETURNING projection:
 * - the affected-row count is added to es_processed (only when
 *   set_processed is true) and to the EXPLAIN ANALYZE tuple count;
 * - an empty slot is returned.
 * Otherwise the next RETURNING tuple is returned via get_returning_data().
2347  */
2348 static TupleTableSlot *
2350 {
2352  EState *estate = node->ss.ps.state;
2353  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2354 
2355  /*
2356  * If this is the first call after Begin, execute the statement.
2357  */
2358  if (dmstate->num_tuples == -1)
2359  execute_dml_stmt(node);
2360 
2361  /*
2362  * If the local query doesn't specify RETURNING, just clear tuple slot.
2363  */
2364  if (!resultRelInfo->ri_projectReturning)
2365  {
2366  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2367  Instrumentation *instr = node->ss.ps.instrument;
2368 
2369  Assert(!dmstate->has_returning);
2370 
2371  /* Increment the command es_processed count if necessary. */
2372  if (dmstate->set_processed)
2373  estate->es_processed += dmstate->num_tuples;
2374 
2375  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2376  if (instr)
2377  instr->tuplecount += dmstate->num_tuples;
2378 
2379  return ExecClearTuple(slot);
2380  }
2381 
2382  /*
2383  * Get the next RETURNING tuple.
2384  */
2385  return get_returning_data(node);
2386 }
2387 
2388 /*
2389  * postgresEndDirectModify
2390  * Finish a direct foreign table modification
 *
 * Frees the stored PGresult (if any) and releases the remote connection.
 * Does nothing when fdw_state is NULL, i.e. EXPLAIN without ANALYZE.
2391  */
2392 static void
2394 {
2396 
2397  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2398  if (dmstate == NULL)
2399  return;
2400 
2401  /* Release PGresult */
2402  if (dmstate->result)
2403  PQclear(dmstate->result);
2404 
2405  /* Release remote connection */
2406  ReleaseConnection(dmstate->conn);
2407  dmstate->conn = NULL;
2408 
2409  /* MemoryContext will be deleted automatically. */
2410 }
2411 
2412 /*
2413  * postgresExplainForeignScan
2414  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
 *
 * Emits a "Relations" property when fdw_private has more than
 * FdwScanPrivateRelations items (i.e. a relations string was stored), and
 * a "Remote SQL" property when EXPLAIN VERBOSE is requested.
2415  */
2416 static void
2418 {
2419  List *fdw_private;
2420  char *sql;
2421  char *relations;
2422 
2423  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2424 
2425  /*
2426  * Add names of relation handled by the foreign scan when the scan is a
2427  * join
2428  */
2429  if (list_length(fdw_private) > FdwScanPrivateRelations)
2430  {
2431  relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2432  ExplainPropertyText("Relations", relations, es);
2433  }
2434 
2435  /*
2436  * Add remote query, when VERBOSE option is specified.
2437  */
2438  if (es->verbose)
2439  {
2440  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2441  ExplainPropertyText("Remote SQL", sql, es);
2442  }
2443 }
2444 
2445 /*
2446  * postgresExplainForeignModify
2447  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
 *
 * Only emits output under EXPLAIN VERBOSE: the remote SQL string taken
 * from the modify node's fdw_private list.
2448  */
2449 static void
2451  ResultRelInfo *rinfo,
2452  List *fdw_private,
2453  int subplan_index,
2454  ExplainState *es)
2455 {
2456  if (es->verbose)
2457  {
2458  char *sql = strVal(list_nth(fdw_private,
2460 
2461  ExplainPropertyText("Remote SQL", sql, es);
2462  }
2463 }
2464 
2465 /*
2466  * postgresExplainDirectModify
2467  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2468  * foreign table directly
 *
 * Under EXPLAIN VERBOSE, emits the deparsed remote UPDATE/DELETE stored at
 * FdwDirectModifyPrivateUpdateSql in the scan's fdw_private list.
2469  */
2470 static void
2472 {
2473  List *fdw_private;
2474  char *sql;
2475 
2476  if (es->verbose)
2477  {
2478  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2479  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2480  ExplainPropertyText("Remote SQL", sql, es);
2481  }
2482 }
2483 
2484 
2485 /*
2486  * estimate_path_cost_size
2487  * Get cost and size estimates for a foreign scan on the given foreign
2488  * relation, which is either a base relation, a join between foreign
2489  * relations, or an upper relation containing foreign relations.
2490  *
2491  * param_join_conds are the parameterization clauses with outer relations.
2492  * pathkeys specify the expected sort order if any for given path being costed.
2493  *
2494  * The function returns the cost and size estimates in p_rows, p_width,
2495  * p_startup_cost and p_total_cost variables.
 *
 * With use_remote_estimate the numbers come from a remote EXPLAIN.
 * Otherwise they are built from local statistics, with a fixed
 * DEFAULT_FDW_SORT_MULTIPLIER penalty when pathkeys are requested.
 * For calls with no pathkeys and no parameterization, the costs before
 * fdw_startup_cost/fdw_tuple_cost are cached in fpinfo->rel_startup_cost and
 * fpinfo->rel_total_cost for later reuse.
2496  */
2497 static void
2499  RelOptInfo *foreignrel,
2500  List *param_join_conds,
2501  List *pathkeys,
2502  double *p_rows, int *p_width,
2503  Cost *p_startup_cost, Cost *p_total_cost)
2504 {
2505  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2506  double rows;
2507  double retrieved_rows;
2508  int width;
2509  Cost startup_cost;
2510  Cost total_cost;
2511  Cost cpu_per_tuple;
2512 
2513  /*
2514  * If the table or the server is configured to use remote estimates,
2515  * connect to the foreign server and execute EXPLAIN to estimate the
2516  * number of rows selected by the restriction+join clauses. Otherwise,
2517  * estimate rows using whatever statistics we have locally, in a way
2518  * similar to ordinary tables.
2519  */
2520  if (fpinfo->use_remote_estimate)
2521  {
2522  List *remote_param_join_conds;
2523  List *local_param_join_conds;
2524  StringInfoData sql;
2525  PGconn *conn;
2526  Selectivity local_sel;
2527  QualCost local_cost;
2528  List *fdw_scan_tlist = NIL;
2529  List *remote_conds;
2530 
2531  /* Required only to be passed to deparseSelectStmtForRel */
2533 
2534  /*
2535  * param_join_conds might contain both clauses that are safe to send
2536  * across, and clauses that aren't.
2537  */
2538  classifyConditions(root, foreignrel, param_join_conds,
2539  &remote_param_join_conds, &local_param_join_conds);
2540 
2541  /* Build the list of columns to be fetched from the foreign server. */
2542  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
2543  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2544  else
2545  fdw_scan_tlist = NIL;
2546 
2547  /*
2548  * The complete list of remote conditions includes everything from
2549  * baserestrictinfo plus any extra join_conds relevant to this
2550  * particular path.
2551  */
2552  remote_conds = list_concat(list_copy(remote_param_join_conds),
2553  fpinfo->remote_conds);
2554 
2555  /*
2556  * Construct EXPLAIN query including the desired SELECT, FROM, and
2557  * WHERE clauses. Params and other-relation Vars are replaced by dummy
2558  * values, so don't request params_list.
2559  */
 /* pathkeys are passed too, so any remote sort cost is in the estimate */
2560  initStringInfo(&sql);
2561  appendStringInfoString(&sql, "EXPLAIN ");
2562  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2563  remote_conds, pathkeys, false,
2564  &retrieved_attrs, NULL);
2565 
2566  /* Get the remote estimate */
2567  conn = GetConnection(fpinfo->user, false);
2568  get_remote_estimate(sql.data, conn, &rows, &width,
2569  &startup_cost, &total_cost);
2570  ReleaseConnection(conn);
2571 
 /* Rows the remote side returns, before any local filtering */
2572  retrieved_rows = rows;
2573 
2574  /* Factor in the selectivity of the locally-checked quals */
2575  local_sel = clauselist_selectivity(root,
2576  local_param_join_conds,
2577  foreignrel->relid,
2578  JOIN_INNER,
2579  NULL);
2580  local_sel *= fpinfo->local_conds_sel;
2581 
2582  rows = clamp_row_est(rows * local_sel);
2583 
2584  /* Add in the eval cost of the locally-checked quals */
2585  startup_cost += fpinfo->local_conds_cost.startup;
2586  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2587  cost_qual_eval(&local_cost, local_param_join_conds, root);
2588  startup_cost += local_cost.startup;
2589  total_cost += local_cost.per_tuple * retrieved_rows;
2590  }
2591  else
2592  {
2593  Cost run_cost = 0;
2594 
2595  /*
2596  * We don't support join conditions in this mode (hence, no
2597  * parameterized paths can be made).
2598  */
2599  Assert(param_join_conds == NIL);
2600 
2601  /*
2602  * Use rows/width estimates made by set_baserel_size_estimates() for
2603  * base foreign relations and set_joinrel_size_estimates() for join
2604  * between foreign relations.
2605  */
2606  rows = foreignrel->rows;
2607  width = foreignrel->reltarget->width;
2608 
2609  /* Back into an estimate of the number of retrieved rows. */
2610  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2611 
2612  /*
2613  * We will come here again and again with different set of pathkeys
2614  * that caller wants to cost. We don't need to calculate the cost of
2615  * bare scan each time. Instead, use the costs if we have cached them
2616  * already.
2617  */
 /* NOTE(review): a cached cost of exactly 0 is treated as "not cached" */
2618  if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2619  {
2620  startup_cost = fpinfo->rel_startup_cost;
2621  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2622  }
2623  else if (IS_JOIN_REL(foreignrel))
2624  {
2625  PgFdwRelationInfo *fpinfo_i;
2626  PgFdwRelationInfo *fpinfo_o;
2627  QualCost join_cost;
2628  QualCost remote_conds_cost;
2629  double nrows;
2630 
2631  /* For join we expect inner and outer relations set */
2632  Assert(fpinfo->innerrel && fpinfo->outerrel);
2633 
2634  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2635  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2636 
2637  /* Estimate of number of rows in cross product */
2638  nrows = fpinfo_i->rows * fpinfo_o->rows;
2639  /* Clamp retrieved rows estimate to at most size of cross product */
2640  retrieved_rows = Min(retrieved_rows, nrows);
2641 
2642  /*
2643  * The cost of foreign join is estimated as cost of generating
2644  * rows for the joining relations + cost for applying quals on the
2645  * rows.
2646  */
2647 
2648  /*
2649  * Calculate the cost of clauses pushed down to the foreign server
2650  */
2651  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2652  /* Calculate the cost of applying join clauses */
2653  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2654 
2655  /*
2656  * Startup cost includes startup cost of joining relations and the
2657  * startup cost for join and other clauses. We do not include the
2658  * startup cost specific to join strategy (e.g. setting up hash
2659  * tables) since we do not know what strategy the foreign server
2660  * is going to use.
2661  */
2662  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2663  startup_cost += join_cost.startup;
2664  startup_cost += remote_conds_cost.startup;
2665  startup_cost += fpinfo->local_conds_cost.startup;
2666 
2667  /*
2668  * Run time cost includes:
2669  *
2670  * 1. Run time cost (total_cost - startup_cost) of relations being
2671  * joined
2672  *
2673  * 2. Run time cost of applying join clauses on the cross product
2674  * of the joining relations.
2675  *
2676  * 3. Run time cost of applying pushed down other clauses on the
2677  * result of join
2678  *
2679  * 4. Run time cost of applying nonpushable other clauses locally
2680  * on the result fetched from the foreign server.
2681  */
2682  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2683  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2684  run_cost += nrows * join_cost.per_tuple;
2685  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2686  run_cost += nrows * remote_conds_cost.per_tuple;
2687  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2688  }
2689  else if (IS_UPPER_REL(foreignrel))
2690  {
2691  PgFdwRelationInfo *ofpinfo;
2692  PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG];
2693  AggClauseCosts aggcosts;
2694  double input_rows;
2695  int numGroupCols;
2696  double numGroups = 1;
2697 
2698  /*
2699  * This cost model is mixture of costing done for sorted and
2700  * hashed aggregates in cost_agg(). We are not sure which
2701  * strategy will be considered at remote side, thus for
2702  * simplicity, we put all startup related costs in startup_cost
2703  * and all finalization and run cost are added in total_cost.
2704  *
2705  * Also, core does not care about costing HAVING expressions and
2706  * adding that to the costs. So similarly, here too we are not
2707  * considering remote and local conditions for costing.
2708  */
2709 
2710  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2711 
2712  /* Get rows and width from input rel */
 /* NOTE: this overrides the width taken from foreignrel->reltarget above */
2713  input_rows = ofpinfo->rows;
2714  width = ofpinfo->width;
2715 
2716  /* Collect statistics about aggregates for estimating costs. */
2717  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2718  if (root->parse->hasAggs)
2719  {
2720  get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2721  AGGSPLIT_SIMPLE, &aggcosts);
2722  get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2723  AGGSPLIT_SIMPLE, &aggcosts);
2724  }
2725 
2726  /* Get number of grouping columns and possible number of groups */
2727  numGroupCols = list_length(root->parse->groupClause);
2728  numGroups = estimate_num_groups(root,
2730  fpinfo->grouped_tlist),
2731  input_rows, NULL);
2732 
2733  /*
2734  * Number of rows expected from foreign server will be same as
2735  * that of number of groups.
2736  */
2737  rows = retrieved_rows = numGroups;
2738 
2739  /*-----
2740  * Startup cost includes:
2741  * 1. Startup cost for underneath input relation
2742  * 2. Cost of performing aggregation, per cost_agg()
2743  * 3. Startup cost for PathTarget eval
2744  *-----
2745  */
2746  startup_cost = ofpinfo->rel_startup_cost;
2747  startup_cost += aggcosts.transCost.startup;
2748  startup_cost += aggcosts.transCost.per_tuple * input_rows;
2749  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2750  startup_cost += ptarget->cost.startup;
2751 
2752  /*-----
2753  * Run time cost includes:
2754  * 1. Run time cost of underneath input relation
2755  * 2. Run time cost of performing aggregation, per cost_agg()
2756  * 3. PathTarget eval cost for each output row
2757  *-----
2758  */
2759  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2760  run_cost += aggcosts.finalCost * numGroups;
2761  run_cost += cpu_tuple_cost * numGroups;
2762  run_cost += ptarget->cost.per_tuple * numGroups;
2763  }
2764  else
2765  {
2766  /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2767  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2768 
2769  /*
2770  * Cost as though this were a seqscan, which is pessimistic. We
2771  * effectively imagine the local_conds are being evaluated
2772  * remotely, too.
2773  */
2774  startup_cost = 0;
2775  run_cost = 0;
2776  run_cost += seq_page_cost * foreignrel->pages;
2777 
2778  startup_cost += foreignrel->baserestrictcost.startup;
2779  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2780  run_cost += cpu_per_tuple * foreignrel->tuples;
2781  }
2782 
2783  /*
2784  * Without remote estimates, we have no real way to estimate the cost
2785  * of generating sorted output. It could be free if the query plan
2786  * the remote side would have chosen generates properly-sorted output
2787  * anyway, but in most cases it will cost something. Estimate a value
2788  * high enough that we won't pick the sorted path when the ordering
2789  * isn't locally useful, but low enough that we'll err on the side of
2790  * pushing down the ORDER BY clause when it's useful to do so.
2791  */
2792  if (pathkeys != NIL)
2793  {
2794  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2795  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2796  }
2797 
2798  total_cost = startup_cost + run_cost;
2799  }
2800 
2801  /*
2802  * Cache the costs for scans without any pathkeys or parameterization
2803  * before adding the costs for transferring data from the foreign server.
2804  * These costs are useful for costing the join between this relation and
2805  * another foreign relation or to calculate the costs of paths with
2806  * pathkeys for this relation, when the costs can not be obtained from the
2807  * foreign server. This function will be called at least once for every
2808  * foreign relation without pathkeys and parameterization.
2809  */
2810  if (pathkeys == NIL && param_join_conds == NIL)
2811  {
2812  fpinfo->rel_startup_cost = startup_cost;
2813  fpinfo->rel_total_cost = total_cost;
2814  }
2815 
2816  /*
2817  * Add some additional cost factors to account for connection overhead
2818  * (fdw_startup_cost), transferring data across the network
2819  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2820  * (cpu_tuple_cost per retrieved row).
2821  */
2822  startup_cost += fpinfo->fdw_startup_cost;
2823  total_cost += fpinfo->fdw_startup_cost;
2824  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2825  total_cost += cpu_tuple_cost * retrieved_rows;
2826 
2827  /* Return results. */
2828  *p_rows = rows;
2829  *p_width = width;
2830  *p_startup_cost = startup_cost;
2831  *p_total_cost = total_cost;
2832 }
2833 
2834 /*
2835  * Estimate costs of executing a SQL statement remotely.
2836  * The given "sql" must be an EXPLAIN command.
2837  */
2838 static void
2839 get_remote_estimate(const char *sql, PGconn *conn,
2840  double *rows, int *width,
2841  Cost *startup_cost, Cost *total_cost)
2842 {
2843  PGresult *volatile res = NULL;
2844 
2845  /* PGresult must be released before leaving this function. */
2846  PG_TRY();
2847  {
2848  char *line;
2849  char *p;
2850  int n;
2851 
2852  /*
2853  * Execute EXPLAIN remotely.
2854  */
2855  res = pgfdw_exec_query(conn, sql);
2856  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2857  pgfdw_report_error(ERROR, res, conn, false, sql);
2858 
2859  /*
2860  * Extract cost numbers for topmost plan node. Note we search for a
2861  * left paren from the end of the line to avoid being confused by
2862  * other uses of parentheses.
2863  */
2864  line = PQgetvalue(res, 0, 0);
2865  p = strrchr(line, '(');
2866  if (p == NULL)
2867  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2868  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2869  startup_cost, total_cost, rows, width);
2870  if (n != 4)
2871  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2872 
2873  PQclear(res);
2874  res = NULL;
2875  }
2876  PG_CATCH();
2877  {
2878  if (res)
2879  PQclear(res);
2880  PG_RE_THROW();
2881  }
2882  PG_END_TRY();
2883 }
2884 
2885 /*
2886  * Detect whether we want to process an EquivalenceClass member.
2887  *
2888  * This is a callback for use by generate_implied_equalities_for_column.
 *
 * Behaviour depends on state->current:
 * - If state->current is already set, only a member whose expression is
 *   equal() to it matches.
 * - Otherwise, the first member whose expression is not in
 *   state->already_used becomes state->current and matches.
 * Members already in state->already_used never match in the second case.
 * NOTE(review): "state" is presumably derived from "arg" on the elided line
 * 2895 -- confirm.
2889  */
2890 static bool
2893  void *arg)
2894 {
2896  Expr *expr = em->em_expr;
2897 
2898  /*
2899  * If we've identified what we're processing in the current scan, we only
2900  * want to match that expression.
2901  */
2902  if (state->current != NULL)
2903  return equal(expr, state->current);
2904 
2905  /*
2906  * Otherwise, ignore anything we've already processed.
2907  */
2908  if (list_member(state->already_used, expr))
2909  return false;
2910 
2911  /* This is the new target to process. */
2912  state->current = expr;
2913  return true;
2914 }
2915 
2916 /*
2917  * Create cursor for node's query with current parameter values.
 *
 * Converts any query parameters to text in the per-tuple memory context,
 * then sends "DECLARE c<cursor_number> CURSOR FOR <query>" to the remote
 * server.  On success it marks the cursor as existing and resets the
 * scan's tuple-batch bookkeeping.  If the send fails, the error reports the
 * DECLARE text; if the remote command fails, it reports the original query.
2918  */
2919 static void
2921 {
2922  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2923  ExprContext *econtext = node->ss.ps.ps_ExprContext;
2924  int numParams = fsstate->numParams;
2925  const char **values = fsstate->param_values;
2926  PGconn *conn = fsstate->conn;
2928  PGresult *res;
2929 
2930  /*
2931  * Construct array of query parameter values in text format. We do the
2932  * conversions in the short-lived per-tuple context, so as not to cause a
2933  * memory leak over repeated scans.
2934  */
2935  if (numParams > 0)
2936  {
2937  MemoryContext oldcontext;
2938 
2939  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2940 
2941  process_query_params(econtext,
2942  fsstate->param_flinfo,
2943  fsstate->param_exprs,
2944  values);
2945 
2946  MemoryContextSwitchTo(oldcontext);
2947  }
2948 
2949  /* Construct the DECLARE CURSOR command */
2950  initStringInfo(&buf);
2951  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
2952  fsstate->cursor_number, fsstate->query);
2953 
2954  /*
2955  * Notice that we pass NULL for paramTypes, thus forcing the remote server
2956  * to infer types for all parameters. Since we explicitly cast every
2957  * parameter (see deparse.c), the "inference" is trivial and will produce
2958  * the desired result. This allows us to avoid assuming that the remote
2959  * server has the same OIDs we do for the parameters' types.
2960  */
2961  if (!PQsendQueryParams(conn, buf.data, numParams,
2962  NULL, values, NULL, NULL, 0))
2963  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2964 
2965  /*
2966  * Get the result, and check for success.
2967  *
2968  * We don't use a PG_TRY block here, so be careful not to throw error
2969  * without releasing the PGresult.
2970  */
2971  res = pgfdw_get_result(conn, buf.data);
2972  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2973  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
2974  PQclear(res);
2975 
2976  /* Mark the cursor as created, and show no tuples have been retrieved */
2977  fsstate->cursor_exists = true;
2978  fsstate->tuples = NULL;
2979  fsstate->num_tuples = 0;
2980  fsstate->next_tuple = 0;
2981  fsstate->fetch_ct_2 = 0;
2982  fsstate->eof_reached = false;
2983 
2984  /* Clean up */
2985  pfree(buf.data);
2986 }
2987 
2988 /*
2989  * Fetch some more rows from the node's cursor.
 *
 * Discards the previous batch by resetting batch_cxt, then issues
 * "FETCH <fetch_size> FROM c<cursor_number>".  The returned rows are
 * converted into fsstate->tuples, allocated in batch_cxt.
 *
 * Bookkeeping:
 * - fetch_ct_2 counts fetches, saturating at 2;
 * - eof_reached is set when fewer than fetch_size rows come back.
 *
 * Remote errors are reported against the original query, not the FETCH.
2990  */
2991 static void
2993 {
2994  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2995  PGresult *volatile res = NULL;
2996  MemoryContext oldcontext;
2997 
2998  /*
2999  * We'll store the tuples in the batch_cxt. First, flush the previous
3000  * batch.
3001  */
3002  fsstate->tuples = NULL;
3003  MemoryContextReset(fsstate->batch_cxt);
3004  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3005 
3006  /* PGresult must be released before leaving this function. */
3007  PG_TRY();
3008  {
3009  PGconn *conn = fsstate->conn;
3010  char sql[64];
3011  int numrows;
3012  int i;
3013 
3014  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3015  fsstate->fetch_size, fsstate->cursor_number);
3016 
3017  res = pgfdw_exec_query(conn, sql);
3018  /* On error, report the original query, not the FETCH. */
3019  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3020  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3021 
3022  /* Convert the data into HeapTuples */
3023  numrows = PQntuples(res);
3024  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3025  fsstate->num_tuples = numrows;
3026  fsstate->next_tuple = 0;
3027 
3028  for (i = 0; i < numrows; i++)
3029  {
3030  Assert(IsA(node->ss.ps.plan, ForeignScan));
3031 
 /* presumably make_tuple_from_result_row() on the elided line 3033 -- confirm */
3032  fsstate->tuples[i] =
3034  fsstate->rel,
3035  fsstate->attinmeta,
3036  fsstate->retrieved_attrs,
3037  node,
3038  fsstate->temp_cxt);
3039  }
3040 
3041  /* Update fetch_ct_2 */
3042  if (fsstate->fetch_ct_2 < 2)
3043  fsstate->fetch_ct_2++;
3044 
3045  /* Must be EOF if we didn't get as many tuples as we asked for. */
3046  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3047 
3048  PQclear(res);
3049  res = NULL;
3050  }
3051  PG_CATCH();
3052  {
3053  if (res)
3054  PQclear(res);
3055  PG_RE_THROW();
3056  }
3057  PG_END_TRY();
3058 
 /*
  * NOTE(review): reached only on success; on error the context switch is
  * presumably undone by transaction abort cleanup -- confirm.
  */
3059  MemoryContextSwitchTo(oldcontext);
3060 }
3061 
3062 /*
3063  * Force assorted GUC parameters to settings that ensure that we'll output
3064  * data values in a form that is unambiguous to the remote server.
3065  *
3066  * This is rather expensive and annoying to do once per row, but there's
3067  * little choice if we want to be sure values are transmitted accurately;
3068  * we can't leave the settings in place between rows for fear of affecting
3069  * user-visible computations.
3070  *
3071  * We use the equivalent of a function SET option to allow the settings to
3072  * persist only until the caller calls reset_transmission_modes(). If an
3073  * error is thrown in between, guc.c will take care of undoing the settings.
3074  *
3075  * The return value is the nestlevel that must be passed to
3076  * reset_transmission_modes() to undo things.
3077  */
3078 int
3080 {
3081  int nestlevel = NewGUCNestLevel();
3082 
3083  /*
3084  * The values set here should match what pg_dump does. See also
3085  * configure_remote_session in connection.c.
3086  */
3087  if (DateStyle != USE_ISO_DATES)
3088  (void) set_config_option("datestyle", "ISO",
3090  GUC_ACTION_SAVE, true, 0, false);
3092  (void) set_config_option("intervalstyle", "postgres",
3094  GUC_ACTION_SAVE, true, 0, false);
3095  if (extra_float_digits < 3)
3096  (void) set_config_option("extra_float_digits", "3",
3098  GUC_ACTION_SAVE, true, 0, false);
3099 
3100  return nestlevel;
3101 }
3102 
3103 /*
3104  * Undo the effects of set_transmission_modes().
3105  */
3106 void
3108 {
3109  AtEOXact_GUC(true, nestlevel);
3110 }
3111 
3112 /*
3113  * Utility routine to close a cursor.
3114  */
3115 static void
3117 {
3118  char sql[64];
3119  PGresult *res;
3120 
3121  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3122 
3123  /*
3124  * We don't use a PG_TRY block here, so be careful not to throw error
3125  * without releasing the PGresult.
3126  */
3127  res = pgfdw_exec_query(conn, sql);
3128  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3129  pgfdw_report_error(ERROR, res, conn, true, sql);
3130  PQclear(res);
3131 }
3132 
3133 /*
3134  * prepare_foreign_modify
3135  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3136  */
3137 static void
3139 {
3140  char prep_name[NAMEDATALEN];
3141  char *p_name;
3142  PGresult *res;
3143 
3144  /* Construct name we'll use for the prepared statement. */
3145  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3146  GetPrepStmtNumber(fmstate->conn));
3147  p_name = pstrdup(prep_name);
3148 
3149  /*
3150  * We intentionally do not specify parameter types here, but leave the
3151  * remote server to derive them by default. This avoids possible problems
3152  * with the remote server using different type OIDs than we do. All of
3153  * the prepared statements we use in this module are simple enough that
3154  * the remote server will make the right choices.
3155  */
3156  if (!PQsendPrepare(fmstate->conn,
3157  p_name,
3158  fmstate->query,
3159  0,
3160  NULL))
3161  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3162 
3163  /*
3164  * Get the result, and check for success.
3165  *
3166  * We don't use a PG_TRY block here, so be careful not to throw error
3167  * without releasing the PGresult.
3168  */
3169  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3170  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3171  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3172  PQclear(res);
3173 
3174  /* This action shows that the prepare has been done. */
3175  fmstate->p_name = p_name;
3176 }
3177 
3178 /*
3179  * convert_prep_stmt_params
3180  * Create array of text strings representing parameter values
3181  *
3182  * tupleid is ctid to send, or NULL if none
3183  * slot is slot to get remaining parameters from, or NULL if none
3184  *
3185  * Data is constructed in temp_cxt; caller should reset that after use.
3186  */
3187 static const char **
3189  ItemPointer tupleid,
3190  TupleTableSlot *slot)
3191 {
3192  const char **p_values;
3193  int pindex = 0;
3194  MemoryContext oldcontext;
3195 
3196  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3197 
3198  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3199 
3200  /* 1st parameter should be ctid, if it's in use */
3201  if (tupleid != NULL)
3202  {
3203  /* don't need set_transmission_modes for TID output */
3204  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3205  PointerGetDatum(tupleid));
3206  pindex++;
3207  }
3208 
3209  /* get following parameters from slot */
3210  if (slot != NULL && fmstate->target_attrs != NIL)
3211  {
3212  int nestlevel;
3213  ListCell *lc;
3214 
3215  nestlevel = set_transmission_modes();
3216 
3217  foreach(lc, fmstate->target_attrs)
3218  {
3219  int attnum = lfirst_int(lc);
3220  Datum value;
3221  bool isnull;
3222 
3223  value = slot_getattr(slot, attnum, &isnull);
3224  if (isnull)
3225  p_values[pindex] = NULL;
3226  else
3227  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3228  value);
3229  pindex++;
3230  }
3231 
3232  reset_transmission_modes(nestlevel);
3233  }
3234 
3235  Assert(pindex == fmstate->p_nums);
3236 
3237  MemoryContextSwitchTo(oldcontext);
3238 
3239  return p_values;
3240 }
3241 
3242 /*
3243  * store_returning_result
3244  * Store the result of a RETURNING clause
3245  *
3246  * On error, be sure to release the PGresult on the way out. Callers do not
3247  * have PG_TRY blocks to ensure this happens.
3248  */
3249 static void
3251  TupleTableSlot *slot, PGresult *res)
3252 {
3253  PG_TRY();
3254  {
3255  HeapTuple newtup;
3256 
3257  newtup = make_tuple_from_result_row(res, 0,
3258  fmstate->rel,
3259  fmstate->attinmeta,
3260  fmstate->retrieved_attrs,
3261  NULL,
3262  fmstate->temp_cxt);
3263  /* tuple will be deleted when it is cleared from the slot */
3264  ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3265  }
3266  PG_CATCH();
3267  {
3268  if (res)
3269  PQclear(res);
3270  PG_RE_THROW();
3271  }
3272  PG_END_TRY();
3273 }
3274 
3275 /*
3276  * Execute a direct UPDATE/DELETE statement.
3277  */
3278 static void
3280 {
3282  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3283  int numParams = dmstate->numParams;
3284  const char **values = dmstate->param_values;
3285 
3286  /*
3287  * Construct array of query parameter values in text format.
3288  */
3289  if (numParams > 0)
3290  process_query_params(econtext,
3291  dmstate->param_flinfo,
3292  dmstate->param_exprs,
3293  values);
3294 
3295  /*
3296  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3297  * to infer types for all parameters. Since we explicitly cast every
3298  * parameter (see deparse.c), the "inference" is trivial and will produce
3299  * the desired result. This allows us to avoid assuming that the remote
3300  * server has the same OIDs we do for the parameters' types.
3301  */
3302  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3303  NULL, values, NULL, NULL, 0))
3304  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3305 
3306  /*
3307  * Get the result, and check for success.
3308  *
3309  * We don't use a PG_TRY block here, so be careful not to throw error
3310  * without releasing the PGresult.
3311  */
3312  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3313  if (PQresultStatus(dmstate->result) !=
3315  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3316  dmstate->query);
3317 
3318  /* Get the number of rows affected. */
3319  if (dmstate->has_returning)
3320  dmstate->num_tuples = PQntuples(dmstate->result);
3321  else
3322  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3323 }
3324 
3325 /*
3326  * Get the result of a RETURNING clause.
3327  */
3328 static TupleTableSlot *
3330 {
3332  EState *estate = node->ss.ps.state;
3333  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3334  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3335 
3336  Assert(resultRelInfo->ri_projectReturning);
3337 
3338  /* If we didn't get any tuples, must be end of data. */
3339  if (dmstate->next_tuple >= dmstate->num_tuples)
3340  return ExecClearTuple(slot);
3341 
3342  /* Increment the command es_processed count if necessary. */
3343  if (dmstate->set_processed)
3344  estate->es_processed += 1;
3345 
3346  /*
3347  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3348  * tuple. (has_returning is false when the local query is of the form
3349  * "UPDATE/DELETE .. RETURNING 1" for example.)
3350  */
3351  if (!dmstate->has_returning)
3352  ExecStoreAllNullTuple(slot);
3353  else
3354  {
3355  /*
3356  * On error, be sure to release the PGresult on the way out. Callers
3357  * do not have PG_TRY blocks to ensure this happens.
3358  */
3359  PG_TRY();
3360  {
3361  HeapTuple newtup;
3362 
3363  newtup = make_tuple_from_result_row(dmstate->result,
3364  dmstate->next_tuple,
3365  dmstate->rel,
3366  dmstate->attinmeta,
3367  dmstate->retrieved_attrs,
3368  NULL,
3369  dmstate->temp_cxt);
3370  ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3371  }
3372  PG_CATCH();
3373  {
3374  if (dmstate->result)
3375  PQclear(dmstate->result);
3376  PG_RE_THROW();
3377  }
3378  PG_END_TRY();
3379  }
3380  dmstate->next_tuple++;
3381 
3382  /* Make slot available for evaluation of the local query RETURNING list. */
3383  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
3384 
3385  return slot;
3386 }
3387 
3388 /*
3389  * Prepare for processing of parameters used in remote query.
3390  */
3391 static void
3393  List *fdw_exprs,
3394  int numParams,
3396  List **param_exprs,
3397  const char ***param_values)
3398 {
3399  int i;
3400  ListCell *lc;
3401 
3402  Assert(numParams > 0);
3403 
3404  /* Prepare for output conversion of parameters used in remote query. */
3405  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3406 
3407  i = 0;
3408  foreach(lc, fdw_exprs)
3409  {
3410  Node *param_expr = (Node *) lfirst(lc);
3411  Oid typefnoid;
3412  bool isvarlena;
3413 
3414  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3415  fmgr_info(typefnoid, &(*param_flinfo)[i]);
3416  i++;
3417  }
3418 
3419  /*
3420  * Prepare remote-parameter expressions for evaluation. (Note: in
3421  * practice, we expect that all these expressions will be just Params, so
3422  * we could possibly do something more efficient than using the full
3423  * expression-eval machinery for this. But probably there would be little
3424  * benefit, and it'd require postgres_fdw to know more than is desirable
3425  * about Param evaluation.)
3426  */
3427  *param_exprs = ExecInitExprList(fdw_exprs, node);
3428 
3429  /* Allocate buffer for text form of query parameters. */
3430  *param_values = (const char **) palloc0(numParams * sizeof(char *));
3431 }
3432 
3433 /*
3434  * Construct array of query parameter values in text format.
3435  */
3436 static void
3439  List *param_exprs,
3440  const char **param_values)
3441 {
3442  int nestlevel;
3443  int i;
3444  ListCell *lc;
3445 
3446  nestlevel = set_transmission_modes();
3447 
3448  i = 0;
3449  foreach(lc, param_exprs)
3450  {
3451  ExprState *expr_state = (ExprState *) lfirst(lc);
3452  Datum expr_value;
3453  bool isNull;
3454 
3455  /* Evaluate the parameter expression */
3456  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
3457 
3458  /*
3459  * Get string representation of each parameter value by invoking
3460  * type-specific output function, unless the value is null.
3461  */
3462  if (isNull)
3463  param_values[i] = NULL;
3464  else
3465  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
3466 
3467  i++;
3468  }
3469 
3470  reset_transmission_modes(nestlevel);
3471 }
3472 
3473 /*
3474  * postgresAnalyzeForeignTable
3475  * Test whether analyzing this foreign table is supported
3476  */
3477 static bool
3479  AcquireSampleRowsFunc *func,
3480  BlockNumber *totalpages)
3481 {
3482  ForeignTable *table;
3483  UserMapping *user;
3484  PGconn *conn;
3485  StringInfoData sql;
3486  PGresult *volatile res = NULL;
3487 
3488  /* Return the row-analysis function pointer */
3490 
3491  /*
3492  * Now we have to get the number of pages. It's annoying that the ANALYZE
3493  * API requires us to return that now, because it forces some duplication
3494  * of effort between this routine and postgresAcquireSampleRowsFunc. But
3495  * it's probably not worth redefining that API at this point.
3496  */
3497 
3498  /*
3499  * Get the connection to use. We do the remote access as the table's
3500  * owner, even if the ANALYZE was started by some other user.
3501  */
3502  table = GetForeignTable(RelationGetRelid(relation));
3503  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3504  conn = GetConnection(user, false);
3505 
3506  /*
3507  * Construct command to get page count for relation.
3508  */
3509  initStringInfo(&sql);
3510  deparseAnalyzeSizeSql(&sql, relation);
3511 
3512  /* In what follows, do not risk leaking any PGresults. */
3513  PG_TRY();
3514  {
3515  res = pgfdw_exec_query(conn, sql.data);
3516  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3517  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3518 
3519  if (PQntuples(res) != 1 || PQnfields(res) != 1)
3520  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3521  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
3522 
3523  PQclear(res);
3524  res = NULL;
3525  }
3526  PG_CATCH();
3527  {
3528  if (res)
3529  PQclear(res);
3530  PG_RE_THROW();
3531  }
3532  PG_END_TRY();
3533 
3534  ReleaseConnection(conn);
3535 
3536  return true;
3537 }
3538 
3539 /*
3540  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
3541  *
3542  * We fetch the whole table from the remote side and pick out some sample rows.
3543  *
3544  * Selected rows are returned in the caller-allocated array rows[],
3545  * which must have at least targrows entries.
3546  * The actual number of rows selected is returned as the function result.
3547  * We also count the total number of rows in the table and return it into
3548  * *totalrows. Note that *totaldeadrows is always set to 0.
3549  *
3550  * Note that the returned list of rows is not always in order by physical
3551  * position in the table. Therefore, correlation estimates derived later
3552  * may be meaningless, but it's OK because we don't use the estimates
3553  * currently (the planner only pays attention to correlation for indexscans).
3554  */
3555 static int
3557  HeapTuple *rows, int targrows,
3558  double *totalrows,
3559  double *totaldeadrows)
3560 {
3561  PgFdwAnalyzeState astate;
3562  ForeignTable *table;
3563  ForeignServer *server;
3564  UserMapping *user;
3565  PGconn *conn;
3566  unsigned int cursor_number;
3567  StringInfoData sql;
3568  PGresult *volatile res = NULL;
3569 
3570  /* Initialize workspace state */
3571  astate.rel = relation;
3573 
3574  astate.rows = rows;
3575  astate.targrows = targrows;
3576  astate.numrows = 0;
3577  astate.samplerows = 0;
3578  astate.rowstoskip = -1; /* -1 means not set yet */
3579  reservoir_init_selection_state(&astate.rstate, targrows);
3580 
3581  /* Remember ANALYZE context, and create a per-tuple temp context */
3582  astate.anl_cxt = CurrentMemoryContext;
3584  "postgres_fdw temporary data",
3586 
3587  /*
3588  * Get the connection to use. We do the remote access as the table's
3589  * owner, even if the ANALYZE was started by some other user.
3590  */
3591  table = GetForeignTable(RelationGetRelid(relation));
3592  server = GetForeignServer(table->serverid);
3593  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3594  conn = GetConnection(user, false);
3595 
3596  /*
3597  * Construct cursor that retrieves whole rows from remote.
3598  */
3599  cursor_number = GetCursorNumber(conn);
3600  initStringInfo(&sql);
3601  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
3602  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
3603 
3604  /* In what follows, do not risk leaking any PGresults. */
3605  PG_TRY();
3606  {
3607  res = pgfdw_exec_query(conn, sql.data);
3608  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3609  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3610  PQclear(res);
3611  res = NULL;
3612 
3613  /* Retrieve and process rows a batch at a time. */
3614  for (;;)
3615  {
3616  char fetch_sql[64];
3617  int fetch_size;
3618  int numrows;
3619  int i;
3620  ListCell *lc;
3621 
3622  /* Allow users to cancel long query */
3624 
3625  /*
3626  * XXX possible future improvement: if rowstoskip is large, we
3627  * could issue a MOVE rather than physically fetching the rows,
3628  * then just adjust rowstoskip and samplerows appropriately.
3629  */
3630 
3631  /* The fetch size is arbitrary, but shouldn't be enormous. */
3632  fetch_size = 100;
3633  foreach(lc, server->options)
3634  {
3635  DefElem *def = (DefElem *) lfirst(lc);
3636 
3637  if (strcmp(def->defname, "fetch_size") == 0)
3638  {
3639  fetch_size = strtol(defGetString(def), NULL, 10);
3640  break;
3641  }
3642  }
3643  foreach(lc, table->options)
3644  {
3645  DefElem *def = (DefElem *) lfirst(lc);
3646 
3647  if (strcmp(def->defname, "fetch_size") == 0)
3648  {
3649  fetch_size = strtol(defGetString(def), NULL, 10);
3650  break;
3651  }
3652  }
3653 
3654  /* Fetch some rows */
3655  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
3656  fetch_size, cursor_number);
3657 
3658  res = pgfdw_exec_query(conn, fetch_sql);
3659  /* On error, report the original query, not the FETCH. */
3660  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3661  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3662 
3663  /* Process whatever we got. */
3664  numrows = PQntuples(res);
3665  for (i = 0; i < numrows; i++)
3666  analyze_row_processor(res, i, &astate);
3667 
3668  PQclear(res);
3669  res = NULL;
3670 
3671  /* Must be EOF if we didn't get all the rows requested. */
3672  if (numrows < fetch_size)
3673  break;
3674  }
3675 
3676  /* Close the cursor, just to be tidy. */
3677  close_cursor(conn, cursor_number);
3678  }
3679  PG_CATCH();
3680  {
3681  if (res)
3682  PQclear(res);
3683  PG_RE_THROW();
3684  }
3685  PG_END_TRY();
3686 
3687  ReleaseConnection(conn);
3688 
3689  /* We assume that we have no dead tuple. */
3690  *totaldeadrows = 0.0;
3691 
3692  /* We've retrieved all living tuples from foreign server. */
3693  *totalrows = astate.samplerows;
3694 
3695  /*
3696  * Emit some interesting relation info
3697  */
3698  ereport(elevel,
3699  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
3700  RelationGetRelationName(relation),
3701  astate.samplerows, astate.numrows)));
3702 
3703  return astate.numrows;
3704 }
3705 
3706 /*
3707  * Collect sample rows from the result of query.
3708  * - Use all tuples in sample until target # of samples are collected.
3709  * - Subsequently, replace already-sampled tuples randomly.
3710  */
3711 static void
3713 {
3714  int targrows = astate->targrows;
3715  int pos; /* array index to store tuple in */
3716  MemoryContext oldcontext;
3717 
3718  /* Always increment sample row counter. */
3719  astate->samplerows += 1;
3720 
3721  /*
3722  * Determine the slot where this sample row should be stored. Set pos to
3723  * negative value to indicate the row should be skipped.
3724  */
3725  if (astate->numrows < targrows)
3726  {
3727  /* First targrows rows are always included into the sample */
3728  pos = astate->numrows++;
3729  }
3730  else
3731  {
3732  /*
3733  * Now we start replacing tuples in the sample until we reach the end
3734  * of the relation. Same algorithm as in acquire_sample_rows in
3735  * analyze.c; see Jeff Vitter's paper.
3736  */
3737  if (astate->rowstoskip < 0)
3738  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
3739 
3740  if (astate->rowstoskip <= 0)
3741  {
3742  /* Choose a random reservoir element to replace. */
3743  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
3744  Assert(pos >= 0 && pos < targrows);
3745  heap_freetuple(astate->rows[pos]);
3746  }
3747  else
3748  {
3749  /* Skip this tuple. */
3750  pos = -1;
3751  }
3752 
3753  astate->rowstoskip -= 1;
3754  }
3755 
3756  if (pos >= 0)
3757  {
3758  /*
3759  * Create sample tuple from current result row, and store it in the
3760  * position determined above. The tuple has to be created in anl_cxt.
3761  */
3762  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
3763 
3764  astate->rows[pos] = make_tuple_from_result_row(res, row,
3765  astate->rel,
3766  astate->attinmeta,
3767  astate->retrieved_attrs,
3768  NULL,
3769  astate->temp_cxt);
3770 
3771  MemoryContextSwitchTo(oldcontext);
3772  }
3773 }
3774 
/*
 * postgresImportForeignSchema
 *		Build CREATE FOREIGN TABLE commands for IMPORT FOREIGN SCHEMA.
 *
 * stmt: the IMPORT FOREIGN SCHEMA statement.  Its options may be
 *		import_collate (default true), import_default (default false) and
 *		import_not_null (default true).  Any other option raises an error.
 * serverOid: the foreign server to read the remote catalogs from, using
 *		the current user's mapping.
 *
 * Raises an error if stmt->remote_schema does not exist remotely.  Returns
 * a List of pstrdup'd command strings, one per remote table.
 */
static List *
postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
{
	List	   *commands = NIL;
	bool		import_collate = true;
	bool		import_default = false;
	bool		import_not_null = true;
	ForeignServer *server;
	UserMapping *mapping;
	PGconn	   *conn;
	StringInfoData buf;
	PGresult   *volatile res = NULL;	/* volatile: set in PG_TRY, read in PG_CATCH */
	int			numrows,
				i;
	ListCell   *lc;

	/* Parse statement options */
	foreach(lc, stmt->options)
	{
		DefElem    *def = (DefElem *) lfirst(lc);

		if (strcmp(def->defname, "import_collate") == 0)
			import_collate = defGetBoolean(def);
		else if (strcmp(def->defname, "import_default") == 0)
			import_default = defGetBoolean(def);
		else if (strcmp(def->defname, "import_not_null") == 0)
			import_not_null = defGetBoolean(def);
		else
			ereport(ERROR,
					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
					 errmsg("invalid option \"%s\"", def->defname)));
	}

	/*
	 * Get connection to the foreign server.  Connection manager will
	 * establish new connection if necessary.
	 */
	server = GetForeignServer(serverOid);
	mapping = GetUserMapping(GetUserId(), server->serverid);
	conn = GetConnection(mapping, false);

	/* Don't attempt to import collation if remote server hasn't got it */
	if (PQserverVersion(conn) < 90100)
		import_collate = false;

	/* Create workspace for strings */
	initStringInfo(&buf);

	/* In what follows, do not risk leaking any PGresults. */
	PG_TRY();
	{
		/* Check that the schema really exists */
		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
		deparseStringLiteral(&buf, stmt->remote_schema);

		res = pgfdw_exec_query(conn, buf.data);
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
			pgfdw_report_error(ERROR, res, conn, false, buf.data);

		if (PQntuples(res) != 1)
			ereport(ERROR,
					(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
					 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
							stmt->remote_schema, server->servername)));

		PQclear(res);
		res = NULL;
		resetStringInfo(&buf);

		/*
		 * Fetch all table data from this schema, possibly restricted by
		 * EXCEPT or LIMIT TO.  (We don't actually need to pay any attention
		 * to EXCEPT/LIMIT TO here, because the core code will filter the
		 * statements we return according to those lists anyway.  But it
		 * should save a few cycles to not process excluded tables in the
		 * first place.)
		 *
		 * Ignore table data for partitions and only include the definitions
		 * of the root partitioned tables to allow access to the complete
		 * remote data set locally in the schema imported.
		 *
		 * Note: because we run the connection with search_path restricted to
		 * pg_catalog, the format_type() and pg_get_expr() outputs will always
		 * include a schema name for types/functions in other schemas, which
		 * is what we want.
		 *
		 * Both query variants return the same 7 columns: relname, attname,
		 * type, attnotnull, default expression, collation name, collation
		 * schema.  The LEFT JOIN to pg_attribute means a table with no
		 * columns still yields one row, with NULL attname.
		 */
		if (import_collate)
			appendStringInfoString(&buf,
								   "SELECT relname, "
								   " attname, "
								   " format_type(atttypid, atttypmod), "
								   " attnotnull, "
								   " pg_get_expr(adbin, adrelid), "
								   " collname, "
								   " collnsp.nspname "
								   "FROM pg_class c "
								   " JOIN pg_namespace n ON "
								   " relnamespace = n.oid "
								   " LEFT JOIN pg_attribute a ON "
								   " attrelid = c.oid AND attnum > 0 "
								   " AND NOT attisdropped "
								   " LEFT JOIN pg_attrdef ad ON "
								   " adrelid = c.oid AND adnum = attnum "
								   " LEFT JOIN pg_collation coll ON "
								   " coll.oid = attcollation "
								   " LEFT JOIN pg_namespace collnsp ON "
								   " collnsp.oid = collnamespace ");
		else
			appendStringInfoString(&buf,
								   "SELECT relname, "
								   " attname, "
								   " format_type(atttypid, atttypmod), "
								   " attnotnull, "
								   " pg_get_expr(adbin, adrelid), "
								   " NULL, NULL "
								   "FROM pg_class c "
								   " JOIN pg_namespace n ON "
								   " relnamespace = n.oid "
								   " LEFT JOIN pg_attribute a ON "
								   " attrelid = c.oid AND attnum > 0 "
								   " AND NOT attisdropped "
								   " LEFT JOIN pg_attrdef ad ON "
								   " adrelid = c.oid AND adnum = attnum ");

		appendStringInfoString(&buf,
							   "WHERE c.relkind IN ("
							   CppAsString2(RELKIND_RELATION) ","
							   CppAsString2(RELKIND_VIEW) ","
							   CppAsString2(RELKIND_FOREIGN_TABLE) ","
							   CppAsString2(RELKIND_MATVIEW) ","
							   CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
							   " AND n.nspname = ");
		deparseStringLiteral(&buf, stmt->remote_schema);

		/* Partitions are supported since Postgres 10 */
		if (PQserverVersion(conn) >= 100000)
			appendStringInfoString(&buf, " AND NOT c.relispartition ");

		/* Apply restrictions for LIMIT TO and EXCEPT */
		if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
			stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
		{
			bool		first_item = true;

			appendStringInfoString(&buf, " AND c.relname ");
			if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
				appendStringInfoString(&buf, "NOT ");
			appendStringInfoString(&buf, "IN (");

			/* Append list of table names within IN clause */
			foreach(lc, stmt->table_list)
			{
				RangeVar   *rv = (RangeVar *) lfirst(lc);

				if (first_item)
					first_item = false;
				else
					appendStringInfoString(&buf, ", ");
				deparseStringLiteral(&buf, rv->relname);
			}
			appendStringInfoChar(&buf, ')');
		}

		/*
		 * Append ORDER BY at the end of query to ensure output ordering.  The
		 * grouping loop below relies on all rows of one table being adjacent.
		 */
		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");

		/* Fetch the data */
		res = pgfdw_exec_query(conn, buf.data);
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
			pgfdw_report_error(ERROR, res, conn, false, buf.data);

		/* Process results */
		numrows = PQntuples(res);
		/* note: incrementation of i happens in inner loop's while() test */
		for (i = 0; i < numrows;)
		{
			char	   *tablename = PQgetvalue(res, i, 0);
			bool		first_item = true;

			/* buf is reused: from here on it holds the DDL being generated */
			resetStringInfo(&buf);
			appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
							 quote_identifier(tablename));

			/* Scan all rows for this table */
			do
			{
				char	   *attname;
				char	   *typename;
				char	   *attnotnull;
				char	   *attdefault;
				char	   *collname;
				char	   *collnamespace;

				/*
				 * If table has no columns, we'll see nulls here.  Inside a
				 * do/while, "continue" jumps to the while() test, which
				 * still advances i.
				 */
				if (PQgetisnull(res, i, 1))
					continue;

				attname = PQgetvalue(res, i, 1);
				typename = PQgetvalue(res, i, 2);
				attnotnull = PQgetvalue(res, i, 3);
				attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
					PQgetvalue(res, i, 4);
				collname = PQgetisnull(res, i, 5) ? (char *) NULL :
					PQgetvalue(res, i, 5);
				collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
					PQgetvalue(res, i, 6);

				if (first_item)
					first_item = false;
				else
					appendStringInfoString(&buf, ",\n");

				/*
				 * Print column name and type.  typename comes from remote
				 * format_type() and is emitted as-is, not quoted.
				 */
				appendStringInfo(&buf, " %s %s",
								 quote_identifier(attname),
								 typename);

				/*
				 * Add column_name option so that renaming the foreign table's
				 * column doesn't break the association to the underlying
				 * column.
				 */
				appendStringInfoString(&buf, " OPTIONS (column_name ");
				deparseStringLiteral(&buf, attname);
				appendStringInfoChar(&buf, ')');

				/* Add COLLATE if needed */
				if (import_collate && collname != NULL && collnamespace != NULL)
					appendStringInfo(&buf, " COLLATE %s.%s",
									 quote_identifier(collnamespace),
									 quote_identifier(collname));

				/* Add DEFAULT if needed (remote pg_get_expr text, as-is) */
				if (import_default && attdefault != NULL)
					appendStringInfo(&buf, " DEFAULT %s", attdefault);

				/* Add NOT NULL if needed ('t' is libpq's text form of true) */
				if (import_not_null && attnotnull[0] == 't')
					appendStringInfoString(&buf, " NOT NULL");
			}
			while (++i < numrows &&
				   strcmp(PQgetvalue(res, i, 0), tablename) == 0);

			/*
			 * Add server name and table-level options.  We specify remote
			 * schema and table name as options (the latter to ensure that
			 * renaming the foreign table doesn't break the association).
			 */
			appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
							 quote_identifier(server->servername));

			appendStringInfoString(&buf, "schema_name ");
			deparseStringLiteral(&buf, stmt->remote_schema);
			appendStringInfoString(&buf, ", table_name ");
			deparseStringLiteral(&buf, tablename);

			appendStringInfoString(&buf, ");");

			/* Copy out: buf is reset for the next table */
			commands = lappend(commands, pstrdup(buf.data));
		}

		/* Clean up */
		PQclear(res);
		res = NULL;
	}
	PG_CATCH();
	{
		if (res)
			PQclear(res);
		PG_RE_THROW();
	}
	PG_END_TRY();

	ReleaseConnection(conn);

	return commands;
}
4055 
4056 /*
4057  * Assess whether the join between inner and outer relations can be pushed down
4058  * to the foreign server. As a side effect, save information we obtain in this
4059  * function to PgFdwRelationInfo passed in.
4060  */
4061 static bool
4063  RelOptInfo *outerrel, RelOptInfo *innerrel,
4064  JoinPathExtraData *extra)
4065 {
4066  PgFdwRelationInfo *fpinfo;
4067  PgFdwRelationInfo *fpinfo_o;
4068  PgFdwRelationInfo *fpinfo_i;
4069  ListCell *lc;
4070  List *joinclauses;
4071 
4072  /*
4073  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4074  * Constructing queries representing SEMI and ANTI joins is hard, hence
4075  * not considered right now.
4076  */
4077  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4078  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4079  return false;
4080 
4081  /*
4082  * If either of the joining relations is marked as unsafe to pushdown, the
4083  * join can not be pushed down.
4084  */
4085  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4086  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4087  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4088  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4089  !fpinfo_i || !fpinfo_i->pushdown_safe)
4090  return false;
4091 
4092  /*
4093  * If joining relations have local conditions, those conditions are
4094  * required to be applied before joining the relations. Hence the join can
4095  * not be pushed down.
4096  */
4097  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4098  return false;
4099 
4100  /*
4101  * Merge FDW options. We might be tempted to do this after we have deemed
4102  * the foreign join to be OK. But we must do this beforehand so that we
4103  * know which quals can be evaluated on the foreign server, which might
4104  * depend on shippable_extensions.
4105  */
4106  fpinfo->server = fpinfo_o->server;
4107  merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
4108 
4109  /*
4110  * Separate restrict list into join quals and pushed-down (other) quals.
4111  *
4112  * Join quals belonging to an outer join must all be shippable, else we
4113  * cannot execute the join remotely. Add such quals to 'joinclauses'.
4114  *
4115  * Add other quals to fpinfo->remote_conds if they are shippable, else to
4116  * fpinfo->local_conds. In an inner join it's okay to execute conditions
4117  * either locally or remotely; the same is true for pushed-down conditions
4118  * at an outer join.
4119  *
4120  * Note we might return failure after having already scribbled on
4121  * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
4122  * won't consult those lists again if we deem the join unshippable.
4123  */
4124  joinclauses = NIL;
4125  foreach(lc, extra->restrictlist)
4126  {
4127  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4128  bool is_remote_clause = is_foreign_expr(root, joinrel,
4129  rinfo->clause);
4130 
4131  if (IS_OUTER_JOIN(jointype) && !rinfo->is_pushed_down)
4132  {
4133  if (!is_remote_clause)
4134  return false;
4135  joinclauses = lappend(joinclauses, rinfo);
4136  }
4137  else
4138  {
4139  if (is_remote_clause)
4140  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4141  else
4142  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4143  }
4144  }
4145 
4146  /*
4147  * deparseExplicitTargetList() isn't smart enough to handle anything other
4148  * than a Var. In particular, if there's some PlaceHolderVar that would
4149  * need to be evaluated within this join tree (because there's an upper
4150  * reference to a quantity that may go to NULL as a result of an outer
4151  * join), then we can't try to push the join down because we'll fail when
4152  * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4153  * needs to be evaluated *at the top* of this join tree is OK, because we
4154  * can do that locally after fetching the results from the remote side.
4155  */
4156  foreach(lc, root->placeholder_list)
4157  {
4158  PlaceHolderInfo *phinfo = lfirst(lc);
4159  Relids relids = joinrel->relids;
4160 
4161  if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4162  bms_nonempty_difference(relids, phinfo->ph_eval_at))
4163  return false;
4164  }
4165 
4166  /* Save the join clauses, for later use. */
4167  fpinfo->joinclauses = joinclauses;
4168 
4169  fpinfo->outerrel = outerrel;
4170  fpinfo->innerrel = innerrel;
4171  fpinfo->jointype = jointype;
4172 
4173  /*
4174  * By default, both the input relations are not required to be deparsed as
4175  * subqueries, but there might be some relations covered by the input
4176  * relations that are required to be deparsed as subqueries, so save the
4177  * relids of those relations for later use by the deparser.
4178  */
4179  fpinfo->make_outerrel_subquery = false;
4180  fpinfo->make_innerrel_subquery = false;
4181  Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
4182  Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
4184  fpinfo_i->lower_subquery_rels);
4185 
4186  /*
4187  * Pull the other remote conditions from the joining relations into join
4188  * clauses or other remote clauses (remote_conds) of this relation
4189  * wherever possible. This avoids building subqueries at every join step.
4190  *
4191  * For an inner join, clauses from both the relations are added to the
4192  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4193  * the outer side are added to remote_conds since those can be evaluated
4194  * after the join is evaluated. The clauses from inner side are added to
4195  * the joinclauses, since they need to be evaluated while constructing the
4196  * join.
4197  *
4198  * For a FULL OUTER JOIN, the other clauses from either relation can not
4199  * be added to the joinclauses or remote_conds, since each relation acts
4200  * as an outer relation for the other.
4201  *
4202  * The joining sides can not have local conditions, thus no need to test
4203  * shippability of the clauses being pulled up.
4204  */
4205  switch (jointype)
4206  {
4207  case JOIN_INNER:
4208  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4209  list_copy(fpinfo_i->remote_conds));
4210  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4211  list_copy(fpinfo_o->remote_conds));
4212  break;
4213 
4214  case JOIN_LEFT:
4215  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4216  list_copy(fpinfo_i->remote_conds));
4217  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4218  list_copy(fpinfo_o->remote_conds));
4219  break;
4220 
4221  case JOIN_RIGHT:
4222  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4223  list_copy(fpinfo_o->remote_conds));
4224  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4225  list_copy(fpinfo_i->remote_conds));
4226  break;
4227 
4228  case JOIN_FULL:
4229 
4230  /*
4231  * In this case, if any of the input relations has conditions, we
4232  * need to deparse that relation as a subquery so that the
4233  * conditions can be evaluated before the join. Remember it in
4234  * the fpinfo of this relation so that the deparser can take
4235  * appropriate action. Also, save the relids of base relations
4236  * covered by that relation for later use by the deparser.
4237  */
4238  if (fpinfo_o->remote_conds)
4239  {
4240  fpinfo->make_outerrel_subquery = true;
4241  fpinfo->lower_subquery_rels =
4243  outerrel->relids);
4244  }
4245  if (fpinfo_i->remote_conds)
4246  {
4247  fpinfo->make_innerrel_subquery = true;
4248  fpinfo->lower_subquery_rels =
4250  innerrel->relids);
4251  }
4252  break;
4253 
4254  default:
4255  /* Should not happen, we have just checked this above */
4256  elog(ERROR, "unsupported join type %d", jointype);
4257  }
4258 
4259  /*
4260  * For an inner join, all restrictions can be treated alike. Treating the
4261  * pushed down conditions as join conditions allows a top level full outer
4262  * join to be deparsed without requiring subqueries.
4263  */
4264  if (jointype == JOIN_INNER)
4265  {
4266  Assert(!fpinfo->joinclauses);
4267  fpinfo->joinclauses = fpinfo->remote_conds;
4268  fpinfo->remote_conds = NIL;
4269  }
4270 
4271  /* Mark that this join can be pushed down safely */
4272  fpinfo->pushdown_safe = true;
4273 
4274  /* Get user mapping */
4275  if (fpinfo->use_remote_estimate)
4276  {
4277  if (fpinfo_o->use_remote_estimate)
4278  fpinfo->user = fpinfo_o->user;
4279  else
4280  fpinfo->user = fpinfo_i->user;
4281  }
4282  else
4283  fpinfo->user = NULL;
4284 
4285  /*
4286  * Set cached relation costs to some negative value, so that we can detect
4287  * when they are set to some sensible costs, during one (usually the
4288  * first) of the calls to estimate_path_cost_size().
4289  */
4290  fpinfo->rel_startup_cost = -1;
4291  fpinfo->rel_total_cost = -1;
4292 
4293  /*
4294  * Set the string describing this join relation to be used in EXPLAIN
4295  * output of corresponding ForeignScan.
4296  */
4297  fpinfo->relation_name = makeStringInfo();
4298  appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4299  fpinfo_o->relation_name->data,
4300  get_jointype_name(fpinfo->jointype),
4301  fpinfo_i->relation_name->data);
4302 
4303  /*
4304  * Set the relation index. This is defined as the position of this
4305  * joinrel in the join_rel_list list plus the length of the rtable list.
4306  * Note that since this joinrel is at the end of the join_rel_list list
4307  * when we are called, we can get the position by list_length.
4308  */
4309  Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
4310  fpinfo->relation_index =
4312 
4313  return true;
4314 }
4315 
4316 static void
4318  Path *epq_path)
4319 {
4320  List *useful_pathkeys_list = NIL; /* List of all pathkeys */
4321  ListCell *lc;
4322 
4323  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4324 
4325  /* Create one path for each set of pathkeys we found above. */
4326  foreach(lc, useful_pathkeys_list)
4327  {
4328  double rows;
4329  int width;
4330  Cost startup_cost;
4331  Cost total_cost;
4332  List *useful_pathkeys = lfirst(lc);
4333 
4334  estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4335  &rows, &width, &startup_cost, &total_cost);
4336 
4337  add_path(rel, (Path *)
4338  create_foreignscan_path(root, rel,
4339  NULL,
4340  rows,
4341  startup_cost,
4342  total_cost,
4343  useful_pathkeys,
4344  NULL,
4345  epq_path,
4346  NIL));
4347  }
4348 }
4349 
4350 /*
4351  * Parse options from foreign server and apply them to fpinfo.
4352  *
4353  * New options might also require tweaking merge_fdw_options().
4354  */
4355 static void
4357 {
4358  ListCell *lc;
4359 
4360  foreach(lc, fpinfo->server->options)
4361  {
4362  DefElem *def = (DefElem *) lfirst(lc);
4363 
4364  if (strcmp(def->defname, "use_remote_estimate") == 0)
4365  fpinfo->use_remote_estimate = defGetBoolean(def);
4366  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
4367  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
4368  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
4369  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
4370  else if (strcmp(def->defname, "extensions") == 0)
4371  fpinfo->shippable_extensions =
4372  ExtractExtensionList(defGetString(def), false);
4373  else if (strcmp(def->defname, "fetch_size") == 0)
4374  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4375  }
4376 }
4377 
4378 /*
4379  * Parse options from foreign table and apply them to fpinfo.
4380  *
4381  * New options might also require tweaking merge_fdw_options().
4382  */
4383 static void
4385 {
4386  ListCell *lc;
4387 
4388  foreach(lc, fpinfo->table->options)
4389  {
4390  DefElem *def = (DefElem *) lfirst(lc);
4391 
4392  if (strcmp(def->defname, "use_remote_estimate") == 0)
4393  fpinfo->use_remote_estimate = defGetBoolean(def);
4394  else if (strcmp(def->defname, "fetch_size") == 0)
4395  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
4396  }
4397 }
4398 
4399 /*
4400  * Merge FDW options from input relations into a new set of options for a join
4401  * or an upper rel.
4402  *
4403  * For a join relation, FDW-specific information about the inner and outer
4404  * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
4405  * fpinfo_o provides the information for the input relation; fpinfo_i is
4406  * expected to NULL.
4407  */
4408 static void
4410  const PgFdwRelationInfo *fpinfo_o,
4411  const PgFdwRelationInfo *fpinfo_i)
4412 {
4413  /* We must always have fpinfo_o. */
4414  Assert(fpinfo_o);
4415 
4416  /* fpinfo_i may be NULL, but if present the servers must both match. */
4417  Assert(!fpinfo_i ||
4418  fpinfo_i->server->serverid == fpinfo_o->server->serverid);
4419 
4420  /*
4421  * Copy the server specific FDW options. (For a join, both relations come
4422  * from the same server, so the server options should have the same value
4423  * for both relations.)
4424  */
4425  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4426  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4427  fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
4428  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
4429  fpinfo->fetch_size = fpinfo_o->fetch_size;
4430 
4431  /* Merge the table level options from either side of the join. */
4432  if (fpinfo_i)
4433  {
4434  /*
4435  * We'll prefer to use remote estimates for this join if any table
4436  * from either side of the join is using remote estimates. This is
4437  * most likely going to be preferred since they're already willing to
4438  * pay the price of a round trip to get the remote EXPLAIN. In any
4439  * case it's not entirely clear how we might otherwise handle this
4440  * best.
4441  */
4442  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4443  fpinfo_i->use_remote_estimate;
4444 
4445  /*
4446  * Set fetch size to maximum of the joining sides, since we are
4447  * expecting the rows returned by the join to be proportional to the
4448  * relation sizes.
4449  */
4450  fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
4451  }
4452 }
4453 
4454 /*
4455  * postgresGetForeignJoinPaths
4456  * Add possible ForeignPath to joinrel, if join is safe to push down.
4457  */
4458 static void
4460  RelOptInfo *joinrel,
4461  RelOptInfo *outerrel,
4462  RelOptInfo *innerrel,
4463  JoinType jointype,
4464  JoinPathExtraData *extra)
4465 {
4466  PgFdwRelationInfo *fpinfo;
4467  ForeignPath *joinpath;
4468  double rows;
4469  int width;
4470  Cost startup_cost;
4471  Cost total_cost;
4472  Path *epq_path; /* Path to create plan to be executed when
4473  * EvalPlanQual gets triggered. */
4474 
4475  /*
4476  * Skip if this join combination has been considered already.
4477  */
4478  if (joinrel->fdw_private)
4479  return;
4480 
4481  /*
4482  * Create unfinished PgFdwRelationInfo entry which is used to indicate
4483  * that the join relation is already considered, so that we won't waste
4484  * time in judging safety of join pushdown and adding the same paths again
4485  * if found safe. Once we know that this join can be pushed down, we fill
4486  * the entry.
4487  */
4488  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4489  fpinfo->pushdown_safe = false;
4490  joinrel->fdw_private = fpinfo;
4491  /* attrs_used is only for base relations. */
4492  fpinfo->attrs_used = NULL;
4493 
4494  /*
4495  * If there is a possibility that EvalPlanQual will be executed, we need
4496  * to be able to reconstruct the row using scans of the base relations.
4497  * GetExistingLocalJoinPath will find a suitable path for this purpose in
4498  * the path list of the joinrel, if one exists. We must be careful to
4499  * call it before adding any ForeignPath, since the ForeignPath might
4500  * dominate the only suitable local path available. We also do it before
4501  * calling foreign_join_ok(), since that function updates fpinfo and marks
4502  * it as pushable if the join is found to be pushable.
4503  */
4504  if (root->parse->commandType == CMD_DELETE ||
4505  root->parse->commandType == CMD_UPDATE ||
4506  root->rowMarks)
4507  {
4508  epq_path = GetExistingLocalJoinPath(joinrel);
4509  if (!epq_path)
4510  {
4511  elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
4512  return;
4513  }
4514  }
4515  else
4516  epq_path = NULL;
4517 
4518  if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
4519  {
4520  /* Free path required for EPQ if we copied one; we don't need it now */
4521  if (epq_path)
4522  pfree(epq_path);
4523  return;
4524  }
4525 
4526  /*
4527  * Compute the selectivity and cost of the local_conds, so we don't have
4528  * to do it over again for each path. The best we can do for these
4529  * conditions is to estimate selectivity on the basis of local statistics.
4530  * The local conditions are applied after the join has been computed on
4531  * the remote side like quals in WHERE clause, so pass jointype as
4532  * JOIN_INNER.
4533  */
4534  fpinfo->local_conds_sel = clauselist_selectivity(root,
4535  fpinfo->local_conds,
4536  0,
4537  JOIN_INNER,
4538  NULL);
4539  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
4540 
4541  /*
4542  * If we are going to estimate costs locally, estimate the join clause
4543  * selectivity here while we have special join info.
4544  */
4545  if (!fpinfo->use_remote_estimate)
4546  fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
4547  0, fpinfo->jointype,
4548  extra->sjinfo);
4549 
4550  /* Estimate costs for bare join relation */
4551  estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
4552  &width, &startup_cost, &total_cost);
4553  /* Now update this information in the joinrel */
4554  joinrel->rows = rows;
4555  joinrel->reltarget->width = width;
4556  fpinfo->rows = rows;
4557  fpinfo->width = width;
4558  fpinfo->startup_cost = startup_cost;
4559  fpinfo->total_cost = total_cost;
4560 
4561  /*
4562  * Create a new join path and add it to the joinrel which represents a
4563  * join between foreign tables.
4564  */
4565  joinpath = create_foreignscan_path(root,
4566  joinrel,
4567  NULL, /* default pathtarget */
4568  rows,
4569  startup_cost,
4570  total_cost,
4571  NIL, /* no pathkeys */
4572  NULL, /* no required_outer */
4573  epq_path,
4574  NIL); /* no fdw_private */
4575 
4576  /* Add generated path into joinrel by add_path(). */
4577  add_path(joinrel, (Path *) joinpath);
4578 
4579  /* Consider pathkeys for the join relation */
4580  add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
4581 
4582  /* XXX Consider parameterized paths for the join relation */
4583 }
4584 
4585 /*
4586  * Assess whether the aggregation, grouping and having operations can be pushed
4587  * down to the foreign server. As a side effect, save information we obtain in
4588  * this function to PgFdwRelationInfo of the input relation.
4589  */
4590 static bool
4592 {
4593  Query *query = root->parse;
4594  PathTarget *grouping_target;
4595  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
4596  PgFdwRelationInfo *ofpinfo;
4597  List *aggvars;
4598  ListCell *lc;
4599  int i;
4600  List *tlist = NIL;
4601 
4602  /* Grouping Sets are not pushable */
4603  if (query->groupingSets)
4604  return false;
4605 
4606  /* Get the fpinfo of the underlying scan relation. */
4607  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
4608 
4609  /*
4610  * If underneath input relation has any local conditions, those conditions
4611  * are required to be applied before performing aggregation. Hence the
4612  * aggregate cannot be pushed down.
4613  */
4614  if (ofpinfo->local_conds)
4615  return false;
4616 
4617  /*
4618  * The targetlist expected from this node and the targetlist pushed down
4619  * to the foreign server may be different. The latter requires
4620  * sortgrouprefs to be set to push down GROUP BY clause, but should not
4621  * have those arising from ORDER BY clause. These sortgrouprefs may be
4622  * different from those in the plan's targetlist. Use a copy of path
4623  * target to record the new sortgrouprefs.
4624  */
4625  grouping_target = copy_pathtarget(root->upper_targets[UPPERREL_GROUP_AGG]);
4626 
4627  /*
4628  * Evaluate grouping targets and check whether they are safe to push down
4629  * to the foreign side. All GROUP BY expressions will be part of the
4630  * grouping target and thus there is no need to evaluate it separately.
4631  * While doing so, add required expressions into target list which can
4632  * then be used to pass to foreign server.
4633  */
4634  i = 0;
4635  foreach(lc, grouping_target->exprs)
4636  {
4637  Expr *expr = (Expr *) lfirst(lc);
4638  Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
4639  ListCell *l;
4640 
4641  /* Check whether this expression is part of GROUP BY clause */
4642  if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
4643  {
4644  /*
4645  * If any of the GROUP BY expression is not shippable we can not
4646  * push down aggregation to the foreign server.
4647  */
4648  if (!is_foreign_expr(root, grouped_rel, expr))
4649  return false;
4650 
4651  /* Pushable, add to tlist */
4652  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4653  }
4654  else
4655  {
4656  /* Check entire expression whether it is pushable or not */
4657  if (is_foreign_expr(root, grouped_rel, expr))
4658  {
4659  /* Pushable, add to tlist */
4660  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4661  }
4662  else
4663  {
4664  /*
4665  * If we have sortgroupref set, then it means that we have an
4666  * ORDER BY entry pointing to this expression. Since we are
4667  * not pushing ORDER BY with GROUP BY, clear it.
4668  */
4669  if (sgref)
4670  grouping_target->sortgrouprefs[i] = 0;
4671 
4672  /* Not matched exactly, pull the var with aggregates then */
4673  aggvars = pull_var_clause((Node *) expr,
4675 
4676  if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
4677  return false;
4678 
4679  /*
4680  * Add aggregates, if any, into the targetlist. Plain var
4681  * nodes should be either same as some GROUP BY expression or
4682  * part of some GROUP BY expression. In later case, the query
4683  * cannot refer plain var nodes without the surrounding
4684  * expression. In both the cases, they are already part of
4685  * the targetlist and thus no need to add them again. In fact
4686  * adding pulled plain var nodes in SELECT clause will cause
4687  * an error on the foreign server if they are not same as some
4688  * GROUP BY expression.
4689  */
4690  foreach(l, aggvars)
4691  {
4692  Expr *expr = (Expr *) lfirst(l);
4693 
4694  if (IsA(expr, Aggref))
4695  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4696  }
4697  }
4698  }
4699 
4700  i++;
4701  }
4702 
4703  /*
4704  * Classify the pushable and non-pushable having clauses and save them in
4705  * remote_conds and local_conds of the grouped rel's fpinfo.
4706  */
4707  if (root->hasHavingQual && query->havingQual)
4708  {
4709  ListCell *lc;
4710 
4711  foreach(lc, (List *) query->havingQual)
4712  {
4713  Expr *expr = (Expr *) lfirst(lc);
4714  RestrictInfo *rinfo;
4715 
4716  /*
4717  * Currently, the core code doesn't wrap havingQuals in
4718  * RestrictInfos, so we must make our own.
4719  */
4720  Assert(!IsA(expr, RestrictInfo));
4721  rinfo = make_restrictinfo(expr,
4722  true,
4723  false,
4724  false,
4725  root->qual_security_level,
4726  grouped_rel->relids,
4727  NULL,
4728  NULL);
4729  if (is_foreign_expr(root, grouped_rel, expr))
4730  fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
4731  else
4732  fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
4733  }
4734  }
4735 
4736  /*
4737  * If there are any local conditions, pull Vars and aggregates from it and
4738  * check whether they are safe to pushdown or not.
4739  */
4740  if (fpinfo->local_conds)
4741  {
4742  List *aggvars = NIL;
4743  ListCell *lc;
4744 
4745  foreach(lc, fpinfo->local_conds)
4746  {
4747  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
4748 
4749  aggvars = list_concat(aggvars,
4750  pull_var_clause((Node *) rinfo->clause,
4752  }
4753 
4754  foreach(lc, aggvars)
4755  {
4756  Expr *expr = (Expr *) lfirst(lc);
4757 
4758  /*
4759  * If aggregates within local conditions are not safe to push
4760  * down, then we cannot push down the query. Vars are already
4761  * part of GROUP BY clause which are checked above, so no need to
4762  * access them again here.
4763  */
4764  if (IsA(expr, Aggref))
4765  {
4766  if (!is_foreign_expr(root, grouped_rel, expr))
4767  return false;
4768 
4769  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4770  }
4771  }
4772  }
4773 
4774  /* Transfer any sortgroupref data to the replacement tlist */
4775  apply_pathtarget_labeling_to_tlist(tlist, grouping_target);
4776 
4777  /* Store generated targetlist */
4778  fpinfo->grouped_tlist = tlist;
4779 
4780  /* Safe to pushdown */
4781  fpinfo->pushdown_safe = true;
4782 
4783  /*
4784  * Set cached relation costs to some negative value, so that we can detect
4785  * when they are set to some sensible costs, during one (usually the
4786  * first) of the calls to estimate_path_cost_size().
4787  */
4788  fpinfo->rel_startup_cost = -1;
4789  fpinfo->rel_total_cost = -1;
4790 
4791  /*
4792  * Set the string describing this grouped relation to be used in EXPLAIN
4793  * output of corresponding ForeignScan.
4794  */
4795  fpinfo->relation_name = makeStringInfo();
4796  appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
4797  ofpinfo->relation_name->data);
4798 
4799  return true;
4800 }
4801 
4802 /*
4803  * postgresGetForeignUpperPaths
4804  * Add paths for post-join operations like aggregation, grouping etc. if
4805  * corresponding operations are safe to push down.
4806  *
4807  * Right now, we only support aggregate, grouping and having clause pushdown.
4808  */
4809 static void
4811  RelOptInfo *input_rel, RelOptInfo *output_rel)
4812 {
4813  PgFdwRelationInfo *fpinfo;
4814 
4815  /*
4816  * If input rel is not safe to pushdown, then simply return as we cannot
4817  * perform any post-join operations on the foreign server.
4818  */
4819  if (!input_rel->fdw_private ||
4820  !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
4821  return;
4822 
4823  /* Ignore stages we don't support; and skip any duplicate calls. */
4824  if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
4825  return;
4826 
4827  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4828  fpinfo->pushdown_safe = false;
4829  output_rel->fdw_private = fpinfo;
4830 
4831  add_foreign_grouping_paths(root, input_rel, output_rel);
4832 }
4833 
4834 /*
4835  * add_foreign_grouping_paths
4836  * Add foreign path for grouping and/or aggregation.
4837  *
4838  * Given input_rel represents the underlying scan. The paths are added to the
4839  * given grouped_rel.
4840  */
4841 static void
4843  RelOptInfo *grouped_rel)
4844 {
4845  Query *parse = root->parse;
4846  PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
4847  PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
4848  ForeignPath *grouppath;
4849  PathTarget *grouping_target;
4850  double rows;
4851  int width;
4852  Cost startup_cost;
4853  Cost total_cost;
4854 
4855  /* Nothing to be done, if there is no grouping or aggregation required. */
4856  if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
4857  !root->hasHavingQual)
4858  return;
4859 
4860  grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
4861 
4862  /* save the input_rel as outerrel in fpinfo */
4863  fpinfo->outerrel = input_rel;
4864 
4865  /*
4866  * Copy foreign table, foreign server, user mapping, FDW options etc.
4867  * details from the input relation's fpinfo.
4868  */
4869  fpinfo->table = ifpinfo->table;
4870  fpinfo->server = ifpinfo->server;
4871  fpinfo->user = ifpinfo->user;
4872  merge_fdw_options(fpinfo, ifpinfo, NULL);
4873 
4874  /* Assess if it is safe to push down aggregation and grouping. */
4875  if (!foreign_grouping_ok(root, grouped_rel))
4876  return;
4877 
4878  /* Estimate the cost of push down */
4879  estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
4880  &width, &startup_cost, &total_cost);
4881 
4882  /* Now update this information in the fpinfo */
4883  fpinfo->rows = rows;
4884  fpinfo->width = width;
4885  fpinfo->startup_cost = startup_cost;
4886  fpinfo->total_cost = total_cost;
4887 
4888  /* Create and add foreign path to the grouping relation. */
4889  grouppath = create_foreignscan_path(root,
4890  grouped_rel,
4891  grouping_target,
4892  rows,
4893  startup_cost,
4894  total_cost,
4895  NIL, /* no pathkeys */
4896  NULL, /* no required_outer */
4897  NULL,
4898  NIL); /* no fdw_private */
4899 
4900  /* Add generated path into grouped_rel by add_path(). */
4901  add_path(grouped_rel, (Path *) grouppath);
4902 }
4903 
4904 /*
4905  * Create a tuple from the specified row of the PGresult.
4906  *
4907  * rel is the local representation of the foreign table, attinmeta is
4908  * conversion data for the rel's tupdesc, and retrieved_attrs is an
4909  * integer list of the table column numbers present in the PGresult.
4910  * temp_context is a working context that can be reset after each tuple.
4911  */
4912 static HeapTuple
4914  int row,
4915  Relation rel,
4918  ForeignScanState *fsstate,
4919  MemoryContext temp_context)
4920 {
4921  HeapTuple tuple;
4923  Datum *values;
4924  bool *nulls;
4925  ItemPointer ctid = NULL;
4926  Oid oid = InvalidOid;
4927  ConversionLocation errpos;
4928  ErrorContextCallback errcallback;
4929  MemoryContext oldcontext;
4930  ListCell *lc;
4931  int j;
4932 
4933  Assert(row < PQntuples(res));
4934 
4935  /*
4936  * Do the following work in a temp context that we reset after each tuple.
4937  * This cleans up not only the data we have direct access to, but any
4938  * cruft the I/O functions might leak.
4939  */
4940  oldcontext = MemoryContextSwitchTo(temp_context);
4941 
4942  if (rel)
4943  tupdesc = RelationGetDescr(rel);
4944  else
4945  {
4946  PgFdwScanState *fdw_sstate;
4947 
4948  Assert(fsstate);
4949  fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
4950  tupdesc = fdw_sstate->tupdesc;
4951  }
4952 
4953  values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
4954  nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
4955  /* Initialize to nulls for any columns not present in result */
4956  memset(nulls, true, tupdesc->natts * sizeof(bool));
4957 
4958  /*
4959  * Set up and install callback to report where conversion error occurs.
4960  */
4961  errpos.rel = rel;
4962  errpos.cur_attno = 0;
4963  errpos.fsstate = fsstate;
4964  errcallback.callback = conversion_error_callback;
4965  errcallback.arg = (void *) &errpos;
4966  errcallback.previous = error_context_stack;
4967  error_context_stack = &errcallback;
4968 
4969  /*
4970  * i indexes columns in the relation, j indexes columns in the PGresult.
4971  */
4972  j = 0;
4973  foreach(lc, retrieved_attrs)
4974  {
4975  int i = lfirst_int(lc);
4976  char *valstr;
4977 
4978  /* fetch next column's textual value */
4979  if (PQgetisnull(res, row, j))
4980  valstr = NULL;
4981  else
4982  valstr = PQgetvalue(res, row, j);
4983 
4984  /*
4985  * convert value to internal representation
4986  *
4987  * Note: we ignore system columns other than ctid and oid in result
4988  */
4989  errpos.cur_attno = i;
4990  if (i > 0)
4991  {
4992  /* ordinary column */
4993  Assert(i <= tupdesc->natts);
4994  nulls[i - 1] = (valstr == NULL);
4995  /* Apply the input function even to nulls, to support domains */
4996  values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
4997  valstr,
4998  attinmeta->attioparams[i - 1],
4999  attinmeta->atttypmods[i - 1]);
5000  }
5001  else if (i == SelfItemPointerAttributeNumber)
5002  {
5003  /* ctid */
5004  if (valstr != NULL)
5005  {
5006  Datum datum;
5007 
5008  datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
5009  ctid = (ItemPointer) DatumGetPointer(datum);
5010  }
5011  }
5012  else if (i == ObjectIdAttributeNumber)
5013  {
5014  /* oid */
5015  if (valstr != NULL)
5016  {
5017  Datum datum;
5018 
5019  datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
5020  oid = DatumGetObjectId(datum);
5021  }
5022  }
5023  errpos.cur_attno = 0;
5024 
5025  j++;
5026  }
5027 
5028  /* Uninstall error context callback. */
5029  error_context_stack = errcallback.previous;
5030 
5031  /*
5032  * Check we got the expected number of columns. Note: j == 0 and
5033  * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
5034  */
5035  if (j > 0 && j != PQnfields(res))
5036  elog(ERROR, "remote query result does not match the foreign table");
5037 
5038  /*
5039  * Build the result tuple in caller's memory context.
5040  */
5041  MemoryContextSwitchTo(oldcontext);
5042 
5043  tuple = heap_form_tuple(tupdesc, values, nulls);
5044 
5045  /*
5046  * If we have a CTID to return, install it in both t_self and t_ctid.
5047  * t_self is the normal place, but if the tuple is converted to a
5048  * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
5049  * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
5050  */
5051  if (ctid)
5052  tuple->t_self = tuple->t_data->t_ctid = *ctid;
5053 
5054  /*
5055  * Stomp on the xmin, xmax, and cmin fields from the tuple created by
5056  * heap_form_tuple. heap_form_tuple actually creates the tuple with
5057  * DatumTupleFields, not HeapTupleFields, but the executor expects
5058  * HeapTupleFields and will happily extract system columns on that
5059  * assumption. If we don't do this then, for example, the tuple length
5060  * ends up in the xmin field, which isn't what we want.
5061  */
5065 
5066  /*
5067  * If we have an OID to return, install it.
5068  */
5069  if (OidIsValid(oid))
5070  HeapTupleSetOid(tuple, oid);
5071 
5072  /* Clean up */
5073  MemoryContextReset(temp_context);
5074 
5075  return tuple;
5076 }
5077 
5078 /*
5079  * Callback function which is called when error occurs during column value
5080  * conversion. Print names of column and relation.
5081  */
5082 static void
5084 {
5085  const char *attname = NULL;
5086  const char *relname = NULL;
5087  bool is_wholerow = false;
5088  ConversionLocation *errpos = (ConversionLocation *) arg;
5089 
5090  if (errpos->rel)
5091  {
5092  /* error occurred in a scan against a foreign table */
5094  Form_pg_attribute attr = TupleDescAttr(tupdesc, errpos->cur_attno - 1);
5095 
5096  if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
5097  attname = NameStr(attr->attname);
5098  else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
5099  attname = "ctid";
5100  else if (errpos->cur_attno == ObjectIdAttributeNumber)
5101  attname = "oid";
5102 
5103  relname = RelationGetRelationName(errpos->rel);
5104  }
5105  else
5106  {
5107  /* error occurred in a scan against a foreign join */
5108  ForeignScanState *fsstate = errpos->fsstate;
5109  ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
5110  EState *estate = fsstate->ss.ps.state;
5111  TargetEntry *tle;
5112 
5113  tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
5114  errpos->cur_attno - 1);
5115 
5116  /*
5117  * Target list can have Vars and expressions. For Vars, we can get
5118  * it's relation, however for expressions we can't. Thus for
5119  * expressions, just show generic context message.
5120  */
5121  if (IsA(tle->expr, Var))
5122  {
5123  RangeTblEntry *rte;
5124  Var *var = (Var *) tle->expr;
5125 
5126  rte = rt_fetch(var->varno, estate->es_range_table);
5127 
5128  if (var->varattno == 0)
5129  is_wholerow = true;
5130  else
5131  attname = get_relid_attribute_name(rte->relid, var->varattno);
5132 
5133  relname = get_rel_name(rte->relid);
5134  }
5135  else
5136  errcontext("processing expression at position %d in select list",
5137  errpos->cur_attno);
5138  }
5139 
5140  if (relname)
5141  {
5142  if (is_wholerow)
5143  errcontext("whole-row reference to foreign table \"%s\"", relname);
5144  else if (attname)
5145  errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
5146  }
5147 }
5148 
5149 /*
5150  * Find an equivalence class member expression, all of whose Vars, come from
5151  * the indicated relation.
5152  */
5153 extern Expr *
5155 {
5156  ListCell *lc_em;
5157 
5158  foreach(lc_em, ec->ec_members)
5159  {
5160  EquivalenceMember *em = lfirst(lc_em);
5161 
5162  if (bms_is_subset(em->em_relids, rel->relids))
5163  {
5164  /*
5165  * If there is more than one equivalence member whose Vars are
5166  * taken entirely from this relation, we'll be content to choose
5167  * any one of those.
5168  */
5169  return em->em_expr;
5170  }
5171  }
5172 
5173  /* We didn't find any suitable equivalence class expression */
5174  return NULL;
5175 }
GetForeignPlan_function GetForeignPlan
Definition: fdwapi.h:182
bool has_eclass_joins
Definition: relation.h:651
Value * makeString(char *str)
Definition: value.c:53
#define list_make3(x1, x2, x3)
Definition: pg_list.h:141
BeginForeignScan_function BeginForeignScan
Definition: fdwapi.h:183
GetForeignUpperPaths_function GetForeignUpperPaths
Definition: fdwapi.h:197
ExecForeignDelete_function ExecForeignDelete
Definition: fdwapi.h:205
#define PG_RETURN_POINTER(x)
Definition: fmgr.h:321
EndDirectModify_function EndDirectModify
Definition: fdwapi.h:211
#define NIL
Definition: pg_list.h:69
PathTarget * copy_pathtarget(PathTarget *src)
Definition: tlist.c:629
List * rowMarks
Definition: relation.h:256
ScanState ss
Definition: execnodes.h:1554
void deparseUpdateSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1611
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
Datum postgres_fdw_handler(PG_FUNCTION_ARGS)
Definition: postgres_fdw.c:429
static List * get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:778
Definition: fmgr.h:56
List * qual
Definition: plannodes.h:145
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2732
double estimate_num_groups(PlannerInfo *root, List *groupExprs, double input_rows, List **pgset)
Definition: selfuncs.c:3360
#define SizeofHeapTupleHeader
Definition: htup_details.h:175
FdwModifyPrivateIndex
Definition: postgres_fdw.c:89
Relation ri_RelationDesc
Definition: execnodes.h:354
#define IsA(nodeptr, _type_)
Definition: nodes.h:561
static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es)
Query * parse
Definition: relation.h:155
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2646
void add_path(RelOptInfo *parent_rel, Path *new_path)
Definition: pathnode.c:420
Relids ph_eval_at
Definition: relation.h:2154
static List * postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
ParamPathInfo * get_baserel_parampathinfo(PlannerInfo *root, RelOptInfo *baserel, Relids required_outer)
Definition: relnode.c:1253
ExplainForeignScan_function ExplainForeignScan
Definition: fdwapi.h:219
RestrictInfo * make_restrictinfo(Expr *clause, bool is_pushed_down, bool outerjoin_delayed, bool pseudoconstant, Index security_level, Relids required_relids, Relids outer_relids, Relids nullable_relids)
Definition: restrictinfo.c:57
Index scanrelid
Definition: plannodes.h:329
static ForeignScan * postgresGetForeignPlan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan)
List * query_pathkeys
Definition: relation.h:262
Instrumentation * instrument
Definition: execnodes.h:859
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1234
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10406
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3118
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg)
static TupleTableSlot * postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path)
static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context)
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:512
HeapTuple * rows
Definition: postgres_fdw.c:228
AttrNumber ExecFindJunkAttributeInTlist(List *targetlist, const char *attrName)
Definition: execJunk.c:221
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2974
#define RelationGetDescr(relation)
Definition: rel.h:437
#define DEBUG3
Definition: elog.h:23
Oid GetUserId(void)
Definition: miscinit.c:284
#define ObjectIdAttributeNumber
Definition: sysattr.h:22
AnalyzeForeignTable_function AnalyzeForeignTable
Definition: fdwapi.h:224
#define castNode(_type_, nodeptr)
Definition: nodes.h:579
static void create_cursor(ForeignScanState *node)
static void add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel)
const char ** param_values
Definition: postgres_fdw.c:207
static void apply_server_options(PgFdwRelationInfo *fpinfo)
#define PointerGetDatum(X)
Definition: postgres.h:562
char * PQcmdTuples(PGresult *res)
Definition: fe-exec.c:3065
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:90
static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values)
List * param_exprs
Definition: postgres_fdw.c:142
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:203
bool eclass_useful_for_merging(PlannerInfo *root, EquivalenceClass *eclass, RelOptInfo *rel)
Definition: equivclass.c:2436
#define DatumGetObjectId(X)
Definition: postgres.h:506
char * pstrdup(const char *in)
Definition: mcxt.c:1076
ExprContext * ps_ExprContext
Definition: execnodes.h:883
static void postgresBeginForeignScan(ForeignScanState *node, int eflags)
double tuples
Definition: relation.h:625
List * baserestrictinfo
Definition: relation.h:645
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:175
List * fdw_exprs
Definition: plannodes.h:600
void get_agg_clause_costs(PlannerInfo *root, Node *clause, AggSplit aggsplit, AggClauseCosts *costs)
Definition: clauses.c:467
PG_FUNCTION_INFO_V1(postgres_fdw_handler)
bool hasAggs
Definition: parsenodes.h:123
Relids clause_relids
Definition: relation.h:1850
AttInMetadata * attinmeta
Definition: postgres_fdw.c:194
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
#define Min(x, y)
Definition: c.h:802
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:937
#define IS_UPPER_REL(rel)
Definition: relation.h:571
ForeignServer * server
Definition: postgres_fdw.h:76
bool pseudoconstant
Definition: relation.h:1843
int resultRelation
Definition: parsenodes.h:120
List * fdw_private
Definition: plannodes.h:601
static List * get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:682
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
#define IS_JOIN_REL(rel)
Definition: relation.h:566
#define RELKIND_MATVIEW
Definition: pg_class.h:165
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IS_OUTER_JOIN(jointype)
Definition: nodes.h:723
double sampler_random_fract(SamplerRandomState randstate)
Definition: sampling.c:238
static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values)
List * groupingSets
Definition: parsenodes.h:148
#define InvalidBuffer
Definition: buf.h:25
void classifyConditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, List **remote_conds, List **local_conds)
Definition: deparse.c:200
int set_transmission_modes(void)
List * list_copy(const List *oldlist)
Definition: list.c:1160
Definition: nodes.h:510
#define strVal(v)
Definition: value.h:54
Relids lower_subquery_rels
Definition: postgres_fdw.h:103
int errcode(int sqlerrcode)
Definition: elog.c:575
#define IS_OTHER_REL(rel)
Definition: relation.h:574
ForeignTable * GetForeignTable(Oid relid)
Definition: foreign.c:216
int IntervalStyle
Definition: globals.c:109
#define MemSet(start, val, len)
Definition: c.h:853
AttrNumber varattno
Definition: primnodes.h:168
static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
List * join_rel_list
Definition: relation.h:215
bool canSetTag
Definition: plannodes.h:218
CmdType operation
Definition: execnodes.h:961
List * list_concat(List *list1, List *list2)
Definition: list.c:321
int32 * atttypmods
Definition: funcapi.h:48
PathKey * make_canonical_pathkey(PlannerInfo *root, EquivalenceClass *eclass, Oid opfamily, int strategy, bool nulls_first)
Definition: pathkeys.c:51
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:28
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1106
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
uint32 BlockNumber
Definition: block.h:31
EquivalenceClass * right_ec
Definition: relation.h:1884
List * retrieved_attrs
Definition: postgres_fdw.c:134
void reservoir_init_selection_state(ReservoirState rs, int n)
Definition: sampling.c:129
List * pull_var_clause(Node *node, int flags)
Definition: var.c:535
List * fdw_scan_tlist
Definition: plannodes.h:602
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:585
double Selectivity
Definition: nodes.h:640
Relation ss_currentRelation
Definition: execnodes.h:1104
EState * state
Definition: execnodes.h:851
QualCost transCost
Definition: relation.h:62
List * es_range_table
Definition: execnodes.h:431
static void postgresGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
Definition: postgres_fdw.c:486
Form_pg_class rd_rel
Definition: rel.h:114
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6096
static int postgresIsForeignRelUpdatable(Relation rel)
UpperRelationKind
Definition: relation.h:71
Definition: primnodes.h:163
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2724
static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages)
void(* callback)(void *arg)
Definition: elog.h:239
struct ErrorContextCallback * previous
Definition: elog.h:238
#define OidIsValid(objectId)
Definition: c.h:576
Oid * attioparams
Definition: funcapi.h:45
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra)
List * retrieved_attrs
Definition: postgres_fdw.c:177
List * mergeopfamilies
Definition: relation.h:1880
static void postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
int natts
Definition: tupdesc.h:79
CmdType operation
Definition: plannodes.h:598
List * plans
Definition: plannodes.h:225
static TupleTableSlot * postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
RelOptInfo * outerrel
Definition: postgres_fdw.h:89
#define IS_SIMPLE_REL(rel)
Definition: relation.h:561
static void close_cursor(PGconn *conn, unsigned int cursor_number)
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2647
Cost startup
Definition: relation.h:45
void pull_varattnos(Node *node, Index varno, Bitmapset **varattnos)
Definition: var.c:219
AddForeignUpdateTargets_function AddForeignUpdateTargets
Definition: fdwapi.h:200
Index ri_RangeTableIndex
Definition: execnodes.h:351
static void postgresEndDirectModify(ForeignScanState *node)
#define PVC_INCLUDE_AGGREGATES
Definition: var.h:20
#define USE_ISO_DATES
Definition: miscadmin.h:211
JoinType
Definition: nodes.h:674
List * targetList
Definition: parsenodes.h:138
void deparseInsertSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1548
unsigned int cursor_number
Definition: postgres_fdw.c:138
struct RelOptInfo ** simple_rel_array
Definition: relation.h:179
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1662
static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i)
ItemPointerData * ItemPointer
Definition: itemptr.h:49
HeapTupleHeader t_data
Definition: htup.h:67
#define HeapTupleSetOid(tuple, oid)
Definition: htup_details.h:703
ErrorContextCallback * error_context_stack
Definition: elog.c:88
void ReleaseConnection(PGconn *conn)
Definition: connection.c:460
RecheckForeignScan_function RecheckForeignScan
Definition: fdwapi.h:216
IterateDirectModify_function IterateDirectModify
Definition: fdwapi.h:210
#define list_make1(x1)
Definition: pg_list.h:139