PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "commands/defrem.h"
20 #include "commands/explain.h"
21 #include "commands/vacuum.h"
22 #include "foreign/fdwapi.h"
23 #include "funcapi.h"
24 #include "miscadmin.h"
25 #include "nodes/makefuncs.h"
26 #include "nodes/nodeFuncs.h"
27 #include "optimizer/cost.h"
28 #include "optimizer/clauses.h"
29 #include "optimizer/pathnode.h"
30 #include "optimizer/paths.h"
31 #include "optimizer/planmain.h"
32 #include "optimizer/restrictinfo.h"
33 #include "optimizer/var.h"
34 #include "optimizer/tlist.h"
35 #include "parser/parsetree.h"
36 #include "utils/builtins.h"
37 #include "utils/guc.h"
38 #include "utils/lsyscache.h"
39 #include "utils/memutils.h"
40 #include "utils/rel.h"
41 #include "utils/sampling.h"
42 #include "utils/selfuncs.h"
43 
45 
46 /* Default CPU cost to start up a foreign query. */
47 #define DEFAULT_FDW_STARTUP_COST 100.0
48 
49 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
50 #define DEFAULT_FDW_TUPLE_COST 0.01
51 
52 /* If no remote estimates, assume a sort costs 20% extra */
53 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
54 
55 /*
56  * Indexes of FDW-private information stored in fdw_private lists.
57  *
58  * These items are indexed with the enum FdwScanPrivateIndex, so an item
59  * can be fetched with list_nth(). For example, to get the SELECT statement:
60  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
61  */
63 {
64  /* SQL statement to execute remotely (as a String node) */
66  /* List of restriction clauses that can be executed remotely */
68  /* Integer list of attribute numbers retrieved by the SELECT */
70  /* Integer representing the desired fetch_size */
72 
73  /*
74  * String describing join i.e. names of relations being joined and types
75  * of join, added when the scan is join
76  */
78 };
79 
80 /*
81  * Similarly, this enum describes what's kept in the fdw_private list for
82  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
83  *
84  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
85  * 2) Integer list of target attribute numbers for INSERT/UPDATE
86  * (NIL for a DELETE)
87  * 3) Boolean flag showing if the remote query has a RETURNING clause
88  * 4) Integer list of attribute numbers retrieved by RETURNING, if any
89  */
91 {
92  /* SQL statement to execute remotely (as a String node) */
94  /* Integer list of target attribute numbers for INSERT/UPDATE */
96  /* has-returning flag (as an integer Value node) */
98  /* Integer list of attribute numbers retrieved by RETURNING */
100 };
101 
102 /*
103  * Similarly, this enum describes what's kept in the fdw_private list for
104  * a ForeignScan node that modifies a foreign table directly. We store:
105  *
106  * 1) UPDATE/DELETE statement text to be sent to the remote server
107  * 2) Boolean flag showing if the remote query has a RETURNING clause
108  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
109  * 4) Boolean flag showing if we set the command es_processed
110  */
112 {
113  /* SQL statement to execute remotely (as a String node) */
115  /* has-returning flag (as an integer Value node) */
117  /* Integer list of attribute numbers retrieved by RETURNING */
119  /* set-processed flag (as an integer Value node) */
121 };
122 
123 /*
124  * Execution state of a foreign scan using postgres_fdw.
125  */
126 typedef struct PgFdwScanState
127 {
128  Relation rel; /* relcache entry for the foreign table. NULL
129  * for a foreign join scan. */
130  TupleDesc tupdesc; /* tuple descriptor of scan */
131  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
132 
133  /* extracted fdw_private data */
134  char *query; /* text of SELECT command */
135  List *retrieved_attrs; /* list of retrieved attribute numbers */
136 
137  /* for remote query execution */
138  PGconn *conn; /* connection for the scan */
139  unsigned int cursor_number; /* quasi-unique ID for my cursor */
140  bool cursor_exists; /* have we created the cursor? */
141  int numParams; /* number of parameters passed to query */
142  FmgrInfo *param_flinfo; /* output conversion functions for them */
143  List *param_exprs; /* executable expressions for param values */
144  const char **param_values; /* textual values of query parameters */
145 
146  /* for storing result tuples */
147  HeapTuple *tuples; /* array of currently-retrieved tuples */
148  int num_tuples; /* # of tuples in array */
149  int next_tuple; /* index of next one to return */
150 
151  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
152  int fetch_ct_2; /* Min(# of fetches done, 2) */
153  bool eof_reached; /* true if last fetch reached EOF */
154 
155  /* working memory contexts */
156  MemoryContext batch_cxt; /* context holding current batch of tuples */
157  MemoryContext temp_cxt; /* context for per-tuple temporary data */
158 
159  int fetch_size; /* number of tuples per fetch */
161 
162 /*
163  * Execution state of a foreign insert/update/delete operation.
164  */
165 typedef struct PgFdwModifyState
166 {
167  Relation rel; /* relcache entry for the foreign table */
168  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
169 
170  /* for remote query execution */
171  PGconn *conn; /* connection for the scan */
172  char *p_name; /* name of prepared statement, if created */
173 
174  /* extracted fdw_private data */
175  char *query; /* text of INSERT/UPDATE/DELETE command */
176  List *target_attrs; /* list of target attribute numbers */
177  bool has_returning; /* is there a RETURNING clause? */
178  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
179 
180  /* info about parameters for prepared statement */
181  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
182  int p_nums; /* number of parameters to transmit */
183  FmgrInfo *p_flinfo; /* output conversion functions for them */
184 
185  /* working memory context */
186  MemoryContext temp_cxt; /* context for per-tuple temporary data */
188 
189 /*
190  * Execution state of a foreign scan that modifies a foreign table directly.
191  */
193 {
194  Relation rel; /* relcache entry for the foreign table */
195  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
196 
197  /* extracted fdw_private data */
198  char *query; /* text of UPDATE/DELETE command */
199  bool has_returning; /* is there a RETURNING clause? */
200  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
201  bool set_processed; /* do we set the command es_processed? */
202 
203  /* for remote query execution */
204  PGconn *conn; /* connection for the update */
205  int numParams; /* number of parameters passed to query */
206  FmgrInfo *param_flinfo; /* output conversion functions for them */
207  List *param_exprs; /* executable expressions for param values */
208  const char **param_values; /* textual values of query parameters */
209 
210  /* for storing result tuples */
211  PGresult *result; /* result for query */
212  int num_tuples; /* # of result tuples */
213  int next_tuple; /* index of next one to return */
214 
215  /* working memory context */
216  MemoryContext temp_cxt; /* context for per-tuple temporary data */
218 
219 /*
220  * Workspace for analyzing a foreign table.
221  */
222 typedef struct PgFdwAnalyzeState
223 {
224  Relation rel; /* relcache entry for the foreign table */
225  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
226  List *retrieved_attrs; /* attr numbers retrieved by query */
227 
228  /* collected sample rows */
229  HeapTuple *rows; /* array of size targrows */
230  int targrows; /* target # of sample rows */
231  int numrows; /* # of sample rows collected */
232 
233  /* for random sampling */
234  double samplerows; /* # of rows fetched */
235  double rowstoskip; /* # of rows to skip before next sample */
236  ReservoirStateData rstate; /* state for reservoir sampling */
237 
238  /* working memory contexts */
239  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
240  MemoryContext temp_cxt; /* context for per-tuple temporary data */
242 
243 /*
244  * Identify the attribute where data conversion fails.
245  */
246 typedef struct ConversionLocation
247 {
248  Relation rel; /* foreign table's relcache entry. */
249  AttrNumber cur_attno; /* attribute number being processed, or 0 */
250 
251  /*
252  * In case of foreign join push down, fdw_scan_tlist is used to identify
253  * the Var node corresponding to the error location and
254  * fsstate->ss.ps.state gives access to the RTEs of corresponding relation
255  * to get the relation name and attribute name.
256  */
259 
260 /* Callback argument for ec_member_matches_foreign */
261 typedef struct
262 {
263  Expr *current; /* current expr, or NULL if not yet found */
264  List *already_used; /* expressions already dealt with */
266 
267 /*
268  * SQL functions
269  */
271 
272 /*
273  * FDW callback routines
274  */
275 static void postgresGetForeignRelSize(PlannerInfo *root,
276  RelOptInfo *baserel,
277  Oid foreigntableid);
278 static void postgresGetForeignPaths(PlannerInfo *root,
279  RelOptInfo *baserel,
280  Oid foreigntableid);
282  RelOptInfo *baserel,
283  Oid foreigntableid,
284  ForeignPath *best_path,
285  List *tlist,
286  List *scan_clauses,
287  Plan *outer_plan);
288 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
291 static void postgresEndForeignScan(ForeignScanState *node);
292 static void postgresAddForeignUpdateTargets(Query *parsetree,
293  RangeTblEntry *target_rte,
294  Relation target_relation);
296  ModifyTable *plan,
297  Index resultRelation,
298  int subplan_index);
299 static void postgresBeginForeignModify(ModifyTableState *mtstate,
300  ResultRelInfo *resultRelInfo,
301  List *fdw_private,
302  int subplan_index,
303  int eflags);
305  ResultRelInfo *resultRelInfo,
306  TupleTableSlot *slot,
307  TupleTableSlot *planSlot);
309  ResultRelInfo *resultRelInfo,
310  TupleTableSlot *slot,
311  TupleTableSlot *planSlot);
313  ResultRelInfo *resultRelInfo,
314  TupleTableSlot *slot,
315  TupleTableSlot *planSlot);
316 static void postgresEndForeignModify(EState *estate,
317  ResultRelInfo *resultRelInfo);
319 static bool postgresPlanDirectModify(PlannerInfo *root,
320  ModifyTable *plan,
321  Index resultRelation,
322  int subplan_index);
323 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
325 static void postgresEndDirectModify(ForeignScanState *node);
327  ExplainState *es);
329  ResultRelInfo *rinfo,
330  List *fdw_private,
331  int subplan_index,
332  ExplainState *es);
334  ExplainState *es);
335 static bool postgresAnalyzeForeignTable(Relation relation,
336  AcquireSampleRowsFunc *func,
337  BlockNumber *totalpages);
339  Oid serverOid);
340 static void postgresGetForeignJoinPaths(PlannerInfo *root,
341  RelOptInfo *joinrel,
342  RelOptInfo *outerrel,
343  RelOptInfo *innerrel,
344  JoinType jointype,
345  JoinPathExtraData *extra);
347  TupleTableSlot *slot);
348 static void postgresGetForeignUpperPaths(PlannerInfo *root,
349  UpperRelationKind stage,
350  RelOptInfo *input_rel,
351  RelOptInfo *output_rel);
352 
353 /*
354  * Helper functions
355  */
356 static void estimate_path_cost_size(PlannerInfo *root,
357  RelOptInfo *baserel,
358  List *join_conds,
359  List *pathkeys,
360  double *p_rows, int *p_width,
361  Cost *p_startup_cost, Cost *p_total_cost);
362 static void get_remote_estimate(const char *sql,
363  PGconn *conn,
364  double *rows,
365  int *width,
366  Cost *startup_cost,
367  Cost *total_cost);
368 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
370  void *arg);
371 static void create_cursor(ForeignScanState *node);
372 static void fetch_more_data(ForeignScanState *node);
373 static void close_cursor(PGconn *conn, unsigned int cursor_number);
374 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
375 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
376  ItemPointer tupleid,
377  TupleTableSlot *slot);
378 static void store_returning_result(PgFdwModifyState *fmstate,
379  TupleTableSlot *slot, PGresult *res);
380 static void execute_dml_stmt(ForeignScanState *node);
382 static void prepare_query_params(PlanState *node,
383  List *fdw_exprs,
384  int numParams,
385  FmgrInfo **param_flinfo,
386  List **param_exprs,
387  const char ***param_values);
388 static void process_query_params(ExprContext *econtext,
389  FmgrInfo *param_flinfo,
390  List *param_exprs,
391  const char **param_values);
392 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
393  HeapTuple *rows, int targrows,
394  double *totalrows,
395  double *totaldeadrows);
396 static void analyze_row_processor(PGresult *res, int row,
397  PgFdwAnalyzeState *astate);
399  int row,
400  Relation rel,
401  AttInMetadata *attinmeta,
402  List *retrieved_attrs,
403  ForeignScanState *fsstate,
404  MemoryContext temp_context);
405 static void conversion_error_callback(void *arg);
406 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
407  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
408  JoinPathExtraData *extra);
409 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel);
411  RelOptInfo *rel);
414  Path *epq_path);
415 static void add_foreign_grouping_paths(PlannerInfo *root,
416  RelOptInfo *input_rel,
417  RelOptInfo *grouped_rel);
418 
419 
420 /*
421  * Foreign-data wrapper handler function: return a struct with pointers
422  * to my callback routines.
423  */
424 Datum
426 {
427  FdwRoutine *routine = makeNode(FdwRoutine);
428 
429  /* Functions for scanning foreign tables */
437 
438  /* Functions for updating foreign tables */
451 
452  /* Function for EvalPlanQual rechecks */
454  /* Support functions for EXPLAIN */
458 
459  /* Support functions for ANALYZE */
461 
462  /* Support functions for IMPORT FOREIGN SCHEMA */
464 
465  /* Support functions for join push-down */
467 
468  /* Support functions for upper relation push-down */
470 
471  PG_RETURN_POINTER(routine);
472 }
473 
474 /*
475  * postgresGetForeignRelSize
476  * Estimate # of rows and width of the result of the scan
477  *
478  * We should consider the effect of all baserestrictinfo clauses here, but
479  * not any join clauses.
480  */
481 static void
483  RelOptInfo *baserel,
484  Oid foreigntableid)
485 {
486  PgFdwRelationInfo *fpinfo;
487  ListCell *lc;
488  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
489  const char *namespace;
490  const char *relname;
491  const char *refname;
492 
493  /*
494  * We use PgFdwRelationInfo to pass various information to subsequent
495  * functions.
496  */
497  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
498  baserel->fdw_private = (void *) fpinfo;
499 
500  /* Base foreign tables need to be pushed down always. */
501  fpinfo->pushdown_safe = true;
502 
503  /* Look up foreign-table catalog info. */
504  fpinfo->table = GetForeignTable(foreigntableid);
505  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
506 
507  /*
508  * Extract user-settable option values. Note that per-table setting of
509  * use_remote_estimate overrides per-server setting.
510  */
511  fpinfo->use_remote_estimate = false;
514  fpinfo->shippable_extensions = NIL;
515  fpinfo->fetch_size = 100;
516 
517  foreach(lc, fpinfo->server->options)
518  {
519  DefElem *def = (DefElem *) lfirst(lc);
520 
521  if (strcmp(def->defname, "use_remote_estimate") == 0)
522  fpinfo->use_remote_estimate = defGetBoolean(def);
523  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
524  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
525  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
526  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
527  else if (strcmp(def->defname, "extensions") == 0)
528  fpinfo->shippable_extensions =
529  ExtractExtensionList(defGetString(def), false);
530  else if (strcmp(def->defname, "fetch_size") == 0)
531  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
532  }
533  foreach(lc, fpinfo->table->options)
534  {
535  DefElem *def = (DefElem *) lfirst(lc);
536 
537  if (strcmp(def->defname, "use_remote_estimate") == 0)
538  fpinfo->use_remote_estimate = defGetBoolean(def);
539  else if (strcmp(def->defname, "fetch_size") == 0)
540  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
541  }
542 
543  /*
544  * If the table or the server is configured to use remote estimates,
545  * identify which user to do remote access as during planning. This
546  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
547  * permissions, the query would have failed at runtime anyway.
548  */
549  if (fpinfo->use_remote_estimate)
550  {
551  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
552 
553  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
554  }
555  else
556  fpinfo->user = NULL;
557 
558  /*
559  * Identify which baserestrictinfo clauses can be sent to the remote
560  * server and which can't.
561  */
562  classifyConditions(root, baserel, baserel->baserestrictinfo,
563  &fpinfo->remote_conds, &fpinfo->local_conds);
564 
565  /*
566  * Identify which attributes will need to be retrieved from the remote
567  * server. These include all attrs needed for joins or final output, plus
568  * all attrs used in the local_conds. (Note: if we end up using a
569  * parameterized scan, it's possible that some of the join clauses will be
570  * sent to the remote and thus we wouldn't really need to retrieve the
571  * columns used in them. Doesn't seem worth detecting that case though.)
572  */
573  fpinfo->attrs_used = NULL;
574  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
575  &fpinfo->attrs_used);
576  foreach(lc, fpinfo->local_conds)
577  {
578  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
579 
580  pull_varattnos((Node *) rinfo->clause, baserel->relid,
581  &fpinfo->attrs_used);
582  }
583 
584  /*
585  * Compute the selectivity and cost of the local_conds, so we don't have
586  * to do it over again for each path. The best we can do for these
587  * conditions is to estimate selectivity on the basis of local statistics.
588  */
590  fpinfo->local_conds,
591  baserel->relid,
592  JOIN_INNER,
593  NULL);
594 
595  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
596 
597  /*
598  * Set cached relation costs to some negative value, so that we can detect
599  * when they are set to some sensible costs during one (usually the first)
600  * of the calls to estimate_path_cost_size().
601  */
602  fpinfo->rel_startup_cost = -1;
603  fpinfo->rel_total_cost = -1;
604 
605  /*
606  * If the table or the server is configured to use remote estimates,
607  * connect to the foreign server and execute EXPLAIN to estimate the
608  * number of rows selected by the restriction clauses, as well as the
609  * average row width. Otherwise, estimate using whatever statistics we
610  * have locally, in a way similar to ordinary tables.
611  */
612  if (fpinfo->use_remote_estimate)
613  {
614  /*
615  * Get cost/size estimates with help of remote server. Save the
616  * values in fpinfo so we don't need to do it again to generate the
617  * basic foreign path.
618  */
619  estimate_path_cost_size(root, baserel, NIL, NIL,
620  &fpinfo->rows, &fpinfo->width,
621  &fpinfo->startup_cost, &fpinfo->total_cost);
622 
623  /* Report estimated baserel size to planner. */
624  baserel->rows = fpinfo->rows;
625  baserel->reltarget->width = fpinfo->width;
626  }
627  else
628  {
629  /*
630  * If the foreign table has never been ANALYZEd, it will have relpages
631  * and reltuples equal to zero, which most likely has nothing to do
632  * with reality. We can't do a whole lot about that if we're not
633  * allowed to consult the remote server, but we can use a hack similar
634  * to plancat.c's treatment of empty relations: use a minimum size
635  * estimate of 10 pages, and divide by the column-datatype-based width
636  * estimate to get the corresponding number of tuples.
637  */
638  if (baserel->pages == 0 && baserel->tuples == 0)
639  {
640  baserel->pages = 10;
641  baserel->tuples =
642  (10 * BLCKSZ) / (baserel->reltarget->width +
644  }
645 
646  /* Estimate baserel size as best we can with local statistics. */
647  set_baserel_size_estimates(root, baserel);
648 
649  /* Fill in basically-bogus cost estimates for use later. */
650  estimate_path_cost_size(root, baserel, NIL, NIL,
651  &fpinfo->rows, &fpinfo->width,
652  &fpinfo->startup_cost, &fpinfo->total_cost);
653  }
654 
655  /*
656  * Set the name of relation in fpinfo, while we are constructing it here.
657  * It will be used to build the string describing the join relation in
658  * EXPLAIN output. We can't know whether VERBOSE option is specified or
659  * not, so always schema-qualify the foreign table name.
660  */
661  fpinfo->relation_name = makeStringInfo();
662  namespace = get_namespace_name(get_rel_namespace(foreigntableid));
663  relname = get_rel_name(foreigntableid);
664  refname = rte->eref->aliasname;
665  appendStringInfo(fpinfo->relation_name, "%s.%s",
666  quote_identifier(namespace),
667  quote_identifier(relname));
668  if (*refname && strcmp(refname, relname) != 0)
669  appendStringInfo(fpinfo->relation_name, " %s",
671 }
672 
673 /*
674  * get_useful_ecs_for_relation
675  * Determine which EquivalenceClasses might be involved in useful
676  * orderings of this relation.
677  *
678  * This function is in some respects a mirror image of the core function
679  * pathkeys_useful_for_merging: for a regular table, we know what indexes
680  * we have and want to test whether any of them are useful. For a foreign
681  * table, we don't know what indexes are present on the remote side but
682  * want to speculate about which ones we'd like to use if they existed.
683  *
684  * This function returns a list of potentially-useful equivalence classes,
685  * but it does not guarantee that an EquivalenceMember exists which contains
686  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
687  * ft1.x + t1.x = 0, this function will say that the equivalence class
688  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
689  * t1 is local (or on a different server), it will turn out that no useful
690  * ORDER BY clause can be generated. It's not our job to figure that out
691  * here; we're only interested in identifying relevant ECs.
692  */
693 static List *
695 {
696  List *useful_eclass_list = NIL;
697  ListCell *lc;
698  Relids relids;
699 
700  /*
701  * First, consider whether any active EC is potentially useful for a merge
702  * join against this relation.
703  */
704  if (rel->has_eclass_joins)
705  {
706  foreach(lc, root->eq_classes)
707  {
708  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
709 
710  if (eclass_useful_for_merging(root, cur_ec, rel))
711  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
712  }
713  }
714 
715  /*
716  * Next, consider whether there are any non-EC derivable join clauses that
717  * are merge-joinable. If the joininfo list is empty, we can exit
718  * quickly.
719  */
720  if (rel->joininfo == NIL)
721  return useful_eclass_list;
722 
723  /* If this is a child rel, we must use the topmost parent rel to search. */
725  relids = find_childrel_top_parent(root, rel)->relids;
726  else
727  relids = rel->relids;
728 
729  /* Check each join clause in turn. */
730  foreach(lc, rel->joininfo)
731  {
732  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
733 
734  /* Consider only mergejoinable clauses */
735  if (restrictinfo->mergeopfamilies == NIL)
736  continue;
737 
738  /* Make sure we've got canonical ECs. */
739  update_mergeclause_eclasses(root, restrictinfo);
740 
741  /*
742  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
743  * that left_ec and right_ec will be initialized, per comments in
744  * distribute_qual_to_rels.
745  *
746  * We want to identify which side of this merge-joinable clause
747  * contains columns from the relation produced by this RelOptInfo. We
748  * test for overlap, not containment, because there could be extra
749  * relations on either side. For example, suppose we've got something
750  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
751  * A.y = D.y. The input rel might be the joinrel between A and B, and
752  * we'll consider the join clause A.y = D.y. relids contains a
753  * relation not involved in the join class (B) and the equivalence
754  * class for the left-hand side of the clause contains a relation not
755  * involved in the input rel (C). Despite the fact that we have only
756  * overlap and not containment in either direction, A.y is potentially
757  * useful as a sort column.
758  *
759  * Note that it's even possible that relids overlaps neither side of
760  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
761  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
762  * but overlaps neither side of B. In that case, we just skip this
763  * join clause, since it doesn't suggest a useful sort order for this
764  * relation.
765  */
766  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
767  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
768  restrictinfo->right_ec);
769  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
770  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
771  restrictinfo->left_ec);
772  }
773 
774  return useful_eclass_list;
775 }
776 
777 /*
778  * get_useful_pathkeys_for_relation
779  * Determine which orderings of a relation might be useful.
780  *
781  * Getting data in sorted order can be useful either because the requested
782  * order matches the final output ordering for the overall query we're
783  * planning, or because it enables an efficient merge join. Here, we try
784  * to figure out which pathkeys to consider.
785  */
786 static List *
788 {
789  List *useful_pathkeys_list = NIL;
790  List *useful_eclass_list;
792  EquivalenceClass *query_ec = NULL;
793  ListCell *lc;
794 
795  /*
796  * Pushing the query_pathkeys to the remote server is always worth
797  * considering, because it might let us avoid a local sort.
798  */
799  if (root->query_pathkeys)
800  {
801  bool query_pathkeys_ok = true;
802 
803  foreach(lc, root->query_pathkeys)
804  {
805  PathKey *pathkey = (PathKey *) lfirst(lc);
806  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
807  Expr *em_expr;
808 
809  /*
810  * The planner and executor don't have any clever strategy for
811  * taking data sorted by a prefix of the query's pathkeys and
812  * getting it to be sorted by all of those pathkeys. We'll just
813  * end up resorting the entire data set. So, unless we can push
814  * down all of the query pathkeys, forget it.
815  *
816  * is_foreign_expr would detect volatile expressions as well, but
817  * checking ec_has_volatile here saves some cycles.
818  */
819  if (pathkey_ec->ec_has_volatile ||
820  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
821  !is_foreign_expr(root, rel, em_expr))
822  {
823  query_pathkeys_ok = false;
824  break;
825  }
826  }
827 
828  if (query_pathkeys_ok)
829  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
830  }
831 
832  /*
833  * Even if we're not using remote estimates, having the remote side do the
834  * sort generally won't be any worse than doing it locally, and it might
835  * be much better if the remote side can generate data in the right order
836  * without needing a sort at all. However, what we're going to do next is
837  * try to generate pathkeys that seem promising for possible merge joins,
838  * and that's more speculative. A wrong choice might hurt quite a bit, so
839  * bail out if we can't use remote estimates.
840  */
841  if (!fpinfo->use_remote_estimate)
842  return useful_pathkeys_list;
843 
844  /* Get the list of interesting EquivalenceClasses. */
845  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
846 
847  /* Extract unique EC for query, if any, so we don't consider it again. */
848  if (list_length(root->query_pathkeys) == 1)
849  {
850  PathKey *query_pathkey = linitial(root->query_pathkeys);
851 
852  query_ec = query_pathkey->pk_eclass;
853  }
854 
855  /*
856  * As a heuristic, the only pathkeys we consider here are those of length
857  * one. It's surely possible to consider more, but since each one we
858  * choose to consider will generate a round-trip to the remote side, we
859  * need to be a bit cautious here. It would sure be nice to have a local
860  * cache of information about remote index definitions...
861  */
862  foreach(lc, useful_eclass_list)
863  {
864  EquivalenceClass *cur_ec = lfirst(lc);
865  Expr *em_expr;
866  PathKey *pathkey;
867 
868  /* If redundant with what we did above, skip it. */
869  if (cur_ec == query_ec)
870  continue;
871 
872  /* If no pushable expression for this rel, skip it. */
873  em_expr = find_em_expr_for_rel(cur_ec, rel);
874  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
875  continue;
876 
877  /* Looks like we can generate a pathkey, so let's do it. */
878  pathkey = make_canonical_pathkey(root, cur_ec,
879  linitial_oid(cur_ec->ec_opfamilies),
881  false);
882  useful_pathkeys_list = lappend(useful_pathkeys_list,
883  list_make1(pathkey));
884  }
885 
886  return useful_pathkeys_list;
887 }
888 
889 /*
890  * postgresGetForeignPaths
891  * Create possible scan paths for a scan on the foreign table
 *
 * Always adds one unparameterized path, costed from the values already
 * stored in fpinfo, plus any pathkey-ordered variants.  Parameterized
 * paths (one per distinct ParamPathInfo derived from movable,
 * remote-safe join clauses, both plain joininfo clauses and
 * EquivalenceClass-derived ones) are built only when
 * fpinfo->use_remote_estimate is set, because their costs come from
 * estimate_path_cost_size.
 *
 * NOTE(review): listing line 894 (presumably the declarator line
 * "postgresGetForeignPaths(PlannerInfo *root,") and lines 1000, 1009 and
 * 1011 (presumably the declaration of "arg" and the start of the call
 * whose result is assigned to "clauses") are missing from this copy;
 * confirm against the upstream source.
892  */
893 static void
895  RelOptInfo *baserel,
896  Oid foreigntableid)
897 {
898  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
899  ForeignPath *path;
900  List *ppi_list;
901  ListCell *lc;
902 
903  /*
904  * Create simplest ForeignScan path node and add it to baserel. This path
905  * corresponds to SeqScan path of regular tables (though depending on what
906  * baserestrict conditions we were able to send to remote, there might
907  * actually be an indexscan happening there). We already did all the work
908  * to estimate cost and size of this path.
909  */
910  path = create_foreignscan_path(root, baserel,
911  NULL, /* default pathtarget */
912  fpinfo->rows,
913  fpinfo->startup_cost,
914  fpinfo->total_cost,
915  NIL, /* no pathkeys */
916  NULL, /* no outer rel either */
917  NULL, /* no extra plan */
918  NIL); /* no fdw_private list */
919  add_path(baserel, (Path *) path);
920 
921  /* Add paths with pathkeys */
922  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
923 
924  /*
925  * If we're not using remote estimates, stop here. We have no way to
926  * estimate whether any join clauses would be worth sending across, so
927  * don't bother building parameterized paths.
928  */
929  if (!fpinfo->use_remote_estimate)
930  return;
931 
932  /*
933  * Thumb through all join clauses for the rel to identify which outer
934  * relations could supply one or more safe-to-send-to-remote join clauses.
935  * We'll build a parameterized path for each such outer relation.
936  *
937  * It's convenient to manage this by representing each candidate outer
938  * relation by the ParamPathInfo node for it. We can then use the
939  * ppi_clauses list in the ParamPathInfo node directly as a list of the
940  * interesting join clauses for that rel. This takes care of the
941  * possibility that there are multiple safe join clauses for such a rel,
942  * and also ensures that we account for unsafe join clauses that we'll
943  * still have to enforce locally (since the parameterized-path machinery
944  * insists that we handle all movable clauses).
945  */
946  ppi_list = NIL;
947  foreach(lc, baserel->joininfo)
948  {
949  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
950  Relids required_outer;
951  ParamPathInfo *param_info;
952 
953  /* Check if clause can be moved to this rel */
954  if (!join_clause_is_movable_to(rinfo, baserel))
955  continue;
956 
957  /* See if it is safe to send to remote */
958  if (!is_foreign_expr(root, baserel, rinfo->clause))
959  continue;
960 
961  /* Calculate required outer rels for the resulting path */
962  required_outer = bms_union(rinfo->clause_relids,
963  baserel->lateral_relids);
964  /* We do not want the foreign rel itself listed in required_outer */
965  required_outer = bms_del_member(required_outer, baserel->relid);
966 
967  /*
968  * required_outer probably can't be empty here, but if it were, we
969  * couldn't make a parameterized path.
970  */
971  if (bms_is_empty(required_outer))
972  continue;
973 
974  /* Get the ParamPathInfo */
975  param_info = get_baserel_parampathinfo(root, baserel,
976  required_outer);
977  Assert(param_info != NULL);
978 
979  /*
980  * Add it to list unless we already have it. Testing pointer equality
981  * is OK since get_baserel_parampathinfo won't make duplicates.
982  */
983  ppi_list = list_append_unique_ptr(ppi_list, param_info);
984  }
985 
986  /*
987  * The above scan examined only "generic" join clauses, not those that
988  * were absorbed into EquivalenceClauses. See if we can make anything out
989  * of EquivalenceClauses.
990  */
991  if (baserel->has_eclass_joins)
992  {
993  /*
994  * We repeatedly scan the eclass list looking for column references
995  * (or expressions) belonging to the foreign rel. Each time we find
996  * one, we generate a list of equivalence joinclauses for it, and then
997  * see if any are safe to send to the remote. Repeat till there are
998  * no more candidate EC members.
999  */
1001 
1002  arg.already_used = NIL;
1003  for (;;)
1004  {
1005  List *clauses;
1006 
1007  /* Make clauses, skipping any that join to lateral_referencers */
1008  arg.current = NULL;
1010  baserel,
1012  (void *) &arg,
1013  baserel->lateral_referencers);
1014 
1015  /* Done if there are no more expressions in the foreign rel */
1016  if (arg.current == NULL)
1017  {
1018  Assert(clauses == NIL);
1019  break;
1020  }
1021 
1022  /* Scan the extracted join clauses */
1023  foreach(lc, clauses)
1024  {
1025  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1026  Relids required_outer;
1027  ParamPathInfo *param_info;
1028 
1029  /* Check if clause can be moved to this rel */
1030  if (!join_clause_is_movable_to(rinfo, baserel))
1031  continue;
1032 
1033  /* See if it is safe to send to remote */
1034  if (!is_foreign_expr(root, baserel, rinfo->clause))
1035  continue;
1036 
1037  /* Calculate required outer rels for the resulting path */
1038  required_outer = bms_union(rinfo->clause_relids,
1039  baserel->lateral_relids);
1040  required_outer = bms_del_member(required_outer, baserel->relid);
1041  if (bms_is_empty(required_outer))
1042  continue;
1043 
1044  /* Get the ParamPathInfo */
1045  param_info = get_baserel_parampathinfo(root, baserel,
1046  required_outer);
1047  Assert(param_info != NULL);
1048 
1049  /* Add it to list unless we already have it */
1050  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1051  }
1052 
1053  /* Try again, now ignoring the expression we found this time */
1054  arg.already_used = lappend(arg.already_used, arg.current);
1055  }
1056  }
1057 
1058  /*
1059  * Now build a path for each useful outer relation.
1060  */
1061  foreach(lc, ppi_list)
1062  {
1063  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1064  double rows;
1065  int width;
1066  Cost startup_cost;
1067  Cost total_cost;
1068 
1069  /* Get a cost estimate from the remote */
1070  estimate_path_cost_size(root, baserel,
1071  param_info->ppi_clauses, NIL,
1072  &rows, &width,
1073  &startup_cost, &total_cost);
1074 
1075  /*
1076  * ppi_rows currently won't get looked at by anything, but still we
1077  * may as well ensure that it matches our idea of the rowcount.
1078  */
1079  param_info->ppi_rows = rows;
1080 
1081  /* Make the path */
1082  path = create_foreignscan_path(root, baserel,
1083  NULL, /* default pathtarget */
1084  rows,
1085  startup_cost,
1086  total_cost,
1087  NIL, /* no pathkeys */
1088  param_info->ppi_req_outer,
1089  NULL,
1090  NIL); /* no fdw_private list */
1091  add_path(baserel, (Path *) path);
1092  }
1093 }
1094 
1095 /*
1096  * postgresGetForeignPlan
1097  * Create ForeignScan plan node which implements selected best path
 *
 * For base and other-member relations, scan_relid is the rel's relid and
 * scan_clauses are split into remote and local lists.  For join and upper
 * relations, scan_relid is 0, the precomputed fpinfo->remote_conds and
 * fpinfo->local_conds are used, and a target list to deparse is built.
 * The deparsed SELECT, remote_conds, retrieved_attrs and fetch_size are
 * stored in fdw_private; join and upper rels also append relation_name.
 *
 * NOTE(review): listing line 1100 (presumably the declarator line
 * "postgresGetForeignPlan(PlannerInfo *root,") is missing from this copy;
 * confirm against the upstream source.
1098  */
1099 static ForeignScan *
1101  RelOptInfo *foreignrel,
1102  Oid foreigntableid,
1103  ForeignPath *best_path,
1104  List *tlist,
1105  List *scan_clauses,
1106  Plan *outer_plan)
1107 {
1108  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1109  Index scan_relid;
1110  List *fdw_private;
1111  List *remote_conds = NIL;
1112  List *remote_exprs = NIL;
1113  List *local_exprs = NIL;
1114  List *params_list = NIL;
1115  List *retrieved_attrs;
1116  StringInfoData sql;
1117  ListCell *lc;
1118  List *fdw_scan_tlist = NIL;
1119 
1120  /*
1121  * For base relations, set scan_relid as the relid of the relation. For
1122  * other kinds of relations set it to 0.
1123  */
1124  if (foreignrel->reloptkind == RELOPT_BASEREL ||
1125  foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL)
1126  scan_relid = foreignrel->relid;
1127  else
1128  {
1129  scan_relid = 0;
1130 
1131  /*
1132  * create_scan_plan() and create_foreignscan_plan() pass
1133  * rel->baserestrictinfo + parameterization clauses through
1134  * scan_clauses. For a join rel->baserestrictinfo is NIL and we are
1135  * not considering parameterization right now, so there should be no
1136  * scan_clauses for a joinrel and upper rel either.
1137  */
1138  Assert(!scan_clauses);
1139  }
1140 
1141  /*
1142  * Separate the scan_clauses into those that can be executed remotely and
1143  * those that can't. baserestrictinfo clauses that were previously
1144  * determined to be safe or unsafe by classifyConditions are shown in
1145  * fpinfo->remote_conds and fpinfo->local_conds. Anything else in the
1146  * scan_clauses list will be a join clause, which we have to check for
1147  * remote-safety.
1148  *
1149  * Note: the join clauses we see here should be the exact same ones
1150  * previously examined by postgresGetForeignPaths. Possibly it'd be worth
1151  * passing forward the classification work done then, rather than
1152  * repeating it here.
1153  *
1154  * This code must match "extract_actual_clauses(scan_clauses, false)"
1155  * except for the additional decision about remote versus local execution.
1156  * Note however that we don't strip the RestrictInfo nodes from the
1157  * remote_conds list, since appendWhereClause expects a list of
1158  * RestrictInfos.
1159  */
1160  foreach(lc, scan_clauses)
1161  {
1162  RestrictInfo *rinfo = castNode(RestrictInfo, lfirst(lc));
1163 
1164  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1165  if (rinfo->pseudoconstant)
1166  continue;
1167 
1168  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1169  {
1170  remote_conds = lappend(remote_conds, rinfo);
1171  remote_exprs = lappend(remote_exprs, rinfo->clause);
1172  }
1173  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1174  local_exprs = lappend(local_exprs, rinfo->clause);
1175  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1176  {
1177  remote_conds = lappend(remote_conds, rinfo);
1178  remote_exprs = lappend(remote_exprs, rinfo->clause);
1179  }
1180  else
1181  local_exprs = lappend(local_exprs, rinfo->clause);
1182  }
1183 
1184  if (foreignrel->reloptkind == RELOPT_JOINREL ||
1185  foreignrel->reloptkind == RELOPT_UPPER_REL)
1186  {
1187  /* For a join relation, get the conditions from fdw_private structure */
1188  remote_conds = fpinfo->remote_conds;
1189  local_exprs = fpinfo->local_conds;
1190 
1191  /* Build the list of columns to be fetched from the foreign server. */
1192  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1193 
1194  /*
1195  * Ensure that the outer plan produces a tuple whose descriptor
1196  * matches our scan tuple slot. This is safe because all scans and
1197  * joins support projection, so we never need to insert a Result node.
1198  * Also, remove the local conditions from outer plan's quals, lest
1199  * they will be evaluated twice, once by the local plan and once by
1200  * the scan.
1201  */
1202  if (outer_plan)
1203  {
/* NOTE(review): this "lc" shadows the function-scope "lc" declared above. */
1204  ListCell *lc;
1205 
1206  /*
1207  * Right now, we only consider grouping and aggregation beyond
1208  * joins. Queries involving aggregates or grouping do not require
1209  * EPQ mechanism, hence should not have an outer plan here.
1210  */
1211  Assert(foreignrel->reloptkind != RELOPT_UPPER_REL);
1212 
1213  outer_plan->targetlist = fdw_scan_tlist;
1214 
1215  foreach(lc, local_exprs)
1216  {
/*
 * NOTE(review): the cast below assumes outer_plan is a Join-derived
 * node (NestLoop/MergeJoin/HashJoin).  Nothing here checks that; if
 * other plan types can reach this point, guard with IsA() first.
 */
1217  Join *join_plan = (Join *) outer_plan;
1218  Node *qual = lfirst(lc);
1219 
1220  outer_plan->qual = list_delete(outer_plan->qual, qual);
1221 
1222  /*
1223  * For an inner join the local conditions of foreign scan plan
1224  * can be part of the joinquals as well.
1225  */
1226  if (join_plan->jointype == JOIN_INNER)
1227  join_plan->joinqual = list_delete(join_plan->joinqual,
1228  qual);
1229  }
1230  }
1231  }
1232 
1233  /*
1234  * Build the query string to be sent for execution, and identify
1235  * expressions to be sent as parameters.
1236  */
1237  initStringInfo(&sql);
1238  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1239  remote_conds, best_path->path.pathkeys,
1240  &retrieved_attrs, &params_list);
1241 
1242  /*
1243  * Build the fdw_private list that will be available to the executor.
1244  * Items in the list must match order in enum FdwScanPrivateIndex.
1245  */
1246  fdw_private = list_make4(makeString(sql.data),
1247  remote_conds,
1248  retrieved_attrs,
1249  makeInteger(fpinfo->fetch_size));
1250  if (foreignrel->reloptkind == RELOPT_JOINREL ||
1251  foreignrel->reloptkind == RELOPT_UPPER_REL)
1252  fdw_private = lappend(fdw_private,
1253  makeString(fpinfo->relation_name->data));
1254 
1255  /*
1256  * Create the ForeignScan node for the given relation.
1257  *
1258  * Note that the remote parameter expressions are stored in the fdw_exprs
1259  * field of the finished plan node; we can't keep them in private state
1260  * because then they wouldn't be subject to later planner processing.
1261  */
1262  return make_foreignscan(tlist,
1263  local_exprs,
1264  scan_relid,
1265  params_list,
1266  fdw_private,
1267  fdw_scan_tlist,
1268  remote_exprs,
1269  outer_plan);
1270 }
1271 
1272 /*
1273  * postgresBeginForeignScan
1274  * Initiate an executor scan of a foreign PostgreSQL table.
 *
 * Does nothing under EXEC_FLAG_EXPLAIN_ONLY (fdw_state stays NULL).
 * Otherwise it does the following:
 *  - allocates a PgFdwScanState;
 *  - gets a connection for the user mapping of checkAsUser, or of the
 *    current user;
 *  - assigns a cursor number;
 *  - reads the query, retrieved_attrs and fetch_size from fdw_private;
 *  - creates the batch and temp memory contexts;
 *  - prepares parameter handling when fdw_exprs is non-empty.
 * cursor_exists starts out false, so the remote cursor is created later,
 * on the first IterateForeignScan call.
 *
 * NOTE(review): the following listing lines are missing from this copy:
 *  - 1277: the declarator line;
 *  - 1330, 1332, 1334: presumably the list_nth index arguments;
 *  - 1339, 1342: presumably the memory-context size arguments;
 *  - 1367: presumably the start of the parameter-preparation call.
 * Confirm against upstream.
1275  */
1276 static void
1278 {
1279  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1280  EState *estate = node->ss.ps.state;
1281  PgFdwScanState *fsstate;
1282  RangeTblEntry *rte;
1283  Oid userid;
1284  ForeignTable *table;
1285  UserMapping *user;
1286  int rtindex;
1287  int numParams;
1288 
1289  /*
1290  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1291  */
1292  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1293  return;
1294 
1295  /*
1296  * We'll save private state in node->fdw_state.
1297  */
1298  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1299  node->fdw_state = (void *) fsstate;
1300 
1301  /*
1302  * Identify which user to do the remote access as. This should match what
1303  * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1304  * lowest-numbered member RTE as a representative; we would get the same
1305  * result from any.
1306  */
1307  if (fsplan->scan.scanrelid > 0)
1308  rtindex = fsplan->scan.scanrelid;
1309  else
1310  rtindex = bms_next_member(fsplan->fs_relids, -1);
1311  rte = rt_fetch(rtindex, estate->es_range_table);
1312  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1313 
1314  /* Get info about foreign table. */
1315  table = GetForeignTable(rte->relid);
1316  user = GetUserMapping(userid, table->serverid);
1317 
1318  /*
1319  * Get connection to the foreign server. Connection manager will
1320  * establish new connection if necessary.
1321  */
1322  fsstate->conn = GetConnection(user, false);
1323 
1324  /* Assign a unique ID for my cursor */
1325  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1326  fsstate->cursor_exists = false;
1327 
1328  /* Get private info created by planner functions. */
1329  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1331  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1333  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1335 
1336  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1337  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1338  "postgres_fdw tuple data",
1340  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1341  "postgres_fdw temporary data",
1343 
1344  /*
1345  * Get info we'll need for converting data fetched from the foreign server
1346  * into local representation and error reporting during that process.
1347  */
1348  if (fsplan->scan.scanrelid > 0)
1349  {
1350  fsstate->rel = node->ss.ss_currentRelation;
1351  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1352  }
1353  else
1354  {
1355  fsstate->rel = NULL;
1356  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1357  }
1358 
1359  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1360 
1361  /*
1362  * Prepare for processing of parameters used in remote query, if any.
1363  */
1364  numParams = list_length(fsplan->fdw_exprs);
1365  fsstate->numParams = numParams;
1366  if (numParams > 0)
1368  fsplan->fdw_exprs,
1369  numParams,
1370  &fsstate->param_flinfo,
1371  &fsstate->param_exprs,
1372  &fsstate->param_values);
1373 }
1374 
1375 /*
1376  * postgresIterateForeignScan
1377  * Retrieve next row from the result set, or clear tuple slot to indicate
1378  * EOF.
 *
 * Each call does the following:
 *  - creates the remote cursor if it does not exist yet (first call
 *    after Begin or ReScan);
 *  - fetches another batch once the buffered tuples are used up, unless
 *    EOF was already detected;
 *  - stores the next buffered tuple in the scan slot.
 *
 * NOTE(review): listing line 1381 (the declarator line) is missing from
 * this copy; confirm against upstream.
1379  */
1380 static TupleTableSlot *
1382 {
1383  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1384  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1385 
1386  /*
1387  * If this is the first call after Begin or ReScan, we need to create the
1388  * cursor on the remote side.
1389  */
1390  if (!fsstate->cursor_exists)
1391  create_cursor(node);
1392 
1393  /*
1394  * Get some more tuples, if we've run out.
1395  */
1396  if (fsstate->next_tuple >= fsstate->num_tuples)
1397  {
1398  /* No point in another fetch if we already detected EOF, though. */
1399  if (!fsstate->eof_reached)
1400  fetch_more_data(node);
1401  /* If we didn't get any tuples, must be end of data. */
1402  if (fsstate->next_tuple >= fsstate->num_tuples)
1403  return ExecClearTuple(slot);
1404  }
1405 
1406  /*
1407  * Return the next tuple.
1408  */
1409  ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1410  slot,
1411  InvalidBuffer,
1412  false);
1413 
1414  return slot;
1415 }
1416 
1417 /*
1418  * postgresReScanForeignScan
1419  * Restart the scan.
 *
 * Does nothing if the cursor has not been created yet.  Otherwise one of
 * three cases applies:
 *  - if chgParam is set, the cursor is closed (cursor_exists = false) so
 *    the next iterate call recreates it;
 *  - if fetch_ct_2 > 1, the cursor is rewound with MOVE BACKWARD ALL;
 *  - otherwise the tuples already buffered locally are replayed.
 * In the first two cases the local buffer is reset to force a fresh
 * FETCH.
 *
 * NOTE(review): listing line 1422 (the declarator line) is missing from
 * this copy; confirm against upstream.
1420  */
1421 static void
1423 {
1424  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1425  char sql[64];
1426  PGresult *res;
1427 
1428  /* If we haven't created the cursor yet, nothing to do. */
1429  if (!fsstate->cursor_exists)
1430  return;
1431 
1432  /*
1433  * If any internal parameters affecting this node have changed, we'd
1434  * better destroy and recreate the cursor. Otherwise, rewinding it should
1435  * be good enough. If we've only fetched zero or one batch, we needn't
1436  * even rewind the cursor, just rescan what we have.
1437  */
1438  if (node->ss.ps.chgParam != NULL)
1439  {
1440  fsstate->cursor_exists = false;
1441  snprintf(sql, sizeof(sql), "CLOSE c%u",
1442  fsstate->cursor_number);
1443  }
1444  else if (fsstate->fetch_ct_2 > 1)
1445  {
1446  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1447  fsstate->cursor_number);
1448  }
1449  else
1450  {
1451  /* Easy: just rescan what we already have in memory, if anything */
1452  fsstate->next_tuple = 0;
1453  return;
1454  }
1455 
1456  /*
1457  * We don't use a PG_TRY block here, so be careful not to throw error
1458  * without releasing the PGresult.
1459  */
1460  res = pgfdw_exec_query(fsstate->conn, sql);
1461  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1462  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1463  PQclear(res);
1464 
1465  /* Now force a fresh FETCH. */
1466  fsstate->tuples = NULL;
1467  fsstate->num_tuples = 0;
1468  fsstate->next_tuple = 0;
1469  fsstate->fetch_ct_2 = 0;
1470  fsstate->eof_reached = false;
1471 }
1472 
1473 /*
1474  * postgresEndForeignScan
1475  * Finish scanning foreign table and dispose objects used for this scan
 *
 * In the EXPLAIN-only case (fdw_state is NULL) there is nothing to do.
 * Otherwise it closes the remote cursor if one is open and releases the
 * connection.  Memory contexts are not deleted explicitly.
 *
 * NOTE(review): listing line 1478 (the declarator line) is missing from
 * this copy; confirm against upstream.
1476  */
1477 static void
1479 {
1480  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1481 
1482  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1483  if (fsstate == NULL)
1484  return;
1485 
1486  /* Close the cursor if open, to prevent accumulation of cursors */
1487  if (fsstate->cursor_exists)
1488  close_cursor(fsstate->conn, fsstate->cursor_number);
1489 
1490  /* Release remote connection */
1491  ReleaseConnection(fsstate->conn);
1492  fsstate->conn = NULL;
1493 
1494  /* MemoryContexts will be deleted automatically. */
1495 }
1496 
1497 /*
1498  * postgresAddForeignUpdateTargets
1499  * Add resjunk column(s) needed for update/delete on a foreign table
 *
 * Appends one resjunk target entry named "ctid" to the end of
 * parsetree->targetList.  It is a TIDOID Var on parsetree->resultRelation.
 * postgresBeginForeignModify later looks this column up by that name.
 *
 * NOTE(review): listing lines 1502 (the declarator line) and 1516
 * (presumably the makeVar attribute-number argument,
 * SelfItemPointerAttributeNumber) are missing from this copy; confirm
 * against upstream.
1500  */
1501 static void
1503  RangeTblEntry *target_rte,
1504  Relation target_relation)
1505 {
1506  Var *var;
1507  const char *attrname;
1508  TargetEntry *tle;
1509 
1510  /*
1511  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1512  */
1513 
1514  /* Make a Var representing the desired value */
1515  var = makeVar(parsetree->resultRelation,
1517  TIDOID,
1518  -1,
1519  InvalidOid,
1520  0);
1521 
1522  /* Wrap it in a resjunk TLE with the right name ... */
1523  attrname = "ctid";
1524 
1525  tle = makeTargetEntry((Expr *) var,
1526  list_length(parsetree->targetList) + 1,
1527  pstrdup(attrname),
1528  true);
1529 
1530  /* ... and add it to the query's targetlist */
1531  parsetree->targetList = lappend(parsetree->targetList, tle);
1532 }
1533 
1534 /*
1535  * postgresPlanForeignModify
1536  * Plan an insert/update/delete operation on a foreign table
 *
 * Returns the four-element fdw_private list that
 * postgresBeginForeignModify unpacks.  Its elements are:
 *  - the deparsed SQL string;
 *  - the target attribute numbers: every non-dropped column for INSERT,
 *    the updated columns for UPDATE, and none for DELETE;
 *  - an integer flag that is true when retrieved_attrs is non-empty;
 *  - the retrieved attribute list.
 * ON CONFLICT DO NOTHING is passed through to the INSERT deparser; any
 * other ON CONFLICT action raises an error.
 *
 * NOTE(review): listing lines 1539 (the declarator line) and 1589
 * (presumably the declaration/initialisation of "attno") are missing from
 * this copy; confirm against upstream.
1537  */
1538 static List *
1540  ModifyTable *plan,
1541  Index resultRelation,
1542  int subplan_index)
1543 {
1544  CmdType operation = plan->operation;
1545  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1546  Relation rel;
1547  StringInfoData sql;
1548  List *targetAttrs = NIL;
1549  List *returningList = NIL;
1550  List *retrieved_attrs = NIL;
1551  bool doNothing = false;
1552 
1553  initStringInfo(&sql);
1554 
1555  /*
1556  * Core code already has some lock on each rel being planned, so we can
1557  * use NoLock here.
1558  */
1559  rel = heap_open(rte->relid, NoLock);
1560 
1561  /*
1562  * In an INSERT, we transmit all columns that are defined in the foreign
1563  * table. In an UPDATE, we transmit only columns that were explicitly
1564  * targets of the UPDATE, so as to avoid unnecessary data transmission.
1565  * (We can't do that for INSERT since we would miss sending default values
1566  * for columns not listed in the source statement.)
1567  */
1568  if (operation == CMD_INSERT)
1569  {
1570  TupleDesc tupdesc = RelationGetDescr(rel);
1571  int attnum;
1572 
1573  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1574  {
1575  Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
1576 
1577  if (!attr->attisdropped)
1578  targetAttrs = lappend_int(targetAttrs, attnum);
1579  }
1580  }
1581  else if (operation == CMD_UPDATE)
1582  {
1583  int col;
1584 
1585  col = -1;
1586  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1587  {
1588  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1590 
1591  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1592  elog(ERROR, "system-column update is not supported");
1593  targetAttrs = lappend_int(targetAttrs, attno);
1594  }
1595  }
1596 
1597  /*
1598  * Extract the relevant RETURNING list if any.
1599  */
1600  if (plan->returningLists)
1601  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1602 
1603  /*
1604  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1605  * should have already been rejected in the optimizer, as presently there
1606  * is no way to recognize an arbiter index on a foreign table. Only DO
1607  * NOTHING is supported without an inference specification.
1608  */
1609  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1610  doNothing = true;
1611  else if (plan->onConflictAction != ONCONFLICT_NONE)
1612  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1613  (int) plan->onConflictAction);
1614 
1615  /*
1616  * Construct the SQL command string.
1617  */
1618  switch (operation)
1619  {
1620  case CMD_INSERT:
1621  deparseInsertSql(&sql, root, resultRelation, rel,
1622  targetAttrs, doNothing, returningList,
1623  &retrieved_attrs);
1624  break;
1625  case CMD_UPDATE:
1626  deparseUpdateSql(&sql, root, resultRelation, rel,
1627  targetAttrs, returningList,
1628  &retrieved_attrs);
1629  break;
1630  case CMD_DELETE:
1631  deparseDeleteSql(&sql, root, resultRelation, rel,
1632  returningList,
1633  &retrieved_attrs);
1634  break;
1635  default:
1636  elog(ERROR, "unexpected operation: %d", (int) operation);
1637  break;
1638  }
1639 
1640  heap_close(rel, NoLock);
1641 
1642  /*
1643  * Build the fdw_private list that will be available to the executor.
1644  * Items in the list must match enum FdwModifyPrivateIndex, above.
1645  */
1646  return list_make4(makeString(sql.data),
1647  targetAttrs,
1648  makeInteger((retrieved_attrs != NIL)),
1649  retrieved_attrs);
1650 }
1651 
1652 /*
1653  * postgresBeginForeignModify
1654  * Begin an insert/update/delete operation on a foreign table
 *
 * Does nothing under EXEC_FLAG_EXPLAIN_ONLY (ri_FdwState stays NULL).
 * Otherwise it opens a connection, unpacks fdw_private, and sets up
 * output functions for the prepared-statement parameters, in this order:
 *  - ctid first, for UPDATE/DELETE;
 *  - then one per target attribute, for INSERT/UPDATE.
 * p_name is left NULL, so the remote statement is prepared later, by the
 * first Exec* call that finds it missing.
 *
 * NOTE(review): the following listing lines are missing from this copy:
 *  - 1657: the declarator line;
 *  - 1704, 1706, 1708, 1710: presumably the list_nth index arguments;
 *  - 1715: presumably a memory-context size argument;
 *  - 1719: the statement guarded by "if (fmstate->has_returning)";
 *  - 1731: the start of the ctid lookup call.
 * Confirm against upstream.
1655  */
1656 static void
1658  ResultRelInfo *resultRelInfo,
1659  List *fdw_private,
1660  int subplan_index,
1661  int eflags)
1662 {
1663  PgFdwModifyState *fmstate;
1664  EState *estate = mtstate->ps.state;
1665  CmdType operation = mtstate->operation;
1666  Relation rel = resultRelInfo->ri_RelationDesc;
1667  RangeTblEntry *rte;
1668  Oid userid;
1669  ForeignTable *table;
1670  UserMapping *user;
1671  AttrNumber n_params;
1672  Oid typefnoid;
1673  bool isvarlena;
1674  ListCell *lc;
1675 
1676  /*
1677  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1678  * stays NULL.
1679  */
1680  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1681  return;
1682 
1683  /* Begin constructing PgFdwModifyState. */
1684  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1685  fmstate->rel = rel;
1686 
1687  /*
1688  * Identify which user to do the remote access as. This should match what
1689  * ExecCheckRTEPerms() does.
1690  */
1691  rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1692  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1693 
1694  /* Get info about foreign table. */
1695  table = GetForeignTable(RelationGetRelid(rel));
1696  user = GetUserMapping(userid, table->serverid);
1697 
1698  /* Open connection; report that we'll create a prepared statement. */
1699  fmstate->conn = GetConnection(user, true);
1700  fmstate->p_name = NULL; /* prepared statement not made yet */
1701 
1702  /* Deconstruct fdw_private data. */
1703  fmstate->query = strVal(list_nth(fdw_private,
1705  fmstate->target_attrs = (List *) list_nth(fdw_private,
1707  fmstate->has_returning = intVal(list_nth(fdw_private,
1709  fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1711 
1712  /* Create context for per-tuple temp workspace. */
1713  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1714  "postgres_fdw temporary data",
1716 
1717  /* Prepare for input conversion of RETURNING results. */
1718  if (fmstate->has_returning)
1720 
1721  /* Prepare for output conversion of parameters used in prepared stmt. */
1722  n_params = list_length(fmstate->target_attrs) + 1;
1723  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1724  fmstate->p_nums = 0;
1725 
1726  if (operation == CMD_UPDATE || operation == CMD_DELETE)
1727  {
1728  /* Find the ctid resjunk column in the subplan's result */
1729  Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1730 
1732  "ctid");
1733  if (!AttributeNumberIsValid(fmstate->ctidAttno))
1734  elog(ERROR, "could not find junk ctid column");
1735 
1736  /* First transmittable parameter will be ctid */
1737  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1738  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1739  fmstate->p_nums++;
1740  }
1741 
1742  if (operation == CMD_INSERT || operation == CMD_UPDATE)
1743  {
1744  /* Set up for remaining transmittable parameters */
1745  foreach(lc, fmstate->target_attrs)
1746  {
1747  int attnum = lfirst_int(lc);
1748  Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1749 
1750  Assert(!attr->attisdropped);
1751 
1752  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1753  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1754  fmstate->p_nums++;
1755  }
1756  }
1757 
1758  Assert(fmstate->p_nums <= n_params);
1759 
1760  resultRelInfo->ri_FdwState = fmstate;
1761 }
1762 
1763 /*
1764  * postgresExecForeignInsert
1765  * Insert one row into a foreign table
 *
 * Steps:
 *  - prepares the remote statement if p_name is still NULL;
 *  - converts the slot's values to text parameters (no ctid);
 *  - sends the prepared statement and checks the result status.
 * The affected-row count comes from PQntuples when RETURNING is in use,
 * and from PQcmdTuples otherwise.  Returns the slot, with any RETURNING
 * row stored into it, or NULL when the remote side inserted nothing.
 *
 * NOTE(review): listing lines 1768 (the declarator line) and 1805
 * (presumably the expected-status operand of the PQresultStatus
 * comparison) are missing from this copy; confirm against upstream.
1766  */
1767 static TupleTableSlot *
1769  ResultRelInfo *resultRelInfo,
1770  TupleTableSlot *slot,
1771  TupleTableSlot *planSlot)
1772 {
1773  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1774  const char **p_values;
1775  PGresult *res;
1776  int n_rows;
1777 
1778  /* Set up the prepared statement on the remote server, if we didn't yet */
1779  if (!fmstate->p_name)
1780  prepare_foreign_modify(fmstate);
1781 
1782  /* Convert parameters needed by prepared statement to text form */
1783  p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1784 
1785  /*
1786  * Execute the prepared statement.
1787  */
1788  if (!PQsendQueryPrepared(fmstate->conn,
1789  fmstate->p_name,
1790  fmstate->p_nums,
1791  p_values,
1792  NULL,
1793  NULL,
1794  0))
1795  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1796 
1797  /*
1798  * Get the result, and check for success.
1799  *
1800  * We don't use a PG_TRY block here, so be careful not to throw error
1801  * without releasing the PGresult.
1802  */
1803  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1804  if (PQresultStatus(res) !=
1806  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1807 
1808  /* Check number of rows affected, and fetch RETURNING tuple if any */
1809  if (fmstate->has_returning)
1810  {
1811  n_rows = PQntuples(res);
1812  if (n_rows > 0)
1813  store_returning_result(fmstate, slot, res);
1814  }
1815  else
1816  n_rows = atoi(PQcmdTuples(res));
1817 
1818  /* And clean up */
1819  PQclear(res);
1820 
1821  MemoryContextReset(fmstate->temp_cxt);
1822 
1823  /* Return NULL if nothing was inserted on the remote end */
1824  return (n_rows > 0) ? slot : NULL;
1825 }
1826 
1827 /*
1828  * postgresExecForeignUpdate
1829  * Update one row in a foreign table
 *
 * Reads the ctid resjunk column from planSlot; a NULL ctid raises an
 * error.  Then:
 *  - passes the ctid plus the slot's updated columns as text parameters;
 *  - executes the prepared statement, preparing it first if needed.
 * Returns the slot, with any RETURNING row stored into it, or NULL when
 * the remote side updated nothing.
 *
 * NOTE(review): listing lines 1832 (the declarator line) and 1881
 * (presumably the expected-status operand of the PQresultStatus
 * comparison) are missing from this copy; confirm against upstream.
1830  */
1831 static TupleTableSlot *
1833  ResultRelInfo *resultRelInfo,
1834  TupleTableSlot *slot,
1835  TupleTableSlot *planSlot)
1836 {
1837  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1838  Datum datum;
1839  bool isNull;
1840  const char **p_values;
1841  PGresult *res;
1842  int n_rows;
1843 
1844  /* Set up the prepared statement on the remote server, if we didn't yet */
1845  if (!fmstate->p_name)
1846  prepare_foreign_modify(fmstate);
1847 
1848  /* Get the ctid that was passed up as a resjunk column */
1849  datum = ExecGetJunkAttribute(planSlot,
1850  fmstate->ctidAttno,
1851  &isNull);
1852  /* shouldn't ever get a null result... */
1853  if (isNull)
1854  elog(ERROR, "ctid is NULL");
1855 
1856  /* Convert parameters needed by prepared statement to text form */
1857  p_values = convert_prep_stmt_params(fmstate,
1858  (ItemPointer) DatumGetPointer(datum),
1859  slot);
1860 
1861  /*
1862  * Execute the prepared statement.
1863  */
1864  if (!PQsendQueryPrepared(fmstate->conn,
1865  fmstate->p_name,
1866  fmstate->p_nums,
1867  p_values,
1868  NULL,
1869  NULL,
1870  0))
1871  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1872 
1873  /*
1874  * Get the result, and check for success.
1875  *
1876  * We don't use a PG_TRY block here, so be careful not to throw error
1877  * without releasing the PGresult.
1878  */
1879  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1880  if (PQresultStatus(res) !=
1882  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1883 
1884  /* Check number of rows affected, and fetch RETURNING tuple if any */
1885  if (fmstate->has_returning)
1886  {
1887  n_rows = PQntuples(res);
1888  if (n_rows > 0)
1889  store_returning_result(fmstate, slot, res);
1890  }
1891  else
1892  n_rows = atoi(PQcmdTuples(res));
1893 
1894  /* And clean up */
1895  PQclear(res);
1896 
1897  MemoryContextReset(fmstate->temp_cxt);
1898 
1899  /* Return NULL if nothing was updated on the remote end */
1900  return (n_rows > 0) ? slot : NULL;
1901 }
1902 
1903 /*
1904  * postgresExecForeignDelete
1905  * Delete one row from a foreign table
 *
 * Reads the ctid resjunk column from planSlot; a NULL ctid raises an
 * error.  The ctid is the only parameter passed (NULL is given instead
 * of a data slot).  It executes the prepared statement, preparing it
 * first if needed.  Returns the slot, with any RETURNING row stored into
 * it, or NULL when the remote side deleted nothing.
 *
 * NOTE(review): listing lines 1908 (the declarator line) and 1957
 * (presumably the expected-status operand of the PQresultStatus
 * comparison) are missing from this copy; confirm against upstream.
1906  */
1907 static TupleTableSlot *
1909  ResultRelInfo *resultRelInfo,
1910  TupleTableSlot *slot,
1911  TupleTableSlot *planSlot)
1912 {
1913  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1914  Datum datum;
1915  bool isNull;
1916  const char **p_values;
1917  PGresult *res;
1918  int n_rows;
1919 
1920  /* Set up the prepared statement on the remote server, if we didn't yet */
1921  if (!fmstate->p_name)
1922  prepare_foreign_modify(fmstate);
1923 
1924  /* Get the ctid that was passed up as a resjunk column */
1925  datum = ExecGetJunkAttribute(planSlot,
1926  fmstate->ctidAttno,
1927  &isNull);
1928  /* shouldn't ever get a null result... */
1929  if (isNull)
1930  elog(ERROR, "ctid is NULL");
1931 
1932  /* Convert parameters needed by prepared statement to text form */
1933  p_values = convert_prep_stmt_params(fmstate,
1934  (ItemPointer) DatumGetPointer(datum),
1935  NULL);
1936 
1937  /*
1938  * Execute the prepared statement.
1939  */
1940  if (!PQsendQueryPrepared(fmstate->conn,
1941  fmstate->p_name,
1942  fmstate->p_nums,
1943  p_values,
1944  NULL,
1945  NULL,
1946  0))
1947  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1948 
1949  /*
1950  * Get the result, and check for success.
1951  *
1952  * We don't use a PG_TRY block here, so be careful not to throw error
1953  * without releasing the PGresult.
1954  */
1955  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1956  if (PQresultStatus(res) !=
1958  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1959 
1960  /* Check number of rows affected, and fetch RETURNING tuple if any */
1961  if (fmstate->has_returning)
1962  {
1963  n_rows = PQntuples(res);
1964  if (n_rows > 0)
1965  store_returning_result(fmstate, slot, res);
1966  }
1967  else
1968  n_rows = atoi(PQcmdTuples(res));
1969 
1970  /* And clean up */
1971  PQclear(res);
1972 
1973  MemoryContextReset(fmstate->temp_cxt);
1974 
1975  /* Return NULL if nothing was deleted on the remote end */
1976  return (n_rows > 0) ? slot : NULL;
1977 }
1978 
1979 /*
1980  * postgresEndForeignModify
1981  * Finish an insert/update/delete operation on a foreign table
1982  */
1983 static void
1985  ResultRelInfo *resultRelInfo)
1986 {
1987  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1988 
1989  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1990  if (fmstate == NULL)
1991  return;
1992 
1993  /* If we created a prepared statement, destroy it */
1994  if (fmstate->p_name)
1995  {
1996  char sql[64];
1997  PGresult *res;
1998 
1999  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2000 
2001  /*
2002  * We don't use a PG_TRY block here, so be careful not to throw error
2003  * without releasing the PGresult.
2004  */
2005  res = pgfdw_exec_query(fmstate->conn, sql);
2006  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2007  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2008  PQclear(res);
2009  fmstate->p_name = NULL;
2010  }
2011 
2012  /* Release remote connection */
2013  ReleaseConnection(fmstate->conn);
2014  fmstate->conn = NULL;
2015 }
2016 
2017 /*
2018  * postgresIsForeignRelUpdatable
2019  * Determine whether a foreign table supports INSERT, UPDATE and/or
2020  * DELETE.
2021  */
2022 static int
2024 {
2025  bool updatable;
2026  ForeignTable *table;
2027  ForeignServer *server;
2028  ListCell *lc;
2029 
2030  /*
2031  * By default, all postgres_fdw foreign tables are assumed updatable. This
2032  * can be overridden by a per-server setting, which in turn can be
2033  * overridden by a per-table setting.
2034  */
2035  updatable = true;
2036 
2037  table = GetForeignTable(RelationGetRelid(rel));
2038  server = GetForeignServer(table->serverid);
2039 
2040  foreach(lc, server->options)
2041  {
2042  DefElem *def = (DefElem *) lfirst(lc);
2043 
2044  if (strcmp(def->defname, "updatable") == 0)
2045  updatable = defGetBoolean(def);
2046  }
2047  foreach(lc, table->options)
2048  {
2049  DefElem *def = (DefElem *) lfirst(lc);
2050 
2051  if (strcmp(def->defname, "updatable") == 0)
2052  updatable = defGetBoolean(def);
2053  }
2054 
2055  /*
2056  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2057  */
2058  return updatable ?
2059  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2060 }
2061 
2062 /*
2063  * postgresRecheckForeignScan
2064  * Execute a local join execution plan for a foreign join
2065  */
2066 static bool
2068 {
2069  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2071  TupleTableSlot *result;
2072 
2073  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2074  if (scanrelid > 0)
2075  return true;
2076 
2077  Assert(outerPlan != NULL);
2078 
2079  /* Execute a local join execution plan */
2080  result = ExecProcNode(outerPlan);
2081  if (TupIsNull(result))
2082  return false;
2083 
2084  /* Store result in the given slot */
2085  ExecCopySlot(slot, result);
2086 
2087  return true;
2088 }
2089 
2090 /*
2091  * postgresPlanDirectModify
2092  * Consider a direct foreign table modification
2093  *
2094  * Decide whether it is safe to modify a foreign table directly, and if so,
2095  * rewrite subplan accordingly.
2096  */
2097 static bool
2099  ModifyTable *plan,
2100  Index resultRelation,
2101  int subplan_index)
2102 {
2103  CmdType operation = plan->operation;
2104  Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index);
2105  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
2106  Relation rel;
2107  StringInfoData sql;
2108  ForeignScan *fscan;
2109  List *targetAttrs = NIL;
2110  List *remote_conds;
2111  List *params_list = NIL;
2112  List *returningList = NIL;
2113  List *retrieved_attrs = NIL;
2114 
2115  /*
2116  * Decide whether it is safe to modify a foreign table directly.
2117  */
2118 
2119  /*
2120  * The table modification must be an UPDATE or DELETE.
2121  */
2122  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2123  return false;
2124 
2125  /*
2126  * It's unsafe to modify a foreign table directly if there are any local
2127  * joins needed.
2128  */
2129  if (!IsA(subplan, ForeignScan))
2130  return false;
2131 
2132  /*
2133  * It's unsafe to modify a foreign table directly if there are any quals
2134  * that should be evaluated locally.
2135  */
2136  if (subplan->qual != NIL)
2137  return false;
2138 
2139  /*
2140  * We can't handle an UPDATE or DELETE on a foreign join for now.
2141  */
2142  fscan = (ForeignScan *) subplan;
2143  if (fscan->scan.scanrelid == 0)
2144  return false;
2145 
2146  /*
2147  * It's unsafe to update a foreign table directly, if any expressions to
2148  * assign to the target columns are unsafe to evaluate remotely.
2149  */
2150  if (operation == CMD_UPDATE)
2151  {
2152  RelOptInfo *baserel = root->simple_rel_array[resultRelation];
2153  int col;
2154 
2155  /*
2156  * We transmit only columns that were explicitly targets of the
2157  * UPDATE, so as to avoid unnecessary data transmission.
2158  */
2159  col = -1;
2160  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2161  {
2162  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2164  TargetEntry *tle;
2165 
2166  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2167  elog(ERROR, "system-column update is not supported");
2168 
2169  tle = get_tle_by_resno(subplan->targetlist, attno);
2170 
2171  if (!tle)
2172  elog(ERROR, "attribute number %d not found in subplan targetlist",
2173  attno);
2174 
2175  if (!is_foreign_expr(root, baserel, (Expr *) tle->expr))
2176  return false;
2177 
2178  targetAttrs = lappend_int(targetAttrs, attno);
2179  }
2180  }
2181 
2182  /*
2183  * Ok, rewrite subplan so as to modify the foreign table directly.
2184  */
2185  initStringInfo(&sql);
2186 
2187  /*
2188  * Core code already has some lock on each rel being planned, so we can
2189  * use NoLock here.
2190  */
2191  rel = heap_open(rte->relid, NoLock);
2192 
2193  /*
2194  * Extract the baserestrictinfo clauses that can be evaluated remotely.
2195  */
2196  remote_conds = (List *) list_nth(fscan->fdw_private,
2198 
2199  /*
2200  * Extract the relevant RETURNING list if any.
2201  */
2202  if (plan->returningLists)
2203  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2204 
2205  /*
2206  * Construct the SQL command string.
2207  */
2208  switch (operation)
2209  {
2210  case CMD_UPDATE:
2211  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2212  ((Plan *) fscan)->targetlist,
2213  targetAttrs,
2214  remote_conds, &params_list,
2215  returningList, &retrieved_attrs);
2216  break;
2217  case CMD_DELETE:
2218  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2219  remote_conds, &params_list,
2220  returningList, &retrieved_attrs);
2221  break;
2222  default:
2223  elog(ERROR, "unexpected operation: %d", (int) operation);
2224  break;
2225  }
2226 
2227  /*
2228  * Update the operation info.
2229  */
2230  fscan->operation = operation;
2231 
2232  /*
2233  * Update the fdw_exprs list that will be available to the executor.
2234  */
2235  fscan->fdw_exprs = params_list;
2236 
2237  /*
2238  * Update the fdw_private list that will be available to the executor.
2239  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2240  */
2241  fscan->fdw_private = list_make4(makeString(sql.data),
2242  makeInteger((retrieved_attrs != NIL)),
2243  retrieved_attrs,
2244  makeInteger(plan->canSetTag));
2245 
2246  heap_close(rel, NoLock);
2247  return true;
2248 }
2249 
2250 /*
2251  * postgresBeginDirectModify
2252  * Prepare a direct foreign table modification
2253  */
2254 static void
2256 {
2257  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2258  EState *estate = node->ss.ps.state;
2259  PgFdwDirectModifyState *dmstate;
2260  RangeTblEntry *rte;
2261  Oid userid;
2262  ForeignTable *table;
2263  UserMapping *user;
2264  int numParams;
2265 
2266  /*
2267  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2268  */
2269  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2270  return;
2271 
2272  /*
2273  * We'll save private state in node->fdw_state.
2274  */
2275  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2276  node->fdw_state = (void *) dmstate;
2277 
2278  /*
2279  * Identify which user to do the remote access as. This should match what
2280  * ExecCheckRTEPerms() does.
2281  */
2282  rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
2283  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2284 
2285  /* Get info about foreign table. */
2286  dmstate->rel = node->ss.ss_currentRelation;
2287  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2288  user = GetUserMapping(userid, table->serverid);
2289 
2290  /*
2291  * Get connection to the foreign server. Connection manager will
2292  * establish new connection if necessary.
2293  */
2294  dmstate->conn = GetConnection(user, false);
2295 
2296  /* Initialize state variable */
2297  dmstate->num_tuples = -1; /* -1 means not set yet */
2298 
2299  /* Get private info created by planner functions. */
2300  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2302  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2304  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2306  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2308 
2309  /* Create context for per-tuple temp workspace. */
2310  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2311  "postgres_fdw temporary data",
2313 
2314  /* Prepare for input conversion of RETURNING results. */
2315  if (dmstate->has_returning)
2316  dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
2317 
2318  /*
2319  * Prepare for processing of parameters used in remote query, if any.
2320  */
2321  numParams = list_length(fsplan->fdw_exprs);
2322  dmstate->numParams = numParams;
2323  if (numParams > 0)
2325  fsplan->fdw_exprs,
2326  numParams,
2327  &dmstate->param_flinfo,
2328  &dmstate->param_exprs,
2329  &dmstate->param_values);
2330 }
2331 
2332 /*
2333  * postgresIterateDirectModify
2334  * Execute a direct foreign table modification
2335  */
2336 static TupleTableSlot *
2338 {
2340  EState *estate = node->ss.ps.state;
2341  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2342 
2343  /*
2344  * If this is the first call after Begin, execute the statement.
2345  */
2346  if (dmstate->num_tuples == -1)
2347  execute_dml_stmt(node);
2348 
2349  /*
2350  * If the local query doesn't specify RETURNING, just clear tuple slot.
2351  */
2352  if (!resultRelInfo->ri_projectReturning)
2353  {
2354  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2355  Instrumentation *instr = node->ss.ps.instrument;
2356 
2357  Assert(!dmstate->has_returning);
2358 
2359  /* Increment the command es_processed count if necessary. */
2360  if (dmstate->set_processed)
2361  estate->es_processed += dmstate->num_tuples;
2362 
2363  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2364  if (instr)
2365  instr->tuplecount += dmstate->num_tuples;
2366 
2367  return ExecClearTuple(slot);
2368  }
2369 
2370  /*
2371  * Get the next RETURNING tuple.
2372  */
2373  return get_returning_data(node);
2374 }
2375 
2376 /*
2377  * postgresEndDirectModify
2378  * Finish a direct foreign table modification
2379  */
2380 static void
2382 {
2384 
2385  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2386  if (dmstate == NULL)
2387  return;
2388 
2389  /* Release PGresult */
2390  if (dmstate->result)
2391  PQclear(dmstate->result);
2392 
2393  /* Release remote connection */
2394  ReleaseConnection(dmstate->conn);
2395  dmstate->conn = NULL;
2396 
2397  /* MemoryContext will be deleted automatically. */
2398 }
2399 
2400 /*
2401  * postgresExplainForeignScan
2402  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2403  */
2404 static void
2406 {
2407  List *fdw_private;
2408  char *sql;
2409  char *relations;
2410 
2411  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2412 
2413  /*
2414  * Add names of relation handled by the foreign scan when the scan is a
2415  * join
2416  */
2417  if (list_length(fdw_private) > FdwScanPrivateRelations)
2418  {
2419  relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2420  ExplainPropertyText("Relations", relations, es);
2421  }
2422 
2423  /*
2424  * Add remote query, when VERBOSE option is specified.
2425  */
2426  if (es->verbose)
2427  {
2428  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2429  ExplainPropertyText("Remote SQL", sql, es);
2430  }
2431 }
2432 
2433 /*
2434  * postgresExplainForeignModify
2435  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2436  */
2437 static void
2439  ResultRelInfo *rinfo,
2440  List *fdw_private,
2441  int subplan_index,
2442  ExplainState *es)
2443 {
2444  if (es->verbose)
2445  {
2446  char *sql = strVal(list_nth(fdw_private,
2448 
2449  ExplainPropertyText("Remote SQL", sql, es);
2450  }
2451 }
2452 
2453 /*
2454  * postgresExplainDirectModify
2455  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2456  * foreign table directly
2457  */
2458 static void
2460 {
2461  List *fdw_private;
2462  char *sql;
2463 
2464  if (es->verbose)
2465  {
2466  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2467  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2468  ExplainPropertyText("Remote SQL", sql, es);
2469  }
2470 }
2471 
2472 
/*
 * estimate_path_cost_size
 *		Get cost and size estimates for a foreign scan on given foreign relation
 *		either a base relation or a join between foreign relations or an upper
 *		relation containing foreign relations.
 *
 * param_join_conds are the parameterization clauses with outer relations.
 * pathkeys specify the expected sort order if any for given path being costed.
 *
 * The function returns the cost and size estimates in p_rows, p_width,
 * p_startup_cost and p_total_cost variables.
 *
 * Side effects:
 * - When fpinfo->use_remote_estimate is set, a connection to the foreign
 *   server is obtained and a remote EXPLAIN is run.
 * - When called with neither pathkeys nor param_join_conds, the computed
 *   costs are cached in fpinfo->rel_startup_cost and
 *   fpinfo->rel_total_cost.  These are the costs before the fdw_startup_cost,
 *   fdw_tuple_cost and cpu_tuple_cost transfer overheads are added.
 */
static void
estimate_path_cost_size(PlannerInfo *root,
						RelOptInfo *foreignrel,
						List *param_join_conds,
						List *pathkeys,
						double *p_rows, int *p_width,
						Cost *p_startup_cost, Cost *p_total_cost)
{
	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
	double		rows;			/* rows output after local quals */
	double		retrieved_rows; /* rows fetched from the remote side */
	int			width;
	Cost		startup_cost;
	Cost		total_cost;
	Cost		cpu_per_tuple;

	/*
	 * If the table or the server is configured to use remote estimates,
	 * connect to the foreign server and execute EXPLAIN to estimate the
	 * number of rows selected by the restriction+join clauses. Otherwise,
	 * estimate rows using whatever statistics we have locally, in a way
	 * similar to ordinary tables.
	 */
	if (fpinfo->use_remote_estimate)
	{
		List	   *remote_param_join_conds;
		List	   *local_param_join_conds;
		StringInfoData sql;
		PGconn	   *conn;
		Selectivity local_sel;
		QualCost	local_cost;
		List	   *fdw_scan_tlist = NIL;
		List	   *remote_conds;

		/* Required only to be passed to deparseSelectStmtForRel */
		List	   *retrieved_attrs;

		/*
		 * param_join_conds might contain both clauses that are safe to send
		 * across, and clauses that aren't.
		 */
		classifyConditions(root, foreignrel, param_join_conds,
						   &remote_param_join_conds, &local_param_join_conds);

		/* Build the list of columns to be fetched from the foreign server. */
		if (foreignrel->reloptkind == RELOPT_JOINREL ||
			foreignrel->reloptkind == RELOPT_UPPER_REL)
			fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
		else
			fdw_scan_tlist = NIL;

		/*
		 * The complete list of remote conditions includes everything from
		 * baserestrictinfo plus any extra join_conds relevant to this
		 * particular path.
		 */
		remote_conds = list_concat(list_copy(remote_param_join_conds),
								   fpinfo->remote_conds);

		/*
		 * Construct EXPLAIN query including the desired SELECT, FROM, and
		 * WHERE clauses. Params and other-relation Vars are replaced by dummy
		 * values, so don't request params_list.
		 */
		initStringInfo(&sql);
		appendStringInfoString(&sql, "EXPLAIN ");
		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
								remote_conds, pathkeys, &retrieved_attrs,
								NULL);

		/* Get the remote estimate */
		conn = GetConnection(fpinfo->user, false);
		get_remote_estimate(sql.data, conn, &rows, &width,
							&startup_cost, &total_cost);
		ReleaseConnection(conn);

		/* The remote row estimate is what gets transferred over the wire. */
		retrieved_rows = rows;

		/* Factor in the selectivity of the locally-checked quals */
		local_sel = clauselist_selectivity(root,
										   local_param_join_conds,
										   foreignrel->relid,
										   JOIN_INNER,
										   NULL);
		local_sel *= fpinfo->local_conds_sel;

		rows = clamp_row_est(rows * local_sel);

		/* Add in the eval cost of the locally-checked quals */
		startup_cost += fpinfo->local_conds_cost.startup;
		total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
		cost_qual_eval(&local_cost, local_param_join_conds, root);
		startup_cost += local_cost.startup;
		total_cost += local_cost.per_tuple * retrieved_rows;
	}
	else
	{
		Cost		run_cost = 0;

		/*
		 * We don't support join conditions in this mode (hence, no
		 * parameterized paths can be made).
		 */
		Assert(param_join_conds == NIL);

		/*
		 * Use rows/width estimates made by set_baserel_size_estimates() for
		 * base foreign relations and set_joinrel_size_estimates() for join
		 * between foreign relations.
		 */
		rows = foreignrel->rows;
		width = foreignrel->reltarget->width;

		/* Back into an estimate of the number of retrieved rows. */
		retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);

		/*
		 * We will come here again and again with different set of pathkeys
		 * that caller wants to cost. We don't need to calculate the cost of
		 * bare scan each time. Instead, use the costs if we have cached them
		 * already.
		 *
		 * NOTE(review): a cached cost of exactly 0 is indistinguishable from
		 * "not cached" by this test, so such relations are recomputed on
		 * every call.
		 */
		if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
		{
			startup_cost = fpinfo->rel_startup_cost;
			run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
		}
		else if (foreignrel->reloptkind == RELOPT_JOINREL)
		{
			PgFdwRelationInfo *fpinfo_i;
			PgFdwRelationInfo *fpinfo_o;
			QualCost	join_cost;
			QualCost	remote_conds_cost;
			double		nrows;

			/* For join we expect inner and outer relations set */
			Assert(fpinfo->innerrel && fpinfo->outerrel);

			fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
			fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;

			/* Estimate of number of rows in cross product */
			nrows = fpinfo_i->rows * fpinfo_o->rows;
			/* Clamp retrieved rows estimate to at most size of cross product */
			retrieved_rows = Min(retrieved_rows, nrows);

			/*
			 * The cost of foreign join is estimated as cost of generating
			 * rows for the joining relations + cost for applying quals on the
			 * rows.
			 */

			/*
			 * Calculate the cost of clauses pushed down to the foreign server
			 */
			cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
			/* Calculate the cost of applying join clauses */
			cost_qual_eval(&join_cost, fpinfo->joinclauses, root);

			/*
			 * Startup cost includes startup cost of joining relations and the
			 * startup cost for join and other clauses. We do not include the
			 * startup cost specific to join strategy (e.g. setting up hash
			 * tables) since we do not know what strategy the foreign server
			 * is going to use.
			 */
			startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
			startup_cost += join_cost.startup;
			startup_cost += remote_conds_cost.startup;
			startup_cost += fpinfo->local_conds_cost.startup;

			/*
			 * Run time cost includes:
			 *
			 * 1. Run time cost (total_cost - startup_cost) of relations being
			 * joined
			 *
			 * 2. Run time cost of applying join clauses on the cross product
			 * of the joining relations.
			 *
			 * 3. Run time cost of applying pushed down other clauses on the
			 * result of join
			 *
			 * 4. Run time cost of applying nonpushable other clauses locally
			 * on the result fetched from the foreign server.
			 */
			run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
			run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
			run_cost += nrows * join_cost.per_tuple;
			/* nrows now becomes the row count after the join clauses */
			nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
			run_cost += nrows * remote_conds_cost.per_tuple;
			run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
		}
		else if (foreignrel->reloptkind == RELOPT_UPPER_REL)
		{
			PgFdwRelationInfo *ofpinfo;
			PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG];
			AggClauseCosts aggcosts;
			double		input_rows;
			int			numGroupCols;
			double		numGroups = 1;

			/*
			 * This cost model is mixture of costing done for sorted and
			 * hashed aggregates in cost_agg(). We are not sure which
			 * strategy will be considered at remote side, thus for
			 * simplicity, we put all startup related costs in startup_cost
			 * and all finalization and run cost are added in total_cost.
			 *
			 * Also, core does not care about costing HAVING expressions and
			 * adding that to the costs. So similarly, here too we are not
			 * considering remote and local conditions for costing.
			 */

			ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;

			/* Get rows and width from input rel */
			input_rows = ofpinfo->rows;
			width = ofpinfo->width;

			/* Collect statistics about aggregates for estimating costs. */
			MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
			if (root->parse->hasAggs)
			{
				get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
									 AGGSPLIT_SIMPLE, &aggcosts);
				get_agg_clause_costs(root, (Node *) root->parse->havingQual,
									 AGGSPLIT_SIMPLE, &aggcosts);
			}

			/* Get number of grouping columns and possible number of groups */
			numGroupCols = list_length(root->parse->groupClause);
			numGroups = estimate_num_groups(root,
											get_sortgrouplist_exprs(root->parse->groupClause,
																	fpinfo->grouped_tlist),
											input_rows, NULL);

			/*
			 * Number of rows expected from foreign server will be same as
			 * that of number of groups.
			 */
			rows = retrieved_rows = numGroups;

			/*-----
			 * Startup cost includes:
			 *	  1. Startup cost for underneath input relation
			 *	  2. Cost of performing aggregation, per cost_agg()
			 *	  3. Startup cost for PathTarget eval
			 *-----
			 */
			startup_cost = ofpinfo->rel_startup_cost;
			startup_cost += aggcosts.transCost.startup;
			startup_cost += aggcosts.transCost.per_tuple * input_rows;
			startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
			startup_cost += ptarget->cost.startup;

			/*-----
			 * Run time cost includes:
			 *	  1. Run time cost of underneath input relation
			 *	  2. Run time cost of performing aggregation, per cost_agg()
			 *	  3. PathTarget eval cost for each output row
			 *-----
			 */
			run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
			run_cost += aggcosts.finalCost * numGroups;
			run_cost += cpu_tuple_cost * numGroups;
			run_cost += ptarget->cost.per_tuple * numGroups;
		}
		else
		{
			/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);

			/*
			 * Cost as though this were a seqscan, which is pessimistic. We
			 * effectively imagine the local_conds are being evaluated
			 * remotely, too.
			 */
			startup_cost = 0;
			run_cost = 0;
			run_cost += seq_page_cost * foreignrel->pages;

			startup_cost += foreignrel->baserestrictcost.startup;
			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
			run_cost += cpu_per_tuple * foreignrel->tuples;
		}

		/*
		 * Without remote estimates, we have no real way to estimate the cost
		 * of generating sorted output. It could be free if the query plan
		 * the remote side would have chosen generates properly-sorted output
		 * anyway, but in most cases it will cost something. Estimate a value
		 * high enough that we won't pick the sorted path when the ordering
		 * isn't locally useful, but low enough that we'll err on the side of
		 * pushing down the ORDER BY clause when it's useful to do so.
		 */
		if (pathkeys != NIL)
		{
			startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
			run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
		}

		total_cost = startup_cost + run_cost;
	}

	/*
	 * Cache the costs for scans without any pathkeys or parameterization
	 * before adding the costs for transferring data from the foreign server.
	 * These costs are useful for costing the join between this relation and
	 * another foreign relation or to calculate the costs of paths with
	 * pathkeys for this relation, when the costs can not be obtained from the
	 * foreign server. This function will be called at least once for every
	 * foreign relation without pathkeys and parameterization.
	 */
	if (pathkeys == NIL && param_join_conds == NIL)
	{
		fpinfo->rel_startup_cost = startup_cost;
		fpinfo->rel_total_cost = total_cost;
	}

	/*
	 * Add some additional cost factors to account for connection overhead
	 * (fdw_startup_cost), transferring data across the network
	 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
	 * (cpu_tuple_cost per retrieved row).
	 */
	startup_cost += fpinfo->fdw_startup_cost;
	total_cost += fpinfo->fdw_startup_cost;
	total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
	total_cost += cpu_tuple_cost * retrieved_rows;

	/* Return results. */
	*p_rows = rows;
	*p_width = width;
	*p_startup_cost = startup_cost;
	*p_total_cost = total_cost;
}
2822 
2823 /*
2824  * Estimate costs of executing a SQL statement remotely.
2825  * The given "sql" must be an EXPLAIN command.
2826  */
2827 static void
2828 get_remote_estimate(const char *sql, PGconn *conn,
2829  double *rows, int *width,
2830  Cost *startup_cost, Cost *total_cost)
2831 {
2832  PGresult *volatile res = NULL;
2833 
2834  /* PGresult must be released before leaving this function. */
2835  PG_TRY();
2836  {
2837  char *line;
2838  char *p;
2839  int n;
2840 
2841  /*
2842  * Execute EXPLAIN remotely.
2843  */
2844  res = pgfdw_exec_query(conn, sql);
2845  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2846  pgfdw_report_error(ERROR, res, conn, false, sql);
2847 
2848  /*
2849  * Extract cost numbers for topmost plan node. Note we search for a
2850  * left paren from the end of the line to avoid being confused by
2851  * other uses of parentheses.
2852  */
2853  line = PQgetvalue(res, 0, 0);
2854  p = strrchr(line, '(');
2855  if (p == NULL)
2856  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2857  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2858  startup_cost, total_cost, rows, width);
2859  if (n != 4)
2860  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2861 
2862  PQclear(res);
2863  res = NULL;
2864  }
2865  PG_CATCH();
2866  {
2867  if (res)
2868  PQclear(res);
2869  PG_RE_THROW();
2870  }
2871  PG_END_TRY();
2872 }
2873 
2874 /*
2875  * Detect whether we want to process an EquivalenceClass member.
2876  *
2877  * This is a callback for use by generate_implied_equalities_for_column.
2878  */
2879 static bool
2882  void *arg)
2883 {
2885  Expr *expr = em->em_expr;
2886 
2887  /*
2888  * If we've identified what we're processing in the current scan, we only
2889  * want to match that expression.
2890  */
2891  if (state->current != NULL)
2892  return equal(expr, state->current);
2893 
2894  /*
2895  * Otherwise, ignore anything we've already processed.
2896  */
2897  if (list_member(state->already_used, expr))
2898  return false;
2899 
2900  /* This is the new target to process. */
2901  state->current = expr;
2902  return true;
2903 }
2904 
2905 /*
2906  * Create cursor for node's query with current parameter values.
2907  */
2908 static void
2910 {
2911  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2912  ExprContext *econtext = node->ss.ps.ps_ExprContext;
2913  int numParams = fsstate->numParams;
2914  const char **values = fsstate->param_values;
2915  PGconn *conn = fsstate->conn;
2917  PGresult *res;
2918 
2919  /*
2920  * Construct array of query parameter values in text format. We do the
2921  * conversions in the short-lived per-tuple context, so as not to cause a
2922  * memory leak over repeated scans.
2923  */
2924  if (numParams > 0)
2925  {
2926  MemoryContext oldcontext;
2927 
2928  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2929 
2930  process_query_params(econtext,
2931  fsstate->param_flinfo,
2932  fsstate->param_exprs,
2933  values);
2934 
2935  MemoryContextSwitchTo(oldcontext);
2936  }
2937 
2938  /* Construct the DECLARE CURSOR command */
2939  initStringInfo(&buf);
2940  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
2941  fsstate->cursor_number, fsstate->query);
2942 
2943  /*
2944  * Notice that we pass NULL for paramTypes, thus forcing the remote server
2945  * to infer types for all parameters. Since we explicitly cast every
2946  * parameter (see deparse.c), the "inference" is trivial and will produce
2947  * the desired result. This allows us to avoid assuming that the remote
2948  * server has the same OIDs we do for the parameters' types.
2949  */
2950  if (!PQsendQueryParams(conn, buf.data, numParams,
2951  NULL, values, NULL, NULL, 0))
2952  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2953 
2954  /*
2955  * Get the result, and check for success.
2956  *
2957  * We don't use a PG_TRY block here, so be careful not to throw error
2958  * without releasing the PGresult.
2959  */
2960  res = pgfdw_get_result(conn, buf.data);
2961  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2962  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
2963  PQclear(res);
2964 
2965  /* Mark the cursor as created, and show no tuples have been retrieved */
2966  fsstate->cursor_exists = true;
2967  fsstate->tuples = NULL;
2968  fsstate->num_tuples = 0;
2969  fsstate->next_tuple = 0;
2970  fsstate->fetch_ct_2 = 0;
2971  fsstate->eof_reached = false;
2972 
2973  /* Clean up */
2974  pfree(buf.data);
2975 }
2976 
/*
 * Fetch some more rows from the node's cursor.
 *
 * Issues "FETCH <fetch_size> FROM c<cursor_number>" on the scan's connection.
 * Each returned row is converted into a HeapTuple in fsstate->tuples, which
 * is allocated in fsstate->batch_cxt; the previous batch is discarded first.
 * Also updates num_tuples, next_tuple, fetch_ct_2 and eof_reached.
 *
 * Throws ERROR if the remote FETCH fails.  The PGresult is released on both
 * the success and the error paths.
 */
static void
fetch_more_data(ForeignScanState *node)
{
	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
	/* volatile: assigned inside PG_TRY and read again in PG_CATCH */
	PGresult   *volatile res = NULL;
	MemoryContext oldcontext;

	/*
	 * We'll store the tuples in the batch_cxt. First, flush the previous
	 * batch.
	 */
	fsstate->tuples = NULL;
	MemoryContextReset(fsstate->batch_cxt);
	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);

	/* PGresult must be released before leaving this function. */
	PG_TRY();
	{
		PGconn	   *conn = fsstate->conn;
		char		sql[64];
		int			numrows;
		int			i;

		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
				 fsstate->fetch_size, fsstate->cursor_number);

		res = pgfdw_exec_query(conn, sql);
		/* On error, report the original query, not the FETCH. */
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);

		/* Convert the data into HeapTuples */
		numrows = PQntuples(res);
		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
		fsstate->num_tuples = numrows;
		fsstate->next_tuple = 0;

		for (i = 0; i < numrows; i++)
		{
			Assert(IsA(node->ss.ps.plan, ForeignScan));

			fsstate->tuples[i] =
				make_tuple_from_result_row(res, i,
										   fsstate->rel,
										   fsstate->attinmeta,
										   fsstate->retrieved_attrs,
										   node,
										   fsstate->temp_cxt);
		}

		/* Update fetch_ct_2 (number of FETCHes done, saturating at 2) */
		if (fsstate->fetch_ct_2 < 2)
			fsstate->fetch_ct_2++;

		/* Must be EOF if we didn't get as many tuples as we asked for. */
		fsstate->eof_reached = (numrows < fsstate->fetch_size);

		/* Clear and forget res so that PG_CATCH won't free it again. */
		PQclear(res);
		res = NULL;
	}
	PG_CATCH();
	{
		if (res)
			PQclear(res);
		PG_RE_THROW();
	}
	PG_END_TRY();

	MemoryContextSwitchTo(oldcontext);
}
3050 
3051 /*
3052  * Force assorted GUC parameters to settings that ensure that we'll output
3053  * data values in a form that is unambiguous to the remote server.
3054  *
3055  * This is rather expensive and annoying to do once per row, but there's
3056  * little choice if we want to be sure values are transmitted accurately;
3057  * we can't leave the settings in place between rows for fear of affecting
3058  * user-visible computations.
3059  *
3060  * We use the equivalent of a function SET option to allow the settings to
3061  * persist only until the caller calls reset_transmission_modes(). If an
3062  * error is thrown in between, guc.c will take care of undoing the settings.
3063  *
3064  * The return value is the nestlevel that must be passed to
3065  * reset_transmission_modes() to undo things.
3066  */
3067 int
3069 {
3070  int nestlevel = NewGUCNestLevel();
3071 
3072  /*
3073  * The values set here should match what pg_dump does. See also
3074  * configure_remote_session in connection.c.
3075  */
3076  if (DateStyle != USE_ISO_DATES)
3077  (void) set_config_option("datestyle", "ISO",
3079  GUC_ACTION_SAVE, true, 0, false);
3081  (void) set_config_option("intervalstyle", "postgres",
3083  GUC_ACTION_SAVE, true, 0, false);
3084  if (extra_float_digits < 3)
3085  (void) set_config_option("extra_float_digits", "3",
3087  GUC_ACTION_SAVE, true, 0, false);
3088 
3089  return nestlevel;
3090 }
3091 
3092 /*
3093  * Undo the effects of set_transmission_modes().
3094  */
3095 void
3097 {
3098  AtEOXact_GUC(true, nestlevel);
3099 }
3100 
3101 /*
3102  * Utility routine to close a cursor.
3103  */
3104 static void
3106 {
3107  char sql[64];
3108  PGresult *res;
3109 
3110  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3111 
3112  /*
3113  * We don't use a PG_TRY block here, so be careful not to throw error
3114  * without releasing the PGresult.
3115  */
3116  res = pgfdw_exec_query(conn, sql);
3117  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3118  pgfdw_report_error(ERROR, res, conn, true, sql);
3119  PQclear(res);
3120 }
3121 
3122 /*
3123  * prepare_foreign_modify
3124  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3125  */
3126 static void
3128 {
3129  char prep_name[NAMEDATALEN];
3130  char *p_name;
3131  PGresult *res;
3132 
3133  /* Construct name we'll use for the prepared statement. */
3134  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3135  GetPrepStmtNumber(fmstate->conn));
3136  p_name = pstrdup(prep_name);
3137 
3138  /*
3139  * We intentionally do not specify parameter types here, but leave the
3140  * remote server to derive them by default. This avoids possible problems
3141  * with the remote server using different type OIDs than we do. All of
3142  * the prepared statements we use in this module are simple enough that
3143  * the remote server will make the right choices.
3144  */
3145  if (!PQsendPrepare(fmstate->conn,
3146  p_name,
3147  fmstate->query,
3148  0,
3149  NULL))
3150  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3151 
3152  /*
3153  * Get the result, and check for success.
3154  *
3155  * We don't use a PG_TRY block here, so be careful not to throw error
3156  * without releasing the PGresult.
3157  */
3158  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3159  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3160  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3161  PQclear(res);
3162 
3163  /* This action shows that the prepare has been done. */
3164  fmstate->p_name = p_name;
3165 }
3166 
3167 /*
3168  * convert_prep_stmt_params
3169  * Create array of text strings representing parameter values
3170  *
3171  * tupleid is ctid to send, or NULL if none
3172  * slot is slot to get remaining parameters from, or NULL if none
3173  *
3174  * Data is constructed in temp_cxt; caller should reset that after use.
3175  */
3176 static const char **
3178  ItemPointer tupleid,
3179  TupleTableSlot *slot)
3180 {
3181  const char **p_values;
3182  int pindex = 0;
3183  MemoryContext oldcontext;
3184 
3185  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3186 
3187  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3188 
3189  /* 1st parameter should be ctid, if it's in use */
3190  if (tupleid != NULL)
3191  {
3192  /* don't need set_transmission_modes for TID output */
3193  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3194  PointerGetDatum(tupleid));
3195  pindex++;
3196  }
3197 
3198  /* get following parameters from slot */
3199  if (slot != NULL && fmstate->target_attrs != NIL)
3200  {
3201  int nestlevel;
3202  ListCell *lc;
3203 
3204  nestlevel = set_transmission_modes();
3205 
3206  foreach(lc, fmstate->target_attrs)
3207  {
3208  int attnum = lfirst_int(lc);
3209  Datum value;
3210  bool isnull;
3211 
3212  value = slot_getattr(slot, attnum, &isnull);
3213  if (isnull)
3214  p_values[pindex] = NULL;
3215  else
3216  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3217  value);
3218  pindex++;
3219  }
3220 
3221  reset_transmission_modes(nestlevel);
3222  }
3223 
3224  Assert(pindex == fmstate->p_nums);
3225 
3226  MemoryContextSwitchTo(oldcontext);
3227 
3228  return p_values;
3229 }
3230 
3231 /*
3232  * store_returning_result
3233  * Store the result of a RETURNING clause
3234  *
3235  * On error, be sure to release the PGresult on the way out. Callers do not
3236  * have PG_TRY blocks to ensure this happens.
3237  */
3238 static void
3240  TupleTableSlot *slot, PGresult *res)
3241 {
3242  PG_TRY();
3243  {
3244  HeapTuple newtup;
3245 
3246  newtup = make_tuple_from_result_row(res, 0,
3247  fmstate->rel,
3248  fmstate->attinmeta,
3249  fmstate->retrieved_attrs,
3250  NULL,
3251  fmstate->temp_cxt);
3252  /* tuple will be deleted when it is cleared from the slot */
3253  ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3254  }
3255  PG_CATCH();
3256  {
3257  if (res)
3258  PQclear(res);
3259  PG_RE_THROW();
3260  }
3261  PG_END_TRY();
3262 }
3263 
3264 /*
3265  * Execute a direct UPDATE/DELETE statement.
3266  */
3267 static void
3269 {
3271  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3272  int numParams = dmstate->numParams;
3273  const char **values = dmstate->param_values;
3274 
3275  /*
3276  * Construct array of query parameter values in text format.
3277  */
3278  if (numParams > 0)
3279  process_query_params(econtext,
3280  dmstate->param_flinfo,
3281  dmstate->param_exprs,
3282  values);
3283 
3284  /*
3285  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3286  * to infer types for all parameters. Since we explicitly cast every
3287  * parameter (see deparse.c), the "inference" is trivial and will produce
3288  * the desired result. This allows us to avoid assuming that the remote
3289  * server has the same OIDs we do for the parameters' types.
3290  */
3291  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3292  NULL, values, NULL, NULL, 0))
3293  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3294 
3295  /*
3296  * Get the result, and check for success.
3297  *
3298  * We don't use a PG_TRY block here, so be careful not to throw error
3299  * without releasing the PGresult.
3300  */
3301  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3302  if (PQresultStatus(dmstate->result) !=
3304  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3305  dmstate->query);
3306 
3307  /* Get the number of rows affected. */
3308  if (dmstate->has_returning)
3309  dmstate->num_tuples = PQntuples(dmstate->result);
3310  else
3311  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3312 }
3313 
3314 /*
3315  * Get the result of a RETURNING clause.
3316  */
3317 static TupleTableSlot *
3319 {
3321  EState *estate = node->ss.ps.state;
3322  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3323  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3324 
3325  Assert(resultRelInfo->ri_projectReturning);
3326 
3327  /* If we didn't get any tuples, must be end of data. */
3328  if (dmstate->next_tuple >= dmstate->num_tuples)
3329  return ExecClearTuple(slot);
3330 
3331  /* Increment the command es_processed count if necessary. */
3332  if (dmstate->set_processed)
3333  estate->es_processed += 1;
3334 
3335  /*
3336  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3337  * tuple. (has_returning is false when the local query is of the form
3338  * "UPDATE/DELETE .. RETURNING 1" for example.)
3339  */
3340  if (!dmstate->has_returning)
3341  ExecStoreAllNullTuple(slot);
3342  else
3343  {
3344  /*
3345  * On error, be sure to release the PGresult on the way out. Callers
3346  * do not have PG_TRY blocks to ensure this happens.
3347  */
3348  PG_TRY();
3349  {
3350  HeapTuple newtup;
3351 
3352  newtup = make_tuple_from_result_row(dmstate->result,
3353  dmstate->next_tuple,
3354  dmstate->rel,
3355  dmstate->attinmeta,
3356  dmstate->retrieved_attrs,
3357  NULL,
3358  dmstate->temp_cxt);
3359  ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3360  }
3361  PG_CATCH();
3362  {
3363  if (dmstate->result)
3364  PQclear(dmstate->result);
3365  PG_RE_THROW();
3366  }
3367  PG_END_TRY();
3368  }
3369  dmstate->next_tuple++;
3370 
3371  /* Make slot available for evaluation of the local query RETURNING list. */
3372  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
3373 
3374  return slot;
3375 }
3376 
3377 /*
3378  * Prepare for processing of parameters used in remote query.
3379  */
3380 static void
3382  List *fdw_exprs,
3383  int numParams,
3384  FmgrInfo **param_flinfo,
3385  List **param_exprs,
3386  const char ***param_values)
3387 {
3388  int i;
3389  ListCell *lc;
3390 
3391  Assert(numParams > 0);
3392 
3393  /* Prepare for output conversion of parameters used in remote query. */
3394  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3395 
3396  i = 0;
3397  foreach(lc, fdw_exprs)
3398  {
3399  Node *param_expr = (Node *) lfirst(lc);
3400  Oid typefnoid;
3401  bool isvarlena;
3402 
3403  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3404  fmgr_info(typefnoid, &(*param_flinfo)[i]);
3405  i++;
3406  }
3407 
3408  /*
3409  * Prepare remote-parameter expressions for evaluation. (Note: in
3410  * practice, we expect that all these expressions will be just Params, so
3411  * we could possibly do something more efficient than using the full
3412  * expression-eval machinery for this. But probably there would be little
3413  * benefit, and it'd require postgres_fdw to know more than is desirable
3414  * about Param evaluation.)
3415  */
3416  *param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node);
3417 
3418  /* Allocate buffer for text form of query parameters. */
3419  *param_values = (const char **) palloc0(numParams * sizeof(char *));
3420 }
3421 
3422 /*
3423  * Construct array of query parameter values in text format.
3424  */
3425 static void
3427  FmgrInfo *param_flinfo,
3428  List *param_exprs,
3429  const char **param_values)
3430 {
3431  int nestlevel;
3432  int i;
3433  ListCell *lc;
3434 
3435  nestlevel = set_transmission_modes();
3436 
3437  i = 0;
3438  foreach(lc, param_exprs)
3439  {
3440  ExprState *expr_state = (ExprState *) lfirst(lc);
3441  Datum expr_value;
3442  bool isNull;
3443 
3444  /* Evaluate the parameter expression */
3445  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
3446 
3447  /*
3448  * Get string representation of each parameter value by invoking
3449  * type-specific output function, unless the value is null.
3450  */
3451  if (isNull)
3452  param_values[i] = NULL;
3453  else
3454  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
3455 
3456  i++;
3457  }
3458 
3459  reset_transmission_modes(nestlevel);
3460 }
3461 
3462 /*
3463  * postgresAnalyzeForeignTable
3464  * Test whether analyzing this foreign table is supported
3465  */
3466 static bool
3468  AcquireSampleRowsFunc *func,
3469  BlockNumber *totalpages)
3470 {
3471  ForeignTable *table;
3472  UserMapping *user;
3473  PGconn *conn;
3474  StringInfoData sql;
3475  PGresult *volatile res = NULL;
3476 
3477  /* Return the row-analysis function pointer */
3479 
3480  /*
3481  * Now we have to get the number of pages. It's annoying that the ANALYZE
3482  * API requires us to return that now, because it forces some duplication
3483  * of effort between this routine and postgresAcquireSampleRowsFunc. But
3484  * it's probably not worth redefining that API at this point.
3485  */
3486 
3487  /*
3488  * Get the connection to use. We do the remote access as the table's
3489  * owner, even if the ANALYZE was started by some other user.
3490  */
3491  table = GetForeignTable(RelationGetRelid(relation));
3492  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3493  conn = GetConnection(user, false);
3494 
3495  /*
3496  * Construct command to get page count for relation.
3497  */
3498  initStringInfo(&sql);
3499  deparseAnalyzeSizeSql(&sql, relation);
3500 
3501  /* In what follows, do not risk leaking any PGresults. */
3502  PG_TRY();
3503  {
3504  res = pgfdw_exec_query(conn, sql.data);
3505  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3506  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3507 
3508  if (PQntuples(res) != 1 || PQnfields(res) != 1)
3509  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3510  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
3511 
3512  PQclear(res);
3513  res = NULL;
3514  }
3515  PG_CATCH();
3516  {
3517  if (res)
3518  PQclear(res);
3519  PG_RE_THROW();
3520  }
3521  PG_END_TRY();
3522 
3523  ReleaseConnection(conn);
3524 
3525  return true;
3526 }
3527 
3528 /*
3529  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
3530  *
3531  * We fetch the whole table from the remote side and pick out some sample rows.
3532  *
3533  * Selected rows are returned in the caller-allocated array rows[],
3534  * which must have at least targrows entries.
3535  * The actual number of rows selected is returned as the function result.
3536  * We also count the total number of rows in the table and return it into
3537  * *totalrows. Note that *totaldeadrows is always set to 0.
3538  *
3539  * Note that the returned list of rows is not always in order by physical
3540  * position in the table. Therefore, correlation estimates derived later
3541  * may be meaningless, but it's OK because we don't use the estimates
3542  * currently (the planner only pays attention to correlation for indexscans).
3543  */
3544 static int
3546  HeapTuple *rows, int targrows,
3547  double *totalrows,
3548  double *totaldeadrows)
3549 {
3550  PgFdwAnalyzeState astate;
3551  ForeignTable *table;
3552  ForeignServer *server;
3553  UserMapping *user;
3554  PGconn *conn;
3555  unsigned int cursor_number;
3556  StringInfoData sql;
3557  PGresult *volatile res = NULL;
3558 
3559  /* Initialize workspace state */
3560  astate.rel = relation;
3562 
3563  astate.rows = rows;
3564  astate.targrows = targrows;
3565  astate.numrows = 0;
3566  astate.samplerows = 0;
3567  astate.rowstoskip = -1; /* -1 means not set yet */
3568  reservoir_init_selection_state(&astate.rstate, targrows);
3569 
3570  /* Remember ANALYZE context, and create a per-tuple temp context */
3571  astate.anl_cxt = CurrentMemoryContext;
3573  "postgres_fdw temporary data",
3575 
3576  /*
3577  * Get the connection to use. We do the remote access as the table's
3578  * owner, even if the ANALYZE was started by some other user.
3579  */
3580  table = GetForeignTable(RelationGetRelid(relation));
3581  server = GetForeignServer(table->serverid);
3582  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3583  conn = GetConnection(user, false);
3584 
3585  /*
3586  * Construct cursor that retrieves whole rows from remote.
3587  */
3588  cursor_number = GetCursorNumber(conn);
3589  initStringInfo(&sql);
3590  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
3591  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
3592 
3593  /* In what follows, do not risk leaking any PGresults. */
3594  PG_TRY();
3595  {
3596  res = pgfdw_exec_query(conn, sql.data);
3597  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3598  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3599  PQclear(res);
3600  res = NULL;
3601 
3602  /* Retrieve and process rows a batch at a time. */
3603  for (;;)
3604  {
3605  char fetch_sql[64];
3606  int fetch_size;
3607  int numrows;
3608  int i;
3609  ListCell *lc;
3610 
3611  /* Allow users to cancel long query */
3613 
3614  /*
3615  * XXX possible future improvement: if rowstoskip is large, we
3616  * could issue a MOVE rather than physically fetching the rows,
3617  * then just adjust rowstoskip and samplerows appropriately.
3618  */
3619 
3620  /* The fetch size is arbitrary, but shouldn't be enormous. */
3621  fetch_size = 100;
3622  foreach(lc, server->options)
3623  {
3624  DefElem *def = (DefElem *) lfirst(lc);
3625 
3626  if (strcmp(def->defname, "fetch_size") == 0)
3627  {
3628  fetch_size = strtol(defGetString(def), NULL, 10);
3629  break;
3630  }
3631  }
3632  foreach(lc, table->options)
3633  {
3634  DefElem *def = (DefElem *) lfirst(lc);
3635 
3636  if (strcmp(def->defname, "fetch_size") == 0)
3637  {
3638  fetch_size = strtol(defGetString(def), NULL, 10);
3639  break;
3640  }
3641  }
3642 
3643  /* Fetch some rows */
3644  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
3645  fetch_size, cursor_number);
3646 
3647  res = pgfdw_exec_query(conn, fetch_sql);
3648  /* On error, report the original query, not the FETCH. */
3649  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3650  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3651 
3652  /* Process whatever we got. */
3653  numrows = PQntuples(res);
3654  for (i = 0; i < numrows; i++)
3655  analyze_row_processor(res, i, &astate);
3656 
3657  PQclear(res);
3658  res = NULL;
3659 
3660  /* Must be EOF if we didn't get all the rows requested. */
3661  if (numrows < fetch_size)
3662  break;
3663  }
3664 
3665  /* Close the cursor, just to be tidy. */
3666  close_cursor(conn, cursor_number);
3667  }
3668  PG_CATCH();
3669  {
3670  if (res)
3671  PQclear(res);
3672  PG_RE_THROW();
3673  }
3674  PG_END_TRY();
3675 
3676  ReleaseConnection(conn);
3677 
3678  /* We assume that we have no dead tuple. */
3679  *totaldeadrows = 0.0;
3680 
3681  /* We've retrieved all living tuples from foreign server. */
3682  *totalrows = astate.samplerows;
3683 
3684  /*
3685  * Emit some interesting relation info
3686  */
3687  ereport(elevel,
3688  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
3689  RelationGetRelationName(relation),
3690  astate.samplerows, astate.numrows)));
3691 
3692  return astate.numrows;
3693 }
3694 
3695 /*
3696  * Collect sample rows from the result of query.
3697  * - Use all tuples in sample until target # of samples are collected.
3698  * - Subsequently, replace already-sampled tuples randomly.
3699  */
3700 static void
3702 {
3703  int targrows = astate->targrows;
3704  int pos; /* array index to store tuple in */
3705  MemoryContext oldcontext;
3706 
3707  /* Always increment sample row counter. */
3708  astate->samplerows += 1;
3709 
3710  /*
3711  * Determine the slot where this sample row should be stored. Set pos to
3712  * negative value to indicate the row should be skipped.
3713  */
3714  if (astate->numrows < targrows)
3715  {
3716  /* First targrows rows are always included into the sample */
3717  pos = astate->numrows++;
3718  }
3719  else
3720  {
3721  /*
3722  * Now we start replacing tuples in the sample until we reach the end
3723  * of the relation. Same algorithm as in acquire_sample_rows in
3724  * analyze.c; see Jeff Vitter's paper.
3725  */
3726  if (astate->rowstoskip < 0)
3727  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
3728 
3729  if (astate->rowstoskip <= 0)
3730  {
3731  /* Choose a random reservoir element to replace. */
3732  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
3733  Assert(pos >= 0 && pos < targrows);
3734  heap_freetuple(astate->rows[pos]);
3735  }
3736  else
3737  {
3738  /* Skip this tuple. */
3739  pos = -1;
3740  }
3741 
3742  astate->rowstoskip -= 1;
3743  }
3744 
3745  if (pos >= 0)
3746  {
3747  /*
3748  * Create sample tuple from current result row, and store it in the
3749  * position determined above. The tuple has to be created in anl_cxt.
3750  */
3751  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
3752 
3753  astate->rows[pos] = make_tuple_from_result_row(res, row,
3754  astate->rel,
3755  astate->attinmeta,
3756  astate->retrieved_attrs,
3757  NULL,
3758  astate->temp_cxt);
3759 
3760  MemoryContextSwitchTo(oldcontext);
3761  }
3762 }
3763 
3764 /*
3765  * Import a foreign schema
3766  */
3767 static List *
3769 {
3770  List *commands = NIL;
3771  bool import_collate = true;
3772  bool import_default = false;
3773  bool import_not_null = true;
3774  ForeignServer *server;
3775  UserMapping *mapping;
3776  PGconn *conn;
3778  PGresult *volatile res = NULL;
3779  int numrows,
3780  i;
3781  ListCell *lc;
3782 
3783  /* Parse statement options */
3784  foreach(lc, stmt->options)
3785  {
3786  DefElem *def = (DefElem *) lfirst(lc);
3787 
3788  if (strcmp(def->defname, "import_collate") == 0)
3789  import_collate = defGetBoolean(def);
3790  else if (strcmp(def->defname, "import_default") == 0)
3791  import_default = defGetBoolean(def);
3792  else if (strcmp(def->defname, "import_not_null") == 0)
3793  import_not_null = defGetBoolean(def);
3794  else
3795  ereport(ERROR,
3796  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
3797  errmsg("invalid option \"%s\"", def->defname)));
3798  }
3799 
3800  /*
3801  * Get connection to the foreign server. Connection manager will
3802  * establish new connection if necessary.
3803  */
3804  server = GetForeignServer(serverOid);
3805  mapping = GetUserMapping(GetUserId(), server->serverid);
3806  conn = GetConnection(mapping, false);
3807 
3808  /* Don't attempt to import collation if remote server hasn't got it */
3809  if (PQserverVersion(conn) < 90100)
3810  import_collate = false;
3811 
3812  /* Create workspace for strings */
3813  initStringInfo(&buf);
3814 
3815  /* In what follows, do not risk leaking any PGresults. */
3816  PG_TRY();
3817  {
3818  /* Check that the schema really exists */
3819  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
3820  deparseStringLiteral(&buf, stmt->remote_schema);
3821 
3822  res = pgfdw_exec_query(conn, buf.data);
3823  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3824  pgfdw_report_error(ERROR, res, conn, false, buf.data);
3825 
3826  if (PQntuples(res) != 1)
3827  ereport(ERROR,
3828  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
3829  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
3830  stmt->remote_schema, server->servername)));
3831 
3832  PQclear(res);
3833  res = NULL;
3834  resetStringInfo(&buf);
3835 
3836  /*
3837  * Fetch all table data from this schema, possibly restricted by
3838  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
3839  * to EXCEPT/LIMIT TO here, because the core code will filter the
3840  * statements we return according to those lists anyway. But it
3841  * should save a few cycles to not process excluded tables in the
3842  * first place.)
3843  *
3844  * Note: because we run the connection with search_path restricted to
3845  * pg_catalog, the format_type() and pg_get_expr() outputs will always
3846  * include a schema name for types/functions in other schemas, which
3847  * is what we want.
3848  */
3849  if (import_collate)
3851  "SELECT relname, "
3852  " attname, "
3853  " format_type(atttypid, atttypmod), "
3854  " attnotnull, "
3855  " pg_get_expr(adbin, adrelid), "
3856  " collname, "
3857  " collnsp.nspname "
3858  "FROM pg_class c "
3859  " JOIN pg_namespace n ON "
3860  " relnamespace = n.oid "
3861  " LEFT JOIN pg_attribute a ON "
3862  " attrelid = c.oid AND attnum > 0 "
3863  " AND NOT attisdropped "
3864  " LEFT JOIN pg_attrdef ad ON "
3865  " adrelid = c.oid AND adnum = attnum "
3866  " LEFT JOIN pg_collation coll ON "
3867  " coll.oid = attcollation "
3868  " LEFT JOIN pg_namespace collnsp ON "
3869  " collnsp.oid = collnamespace ");
3870  else
3872  "SELECT relname, "
3873  " attname, "
3874  " format_type(atttypid, atttypmod), "
3875  " attnotnull, "
3876  " pg_get_expr(adbin, adrelid), "
3877  " NULL, NULL "
3878  "FROM pg_class c "
3879  " JOIN pg_namespace n ON "
3880  " relnamespace = n.oid "
3881  " LEFT JOIN pg_attribute a ON "
3882  " attrelid = c.oid AND attnum > 0 "
3883  " AND NOT attisdropped "
3884  " LEFT JOIN pg_attrdef ad ON "
3885  " adrelid = c.oid AND adnum = attnum ");
3886 
3888  "WHERE c.relkind IN ('r', 'v', 'f', 'm') "
3889  " AND n.nspname = ");
3890  deparseStringLiteral(&buf, stmt->remote_schema);
3891 
3892  /* Apply restrictions for LIMIT TO and EXCEPT */
3893  if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
3895  {
3896  bool first_item = true;
3897 
3898  appendStringInfoString(&buf, " AND c.relname ");
3899  if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3900  appendStringInfoString(&buf, "NOT ");
3901  appendStringInfoString(&buf, "IN (");
3902 
3903  /* Append list of table names within IN clause */
3904  foreach(lc, stmt->table_list)
3905  {
3906  RangeVar *rv = (RangeVar *) lfirst(lc);
3907 
3908  if (first_item)
3909  first_item = false;
3910  else
3911  appendStringInfoString(&buf, ", ");
3912  deparseStringLiteral(&buf, rv->relname);
3913  }
3914  appendStringInfoChar(&buf, ')');
3915  }
3916 
3917  /* Append ORDER BY at the end of query to ensure output ordering */
3918  appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
3919 
3920  /* Fetch the data */
3921  res = pgfdw_exec_query(conn, buf.data);
3922  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3923  pgfdw_report_error(ERROR, res, conn, false, buf.data);
3924 
3925  /* Process results */
3926  numrows = PQntuples(res);
3927  /* note: incrementation of i happens in inner loop's while() test */
3928  for (i = 0; i < numrows;)
3929  {
3930  char *tablename = PQgetvalue(res, i, 0);
3931  bool first_item = true;
3932 
3933  resetStringInfo(&buf);
3934  appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
3935  quote_identifier(tablename));
3936 
3937  /* Scan all rows for this table */
3938  do
3939  {
3940  char *attname;
3941  char *typename;
3942  char *attnotnull;
3943  char *attdefault;
3944  char *collname;
3945  char *collnamespace;
3946 
3947  /* If table has no columns, we'll see nulls here */
3948  if (PQgetisnull(res, i, 1))
3949  continue;
3950 
3951  attname = PQgetvalue(res, i, 1);
3952  typename = PQgetvalue(res, i, 2);
3953  attnotnull = PQgetvalue(res, i, 3);
3954  attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
3955  PQgetvalue(res, i, 4);
3956  collname = PQgetisnull(res, i, 5) ? (char *) NULL :
3957  PQgetvalue(res, i, 5);
3958  collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
3959  PQgetvalue(res, i, 6);
3960 
3961  if (first_item)
3962  first_item = false;
3963  else
3964  appendStringInfoString(&buf, ",\n");
3965 
3966  /* Print column name and type */
3967  appendStringInfo(&buf, " %s %s",
3968  quote_identifier(attname),
3969  typename);
3970 
3971  /*
3972  * Add column_name option so that renaming the foreign table's
3973  * column doesn't break the association to the underlying
3974  * column.
3975  */
3976  appendStringInfoString(&buf, " OPTIONS (column_name ");
3977  deparseStringLiteral(&buf, attname);
3978  appendStringInfoChar(&buf, ')');
3979 
3980  /* Add COLLATE if needed */
3981  if (import_collate && collname != NULL && collnamespace != NULL)
3982  appendStringInfo(&buf, " COLLATE %s.%s",
3983  quote_identifier(collnamespace),
3984  quote_identifier(collname));
3985 
3986  /* Add DEFAULT if needed */
3987  if (import_default && attdefault != NULL)
3988  appendStringInfo(&buf, " DEFAULT %s", attdefault);
3989 
3990  /* Add NOT NULL if needed */
3991  if (import_not_null && attnotnull[0] == 't')
3992  appendStringInfoString(&buf, " NOT NULL");
3993  }
3994  while (++i < numrows &&
3995  strcmp(PQgetvalue(res, i, 0), tablename) == 0);
3996 
3997  /*
3998  * Add server name and table-level options. We specify remote
3999  * schema and table name as options (the latter to ensure that
4000  * renaming the foreign table doesn't break the association).
4001  */
4002  appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4003  quote_identifier(server->servername));
4004 
4005  appendStringInfoString(&buf, "schema_name ");
4006  deparseStringLiteral(&buf, stmt->remote_schema);
4007  appendStringInfoString(&buf, ", table_name ");
4008  deparseStringLiteral(&buf, tablename);
4009 
4010  appendStringInfoString(&buf, ");");
4011 
4012  commands = lappend(commands, pstrdup(buf.data));
4013  }
4014 
4015  /* Clean up */
4016  PQclear(res);
4017  res = NULL;
4018  }
4019  PG_CATCH();
4020  {
4021  if (res)
4022  PQclear(res);
4023  PG_RE_THROW();
4024  }
4025  PG_END_TRY();
4026 
4027  ReleaseConnection(conn);
4028 
4029  return commands;
4030 }
4031 
4032 /*
4033  * Assess whether the join between inner and outer relations can be pushed down
4034  * to the foreign server. As a side effect, save information we obtain in this
4035  * function to the joinrel's PgFdwRelationInfo. Returns true if pushable.
4036  */
4037 static bool
4039  RelOptInfo *outerrel, RelOptInfo *innerrel,
4040  JoinPathExtraData *extra)
4041 {
4042  PgFdwRelationInfo *fpinfo;
4043  PgFdwRelationInfo *fpinfo_o;
4044  PgFdwRelationInfo *fpinfo_i;
4045  ListCell *lc;
4046  List *joinclauses;
4047  List *otherclauses;
4048 
4049  /*
4050  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4051  * Constructing queries representing SEMI and ANTI joins is hard, hence
4052  * not considered right now.
4053  */
4054  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4055  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4056  return false;
4057 
4058  /*
4059  * If either of the joining relations is marked as unsafe to pushdown, the
4060  * join can not be pushed down.
4061  */
4062  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4063  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4064  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4065  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4066  !fpinfo_i || !fpinfo_i->pushdown_safe)
4067  return false;
4068 
4069  /*
4070  * If joining relations have local conditions, those conditions are
4071  * required to be applied before joining the relations. Hence the join can
4072  * not be pushed down.
4073  */
4074  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4075  return false;
4076 
4077  /* Separate restrict list into join quals and quals on join relation */
4078  if (IS_OUTER_JOIN(jointype))
4079  extract_actual_join_clauses(extra->restrictlist, &joinclauses, &otherclauses);
4080  else
4081  {
4082  /*
4083  * Unlike an outer join, for inner join, the join result contains only
4084  * the rows which satisfy join clauses, similar to the other clause.
4085  * Hence all clauses can be treated as other quals. This helps to push
4086  * a join down to the foreign server even if some of its join quals
4087  * are not safe to pushdown.
4088  */
4089  otherclauses = extract_actual_clauses(extra->restrictlist, false);
4090  joinclauses = NIL;
4091  }
4092 
4093  /* Join quals must be safe to push down. */
4094  foreach(lc, joinclauses)
4095  {
4096  Expr *expr = (Expr *) lfirst(lc);
4097 
4098  if (!is_foreign_expr(root, joinrel, expr))
4099  return false;
4100  }
4101 
4102  /*
4103  * deparseExplicitTargetList() isn't smart enough to handle anything other
4104  * than a Var. In particular, if there's some PlaceHolderVar that would
4105  * need to be evaluated within this join tree (because there's an upper
4106  * reference to a quantity that may go to NULL as a result of an outer
4107  * join), then we can't try to push the join down because we'll fail when
4108  * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4109  * needs to be evaluated *at the top* of this join tree is OK, because we
4110  * can do that locally after fetching the results from the remote side.
4111  */
4112  foreach(lc, root->placeholder_list)
4113  {
4114  PlaceHolderInfo *phinfo = lfirst(lc);
4115  Relids relids = joinrel->relids;
4116 
4117  if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4118  bms_nonempty_difference(relids, phinfo->ph_eval_at))
4119  return false;
4120  }
4121 
4122  /* Save the join clauses, for later use. */
4123  fpinfo->joinclauses = joinclauses;
4124 
4125  /*
4126  * Other clauses are applied after the join has been performed and thus
4127  * need not be all pushable. We will push those which can be pushed to
4128  * reduce the number of rows fetched from the foreign server. Rest of them
4129  * will be applied locally after fetching join result. Add them to fpinfo
4130  * so that other joins involving this joinrel will know that this joinrel
4131  * has local clauses.
4132  */
4133  foreach(lc, otherclauses)
4134  {
4135  Expr *expr = (Expr *) lfirst(lc);
4136 
4137  if (!is_foreign_expr(root, joinrel, expr))
4138  fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
4139  else
4140  fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
4141  }
4142 
4143  fpinfo->outerrel = outerrel;
4144  fpinfo->innerrel = innerrel;
4145  fpinfo->jointype = jointype;
4146 
4147  /*
4148  * Pull the other remote conditions from the joining relations into join
4149  * clauses or other remote clauses (remote_conds) of this relation
4150  * wherever possible. This avoids building subqueries at every join step,
4151  * which is not currently supported by the deparser logic.
4152  *
4153  * For an inner join, clauses from both the relations are added to the
4154  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4155  * the outer side are added to remote_conds since those can be evaluated
4156  * after the join is evaluated. The clauses from inner side are added to
4157  * the joinclauses, since they need to be evaluated while constructing the
4158  * join.
4159  *
4160  * For a FULL OUTER JOIN, the other clauses from either relation can not
4161  * be added to the joinclauses or remote_conds, since each relation acts
4162  * as an outer relation for the other. Consider such full outer join as
4163  * unshippable because of the reasons mentioned above in this comment.
4164  *
4165  * The joining sides can not have local conditions, thus no need to test
4166  * shippability of the clauses being pulled up.
4167  */
4168  switch (jointype)
4169  {
4170  case JOIN_INNER:
4171  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4172  list_copy(fpinfo_i->remote_conds));
4173  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4174  list_copy(fpinfo_o->remote_conds));
4175  break;
4176 
4177  case JOIN_LEFT:
4178  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4179  list_copy(fpinfo_i->remote_conds));
4180  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4181  list_copy(fpinfo_o->remote_conds));
4182  break;
4183 
4184  case JOIN_RIGHT:
4185  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4186  list_copy(fpinfo_o->remote_conds));
4187  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4188  list_copy(fpinfo_i->remote_conds));
4189  break;
4190 
4191  case JOIN_FULL:
4192  if (fpinfo_i->remote_conds || fpinfo_o->remote_conds)
4193  return false;
4194  break;
4195 
4196  default:
4197  /* Should not happen, we have just checked this above */
4198  elog(ERROR, "unsupported join type %d", jointype);
4199  }
4200 
4201  /*
4202  * For an inner join, all restrictions can be treated alike. Treating the
4203  * pushed down conditions as join conditions allows a top level full outer
4204  * join to be deparsed without requiring subqueries.
4205  */
4206  if (jointype == JOIN_INNER)
4207  {
4208  Assert(!fpinfo->joinclauses);
4209  fpinfo->joinclauses = fpinfo->remote_conds;
4210  fpinfo->remote_conds = NIL;
4211  }
4212 
4213  /* Mark that this join can be pushed down safely */
4214  fpinfo->pushdown_safe = true;
4215 
4216  /*
4217  * If user is willing to estimate cost for a scan of either of the joining
4218  * relations using EXPLAIN, they intend to estimate scans on that relation
4219  * more accurately. Then, it makes sense to estimate the cost of the join
4220  * with that relation more accurately using EXPLAIN.
4221  */
4222  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4223  fpinfo_i->use_remote_estimate;
4224 
4225  /* Get user mapping */
4226  if (fpinfo->use_remote_estimate)
4227  {
4228  if (fpinfo_o->use_remote_estimate)
4229  fpinfo->user = fpinfo_o->user;
4230  else
4231  fpinfo->user = fpinfo_i->user;
4232  }
4233  else
4234  fpinfo->user = NULL;
4235 
4236  /* Get foreign server */
4237  fpinfo->server = fpinfo_o->server;
4238 
4239  /*
4240  * Since both the joining relations come from the same server, the server
4241  * level options should have same value for both the relations. Pick from
4242  * any side.
4243  */
4244  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4245  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4246 
4247  /*
4248  * Set cached relation costs to some negative value, so that we can detect
4249  * when they are set to some sensible costs, during one (usually the
4250  * first) of the calls to estimate_path_cost_size().
4251  */
4252  fpinfo->rel_startup_cost = -1;
4253  fpinfo->rel_total_cost = -1;
4254 
4255  /*
4256  * Set fetch size to maximum of the joining sides, since we are expecting
4257  * the rows returned by the join to be proportional to the relation sizes.
4258  */
4259  if (fpinfo_o->fetch_size > fpinfo_i->fetch_size)
4260  fpinfo->fetch_size = fpinfo_o->fetch_size;
4261  else
4262  fpinfo->fetch_size = fpinfo_i->fetch_size;
4263 
4264  /*
4265  * Set the string describing this join relation to be used in EXPLAIN
4266  * output of corresponding ForeignScan.
4267  */
4268  fpinfo->relation_name = makeStringInfo();
4269  appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4270  fpinfo_o->relation_name->data,
4271  get_jointype_name(fpinfo->jointype),
4272  fpinfo_i->relation_name->data);
4273 
4274  return true;
4275 }
4276 /* Add one ForeignPath to rel per useful pathkeys list, each carrying epq_path. */
4277 static void
4279  Path *epq_path)
4280 {
4281  List *useful_pathkeys_list = NIL; /* List of useful pathkeys lists */
4282  ListCell *lc;
4283 
4284  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4285 
4286  /* Create one path for each set of pathkeys we found above. */
4287  foreach(lc, useful_pathkeys_list)
4288  {
4289  double rows;
4290  int width;
4291  Cost startup_cost;
4292  Cost total_cost;
4293  List *useful_pathkeys = lfirst(lc);
4294 
4295  estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4296  &rows, &width, &startup_cost, &total_cost);
4297 
4298  add_path(rel, (Path *)
4299  create_foreignscan_path(root, rel,
4300  NULL,
4301  rows,
4302  startup_cost,
4303  total_cost,
4304  useful_pathkeys,
4305  NULL,
4306  epq_path,
4307  NIL));
4308  }
4309 }
4310 
4311 /*
4312  * postgresGetForeignJoinPaths
4313  * Add possible ForeignPath to joinrel, if join is safe to push down.
4314  */
4315 static void
4317  RelOptInfo *joinrel,
4318  RelOptInfo *outerrel,
4319  RelOptInfo *innerrel,
4320  JoinType jointype,
4321  JoinPathExtraData *extra)
4322 {
4323  PgFdwRelationInfo *fpinfo;
4324  ForeignPath *joinpath;
4325  double rows;
4326  int width;
4327  Cost startup_cost;
4328  Cost total_cost;
4329  Path *epq_path; /* Path to create plan to be executed when
4330  * EvalPlanQual gets triggered. */
4331 
4332  /*
4333  * Skip if this join combination has been considered already.
4334  */
4335  if (joinrel->fdw_private)
4336  return;
4337 
4338  /*
4339  * Create unfinished PgFdwRelationInfo entry which is used to indicate
4340  * that the join relation is already considered, so that we won't waste
4341  * time in judging safety of join pushdown and adding the same paths again
4342  * if found safe. Once we know that this join can be pushed down, we fill
4343  * the entry.
4344  */
4345  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4346  fpinfo->pushdown_safe = false;
4347  joinrel->fdw_private = fpinfo;
4348  /* attrs_used is only for base relations. */
4349  fpinfo->attrs_used = NULL;
4350 
4351  /*
4352  * If there is a possibility that EvalPlanQual will be executed, we need
4353  * to be able to reconstruct the row using scans of the base relations.
4354  * GetExistingLocalJoinPath will find a suitable path for this purpose in
4355  * the path list of the joinrel, if one exists. We must be careful to
4356  * call it before adding any ForeignPath, since the ForeignPath might
4357  * dominate the only suitable local path available. We also do it before
4358  * calling foreign_join_ok(), since that function updates fpinfo and marks
4359  * it as pushable if the join is found to be pushable. If no such local
4360  * path exists, we give up on pushing down this join.
4361  */
4362  if (root->parse->commandType == CMD_DELETE ||
4363  root->parse->commandType == CMD_UPDATE ||
4364  root->rowMarks)
4365  {
4366  epq_path = GetExistingLocalJoinPath(joinrel);
4367  if (!epq_path)
4368  {
4369  elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
4370  return;
4371  }
4372  }
4373  else
4374  epq_path = NULL;
4375 
4376  if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
4377  {
4378  /* Free path required for EPQ if we copied one; we don't need it now */
4379  if (epq_path)
4380  pfree(epq_path);
4381  return;
4382  }
4383 
4384  /*
4385  * Compute the selectivity and cost of the local_conds, so we don't have
4386  * to do it over again for each path. The best we can do for these
4387  * conditions is to estimate selectivity on the basis of local statistics.
4388  * The local conditions are applied after the join has been computed on
4389  * the remote side like quals in WHERE clause, so pass jointype as
4390  * JOIN_INNER.
4391  */
4392  fpinfo->local_conds_sel = clauselist_selectivity(root,
4393  fpinfo->local_conds,
4394  0,
4395  JOIN_INNER,
4396  NULL);
4397  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
4398 
4399  /*
4400  * If we are going to estimate costs locally, estimate the join clause
4401  * selectivity here while we have special join info.
4402  */
4403  if (!fpinfo->use_remote_estimate)
4404  fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
4405  0, fpinfo->jointype,
4406  extra->sjinfo);
4407 
4408  /* Estimate costs for bare join relation */
4409  estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
4410  &width, &startup_cost, &total_cost);
4411  /* Now update this information in the joinrel */
4412  joinrel->rows = rows;
4413  joinrel->reltarget->width = width;
4414  fpinfo->rows = rows;
4415  fpinfo->width = width;
4416  fpinfo->startup_cost = startup_cost;
4417  fpinfo->total_cost = total_cost;
4418 
4419  /*
4420  * Create a new join path and add it to the joinrel which represents a
4421  * join between foreign tables.
4422  */
4423  joinpath = create_foreignscan_path(root,
4424  joinrel,
4425  NULL, /* default pathtarget */
4426  rows,
4427  startup_cost,
4428  total_cost,
4429  NIL, /* no pathkeys */
4430  NULL, /* no required_outer */
4431  epq_path,
4432  NIL); /* no fdw_private */
4433 
4434  /* Add generated path into joinrel by add_path(). */
4435  add_path(joinrel, (Path *) joinpath);
4436 
4437  /* Consider pathkeys for the join relation */
4438  add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
4439 
4440  /* XXX Consider parameterized paths for the join relation */
4441 }
4442 
4443 /*
4444  * Assess whether the aggregation, grouping and having operations can be pushed
4445  * down to the foreign server. As a side effect, save information we obtain in
4446  * this function to PgFdwRelationInfo of the input relation.
4447  */
4448 static bool
4450 {
4451  Query *query = root->parse;
4452  PathTarget *grouping_target;
4453  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
4454  PgFdwRelationInfo *ofpinfo;
4455  List *aggvars;
4456  ListCell *lc;
4457  int i;
4458  List *tlist = NIL;
4459 
4460  /* Grouping Sets are not pushable */
4461  if (query->groupingSets)
4462  return false;
4463 
4464  /* Get the fpinfo of the underlying scan relation. */
4465  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
4466 
4467  /*
4468  * If underneath input relation has any local conditions, those conditions
4469  * are required to be applied before performing aggregation. Hence the
4470  * aggregate cannot be pushed down.
4471  */
4472  if (ofpinfo->local_conds)
4473  return false;
4474 
4475  /*
4476  * The targetlist expected from this node and the targetlist pushed down
4477  * to the foreign server may be different. The latter requires
4478  * sortgrouprefs to be set to push down GROUP BY clause, but should not
4479  * have those arising from ORDER BY clause. These sortgrouprefs may be
4480  * different from those in the plan's targetlist. Use a copy of path
4481  * target to record the new sortgrouprefs.
4482  */
4483  grouping_target = copy_pathtarget(root->upper_targets[UPPERREL_GROUP_AGG]);
4484 
4485  /*
4486  * Evaluate grouping targets and check whether they are safe to push down
4487  * to the foreign side. All GROUP BY expressions will be part of the
4488  * grouping target and thus there is no need to evaluate it separately.
4489  * While doing so, add required expressions into target list which can
4490  * then be used to pass to foreign server.
4491  */
4492  i = 0;
4493  foreach(lc, grouping_target->exprs)
4494  {
4495  Expr *expr = (Expr *) lfirst(lc);
4496  Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
4497  ListCell *l;
4498 
4499  /* Check whether this expression is part of GROUP BY clause */
4500  if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
4501  {
4502  /*
4503  * If any of the GROUP BY expression is not shippable we can not
4504  * push down aggregation to the foreign server.
4505  */
4506  if (!is_foreign_expr(root, grouped_rel, expr))
4507  return false;
4508 
4509  /* Pushable, add to tlist */
4510  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4511  }
4512  else
4513  {
4514  /* Check entire expression whether it is pushable or not */
4515  if (is_foreign_expr(root, grouped_rel, expr))
4516  {
4517  /* Pushable, add to tlist */
4518  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4519  }
4520  else
4521  {
4522  /*
4523  * If we have sortgroupref set, then it means that we have an
4524  * ORDER BY entry pointing to this expression. Since we are
4525  * not pushing ORDER BY with GROUP BY, clear it.
4526  */
4527  if (sgref)
4528  grouping_target->sortgrouprefs[i] = 0;
4529 
4530  /* Not matched exactly, pull the var with aggregates then */
4531  aggvars = pull_var_clause((Node *) expr,
4533 
4534  if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
4535  return false;
4536 
4537  /*
4538  * Add aggregates, if any, into the targetlist. Plain var
4539  * nodes should be either same as some GROUP BY expression or
4540  * part of some GROUP BY expression. In the latter case, the
4541  * query cannot refer to plain var nodes without the surrounding
4542  * expression. In both the cases, they are already part of
4543  * the targetlist and thus no need to add them again. In fact
4544  * adding pulled plain var nodes in SELECT clause will cause
4545  * an error on the foreign server if they are not same as some
4546  * GROUP BY expression.
4547  */
4548  foreach(l, aggvars)
4549  {
4550  Expr *expr = (Expr *) lfirst(l);
4551 
4552  if (IsA(expr, Aggref))
4553  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4554  }
4555  }
4556  }
4557 
4558  i++;
4559  }
4560 
4561  /*
4562  * Classify the pushable and non-pushable having clauses and save them in
4563  * remote_conds and local_conds of the grouped rel's fpinfo.
4564  */
4565  if (root->hasHavingQual && query->havingQual)
4566  {
4567  ListCell *lc;
4568 
4569  foreach(lc, (List *) query->havingQual)
4570  {
4571  Expr *expr = (Expr *) lfirst(lc);
4572 
4573  if (!is_foreign_expr(root, grouped_rel, expr))
4574  fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
4575  else
4576  fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
4577  }
4578  }
4579 
4580  /*
4581  * If there are any local conditions, pull Vars and aggregates from it and
4582  * check whether they are safe to pushdown or not.
4583  */
4584  if (fpinfo->local_conds)
4585  {
4586  ListCell *lc;
4587  List *aggvars = pull_var_clause((Node *) fpinfo->local_conds,
4589 
4590  foreach(lc, aggvars)
4591  {
4592  Expr *expr = (Expr *) lfirst(lc);
4593 
4594  /*
4595  * Aggregates in local conditions must be safe to push down, or
4596  * else we cannot push down the query. Add only the Aggref itself:
4597  * plain Vars are covered by the GROUP BY checks above, and shipping
4598  * them bare can fail remotely (see the grouping-target loop).
4599  */
4600  if (IsA(expr, Aggref))
4601  {
4602  if (!is_foreign_expr(root, grouped_rel, expr))
4603  return false;
4604 
4605  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4606  }
4607  }
4608  }
4609 
4610  /* Transfer any sortgroupref data to the replacement tlist */
4611  apply_pathtarget_labeling_to_tlist(tlist, grouping_target);
4612 
4613  /* Store generated targetlist */
4614  fpinfo->grouped_tlist = tlist;
4615 
4616  /* Safe to pushdown */
4617  fpinfo->pushdown_safe = true;
4618 
4619  /*
4620  * If user is willing to estimate cost for a scan using EXPLAIN, they
4621  * intend to estimate scans on that relation more accurately. Then, it
4622  * makes sense to estimate the cost of the grouping on that relation more
4623  * accurately using EXPLAIN.
4624  */
4625  fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate;
4626 
4627  /* Copy startup and tuple cost as is from underneath input rel's fpinfo */
4628  fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost;
4629  fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost;
4630 
4631  /*
4632  * Set cached relation costs to some negative value, so that we can detect
4633  * when they are set to some sensible costs, during one (usually the
4634  * first) of the calls to estimate_path_cost_size().
4635  */
4636  fpinfo->rel_startup_cost = -1;
4637  fpinfo->rel_total_cost = -1;
4638 
4639  /* Set fetch size same as that of underneath input rel's fpinfo */
4640  fpinfo->fetch_size = ofpinfo->fetch_size;
4641 
4642  /*
4643  * Set the string describing this grouped relation to be used in EXPLAIN
4644  * output of corresponding ForeignScan.
4645  */
4646  fpinfo->relation_name = makeStringInfo();
4647  appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
4648  ofpinfo->relation_name->data);
4649 
4650  return true;
4651 }
4652 
4653 /*
4654  * postgresGetForeignUpperPaths
4655  * Add paths for post-join operations like aggregation, grouping etc. if
4656  * corresponding operations are safe to push down.
4657  *
4658  * Right now, we only support aggregate, grouping and having clause pushdown.
4659  */
4660 static void
4662  RelOptInfo *input_rel, RelOptInfo *output_rel)
4663 {
4664  PgFdwRelationInfo *fpinfo;
4665 
4666  /*
4667  * If input rel is not safe to pushdown, then simply return as we cannot
4668  * perform any post-join operations on the foreign server.
4669  */
4670  if (!input_rel->fdw_private ||
4671  !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
4672  return;
4673 
4674  /* Ignore stages we don't support; and skip any duplicate calls. */
4675  if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
4676  return;
4677  /* Mark output_rel as considered; it stays unsafe until proven pushable. */
4678  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4679  fpinfo->pushdown_safe = false;
4680  output_rel->fdw_private = fpinfo;
4681 
4682  add_foreign_grouping_paths(root, input_rel, output_rel);
4683 }
4684 
4685 /*
4686  * add_foreign_grouping_paths
4687  * Add foreign path for grouping and/or aggregation.
4688  *
4689  * input_rel represents the underlying scan. The paths are added to the
4690  * given grouped_rel.
4691  */
4692 static void
4694  RelOptInfo *grouped_rel)
4695 {
4696  Query *parse = root->parse;
4697  PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
4698  PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
4699  ForeignPath *grouppath;
4700  PathTarget *grouping_target;
4701  double rows;
4702  int width;
4703  Cost startup_cost;
4704  Cost total_cost;
4705 
4706  /* Nothing to be done, if there is no grouping or aggregation required. */
4707  if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
4708  !root->hasHavingQual)
4709  return;
4710 
4711  grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
4712 
4713  /* save the input_rel as outerrel in fpinfo */
4714  fpinfo->outerrel = input_rel;
4715 
4716  /*
4717  * Copy foreign table, foreign server, user mapping, shippable extensions
4718  * etc. details from the input relation's fpinfo.
4719  */
4720  fpinfo->table = ifpinfo->table;
4721  fpinfo->server = ifpinfo->server;
4722  fpinfo->user = ifpinfo->user;
4723  fpinfo->shippable_extensions = ifpinfo->shippable_extensions;
4724 
4725  /* Assess if it is safe to push down aggregation and grouping. */
4726  if (!foreign_grouping_ok(root, grouped_rel))
4727  return;
4728 
4729  /* Estimate the cost of push down */
4730  estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
4731  &width, &startup_cost, &total_cost);
4732 
4733  /* Now update this information in the fpinfo */
4734  fpinfo->rows = rows;
4735  fpinfo->width = width;
4736  fpinfo->startup_cost = startup_cost;
4737  fpinfo->total_cost = total_cost;
4738 
4739  /* Create and add foreign path to the grouping relation. */
4740  grouppath = create_foreignscan_path(root,
4741  grouped_rel,
4742  grouping_target,
4743  rows,
4744  startup_cost,
4745  total_cost,
4746  NIL, /* no pathkeys */
4747  NULL, /* no required_outer */
4748  NULL, /* no epq_path */
4749  NIL); /* no fdw_private */
4750 
4751  /* Add generated path into grouped_rel by add_path(). */
4752  add_path(grouped_rel, (Path *) grouppath);
4753 }
4754 
4755 /*
4756  * Create a tuple from the specified row of the PGresult.
4757  *
4758  * rel is the local representation of the foreign table, or NULL, in which
4759  * case fsstate supplies the tupdesc; attinmeta is conversion data for that
4760  * tupdesc, and retrieved_attrs is an integer list of the column numbers
4761  * present in the PGresult. temp_context is a scratch context, reset on exit.
4762  */
4763 static HeapTuple
4765  int row,
4766  Relation rel,
4767  AttInMetadata *attinmeta,
4768  List *retrieved_attrs,
4769  ForeignScanState *fsstate,
4770  MemoryContext temp_context)
4771 {
4772  HeapTuple tuple;
4773  TupleDesc tupdesc;
4774  Datum *values;
4775  bool *nulls;
4776  ItemPointer ctid = NULL;
4777  Oid oid = InvalidOid;
4778  ConversionLocation errpos;
4779  ErrorContextCallback errcallback;
4780  MemoryContext oldcontext;
4781  ListCell *lc;
4782  int j;
4783 
4784  Assert(row < PQntuples(res));
4785 
4786  /*
4787  * Do the following work in a temp context that we reset after each tuple.
4788  * This cleans up not only the data we have direct access to, but any
4789  * cruft the I/O functions might leak.
4790  */
4791  oldcontext = MemoryContextSwitchTo(temp_context);
4792 
4793  if (rel)
4794  tupdesc = RelationGetDescr(rel);
4795  else
4796  {
4797  PgFdwScanState *fdw_sstate;
4798 
4799  Assert(fsstate);
4800  fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
4801  tupdesc = fdw_sstate->tupdesc;
4802  }
4803 
4804  values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
4805  nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
4806  /* Initialize to nulls for any columns not present in result */
4807  memset(nulls, true, tupdesc->natts * sizeof(bool));
4808 
4809  /*
4810  * Set up and install callback to report where conversion error occurs.
4811  */
4812  errpos.rel = rel;
4813  errpos.cur_attno = 0;
4814  errpos.fsstate = fsstate;
4815  errcallback.callback = conversion_error_callback;
4816  errcallback.arg = (void *) &errpos;
4817  errcallback.previous = error_context_stack;
4818  error_context_stack = &errcallback;
4819 
4820  /*
4821  * i indexes columns in the relation, j indexes columns in the PGresult.
4822  */
4823  j = 0;
4824  foreach(lc, retrieved_attrs)
4825  {
4826  int i = lfirst_int(lc);
4827  char *valstr;
4828 
4829  /* fetch next column's textual value */
4830  if (PQgetisnull(res, row, j))
4831  valstr = NULL;
4832  else
4833  valstr = PQgetvalue(res, row, j);
4834 
4835  /*
4836  * convert value to internal representation
4837  *
4838  * Note: we ignore system columns other than ctid and oid in result
4839  */
4840  errpos.cur_attno = i;
4841  if (i > 0)
4842  {
4843  /* ordinary column */
4844  Assert(i <= tupdesc->natts);
4845  nulls[i - 1] = (valstr == NULL);
4846  /* Apply the input function even to nulls, to support domains */
4847  values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
4848  valstr,
4849  attinmeta->attioparams[i - 1],
4850  attinmeta->atttypmods[i - 1]);
4851  }
4852  else if (i == SelfItemPointerAttributeNumber)
4853  {
4854  /* ctid */
4855  if (valstr != NULL)
4856  {
4857  Datum datum;
4858 
4859  datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
4860  ctid = (ItemPointer) DatumGetPointer(datum);
4861  }
4862  }
4863  else if (i == ObjectIdAttributeNumber)
4864  {
4865  /* oid */
4866  if (valstr != NULL)
4867  {
4868  Datum datum;
4869 
4870  datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
4871  oid = DatumGetObjectId(datum);
4872  }
4873  }
4874  errpos.cur_attno = 0;
4875 
4876  j++;
4877  }
4878 
4879  /* Uninstall error context callback. */
4880  error_context_stack = errcallback.previous;
4881 
4882  /*
4883  * Check we got the expected number of columns. Note: j == 0 and
4884  * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
4885  */
4886  if (j > 0 && j != PQnfields(res))
4887  elog(ERROR, "remote query result does not match the foreign table");
4888 
4889  /*
4890  * Build the result tuple in caller's memory context.
4891  */
4892  MemoryContextSwitchTo(oldcontext);
4893 
4894  tuple = heap_form_tuple(tupdesc, values, nulls);
4895 
4896  /*
4897  * If we have a CTID to return, install it in both t_self and t_ctid.
4898  * t_self is the normal place, but if the tuple is converted to a
4899  * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
4900  * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
4901  */
4902  if (ctid)
4903  tuple->t_self = tuple->t_data->t_ctid = *ctid;
4904 
4905  /*
4906  * Stomp on the xmin, xmax, and cmin fields from the tuple created by
4907  * heap_form_tuple. heap_form_tuple actually creates the tuple with
4908  * DatumTupleFields, not HeapTupleFields, but the executor expects
4909  * HeapTupleFields and will happily extract system columns on that
4910  * assumption. If we don't do this then, for example, the tuple length
4911  * ends up in the xmin field, which isn't what we want.
4912  */
4916 
4917  /*
4918  * If we have an OID to return, install it.
4919  */
4920  if (OidIsValid(oid))
4921  HeapTupleSetOid(tuple, oid);
4922 
4923  /* Clean up */
4924  MemoryContextReset(temp_context);
4925 
4926  return tuple;
4927 }
4928 
4929 /*
4930  * Callback function which is called when error occurs during column value
4931  * conversion. Print names of column and relation.
4932  */
4933 static void
4935 {
4936  const char *attname = NULL;
4937  const char *relname = NULL;
4938  bool is_wholerow = false;
4939  ConversionLocation *errpos = (ConversionLocation *) arg;
4940 
4941  if (errpos->rel)
4942  {
4943  /* error occurred in a scan against a foreign table */
4944  TupleDesc tupdesc = RelationGetDescr(errpos->rel);
4945 
4946  if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
4947  attname = NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname);
4948  else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
4949  attname = "ctid";
4950  else if (errpos->cur_attno == ObjectIdAttributeNumber)
4951  attname = "oid";
4952 
4953  relname = RelationGetRelationName(errpos->rel);
4954  }
4955  else
4956  {
4957  /* error occurred in a scan against a foreign join */
4958  ForeignScanState *fsstate = errpos->fsstate;
4959  ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
4960  EState *estate = fsstate->ss.ps.state;
4961  TargetEntry *tle;
4962 
4964  errpos->cur_attno - 1));
4965 
4966  /*
4967  * Target list can have Vars and expressions. For Vars, we can get
4968  * its relation, however for expressions we can't. Thus for
4969  * expressions, just show generic context message.
4970  */
4971  if (IsA(tle->expr, Var))
4972  {
4973  RangeTblEntry *rte;
4974  Var *var = (Var *) tle->expr;
4975 
4976  rte = rt_fetch(var->varno, estate->es_range_table);
4977 
4978  if (var->varattno == 0)
4979  is_wholerow = true;
4980  else
4981  attname = get_relid_attribute_name(rte->relid, var->varattno);
4982 
4983  relname = get_rel_name(rte->relid);
4984  }
4985  else
4986  errcontext("processing expression at position %d in select list",
4987  errpos->cur_attno);
4988  }
4989 
4990  if (relname)
4991  {
4992  if (is_wholerow)
4993  errcontext("whole-row reference to foreign table \"%s\"", relname);
4994  else if (attname)
4995  errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
4996  }
4997 }
4998 
4999 /*
5000  * Find an equivalence class member expression, all of whose Vars come from
5001  * the indicated relation; return NULL if there is no such member.
5002  */
5003 extern Expr *
5005 {
5006  ListCell *lc_em;
5007 
5008  foreach(lc_em, ec->ec_members)
5009  {
5010  EquivalenceMember *em = lfirst(lc_em);
5011 
5012  if (bms_is_subset(em->em_relids, rel->relids))
5013  {
5014  /*
5015  * If there is more than one equivalence member whose Vars are
5016  * taken entirely from this relation, we'll be content to choose
5017  * any one of those.
5018  */
5019  return em->em_expr;
5020  }
5021  }
5022 
5023  /* We didn't find any suitable equivalence class expression */
5024  return NULL;
5025 }
GetForeignPlan_function GetForeignPlan
Definition: fdwapi.h:175
bool has_eclass_joins
Definition: relation.h:551
Value * makeString(char *str)
Definition: value.c:53
BeginForeignScan_function BeginForeignScan
Definition: fdwapi.h:176
GetForeignUpperPaths_function GetForeignUpperPaths
Definition: fdwapi.h:190
ExecForeignDelete_function ExecForeignDelete
Definition: fdwapi.h:198
#define PG_RETURN_POINTER(x)
Definition: fmgr.h:305
RelOptInfo * find_childrel_top_parent(PlannerInfo *root, RelOptInfo *rel)
Definition: relnode.c:951
EndDirectModify_function EndDirectModify
Definition: fdwapi.h:204
#define NIL
Definition: pg_list.h:69
PathTarget * copy_pathtarget(PathTarget *src)
Definition: tlist.c:629
List * rowMarks
Definition: relation.h:251
ScanState ss
Definition: execnodes.h:1631
void deparseUpdateSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1484
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
Datum postgres_fdw_handler(PG_FUNCTION_ARGS)
Definition: postgres_fdw.c:425
static List * get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:787
Definition: fmgr.h:53
List * qual
Definition: plannodes.h:130
static struct @76 value
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
double estimate_num_groups(PlannerInfo *root, List *groupExprs, double input_rows, List **pgset)
Definition: selfuncs.c:3272
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
FdwModifyPrivateIndex
Definition: postgres_fdw.c:90
Relation ri_RelationDesc
Definition: execnodes.h:335
TupleTableSlot * ExecProcNode(PlanState *node)
Definition: execProcnode.c:380
#define IsA(nodeptr, _type_)
Definition: nodes.h:559
static ForeignScan * postgresGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan)
static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es)
Query * parse
Definition: relation.h:152
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2600
void add_path(RelOptInfo *parent_rel, Path *new_path)
Definition: pathnode.c:412
Relids ph_eval_at
Definition: relation.h:1935
static List * postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
ParamPathInfo * get_baserel_parampathinfo(PlannerInfo *root, RelOptInfo *baserel, Relids required_outer)
Definition: relnode.c:1010
ExplainForeignScan_function ExplainForeignScan
Definition: fdwapi.h:212
RelOptKind reloptkind
Definition: relation.h:487
Index scanrelid
Definition: plannodes.h:306
List * query_pathkeys
Definition: relation.h:257
Instrumentation * instrument
Definition: execnodes.h:1052
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1183
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:9968
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg)
static TupleTableSlot * postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path)
static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context)
void extract_actual_join_clauses(List *restrictinfo_list, List **joinquals, List **otherquals)
Definition: restrictinfo.c:381
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:512
HeapTuple * rows
Definition: postgres_fdw.c:229
AttrNumber ExecFindJunkAttributeInTlist(List *targetlist, const char *attrName)
Definition: execJunk.c:221
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2870
#define RelationGetDescr(relation)
Definition: rel.h:425
#define DEBUG3
Definition: elog.h:23
Oid GetUserId(void)
Definition: miscinit.c:283
#define ObjectIdAttributeNumber
Definition: sysattr.h:22
AnalyzeForeignTable_function AnalyzeForeignTable
Definition: fdwapi.h:217
#define castNode(_type_, nodeptr)
Definition: nodes.h:577
static void create_cursor(ForeignScanState *node)
static void add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel)
const char ** param_values
Definition: postgres_fdw.c:208
#define PointerGetDatum(X)
Definition: postgres.h:564
char * PQcmdTuples(PGresult *res)
Definition: fe-exec.c:3014
static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values)
List * param_exprs
Definition: postgres_fdw.c:143
int(* AcquireSampleRowsFunc)(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows)
Definition: fdwapi.h:134
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:196
bool eclass_useful_for_merging(PlannerInfo *root, EquivalenceClass *eclass, RelOptInfo *rel)
Definition: equivclass.c:2393
#define DatumGetObjectId(X)
Definition: postgres.h:508
char * pstrdup(const char *in)
Definition: mcxt.c:1165
ExprContext * ps_ExprContext
Definition: execnodes.h:1077
static void postgresBeginForeignScan(ForeignScanState *node, int eflags)
double tuples
Definition: relation.h:529
List * baserestrictinfo
Definition: relation.h:544
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:155
List * fdw_exprs
Definition: plannodes.h:556
void get_agg_clause_costs(PlannerInfo *root, Node *clause, AggSplit aggsplit, AggClauseCosts *costs)
Definition: clauses.c:466
PG_FUNCTION_INFO_V1(postgres_fdw_handler)
bool hasAggs
Definition: parsenodes.h:116
Relids clause_relids
Definition: relation.h:1652
AttInMetadata * attinmeta
Definition: postgres_fdw.c:195
StringInfo makeStringInfo(void)
Definition: stringinfo.c:29
void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List *tlist, List *remote_conds, List *pathkeys, List **retrieved_attrs, List **params_list)
Definition: deparse.c:902
#define Min(x, y)
Definition: c.h:801
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:907
ForeignServer * server
Definition: postgres_fdw.h:78
bool pseudoconstant
Definition: relation.h:1645
int resultRelation
Definition: parsenodes.h:113
List * fdw_private
Definition: plannodes.h:557
static List * get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:694
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IS_OUTER_JOIN(jointype)
Definition: nodes.h:714
double sampler_random_fract(SamplerRandomState randstate)
Definition: sampling.c:238
static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values)
List * groupingSets
Definition: parsenodes.h:139
uint64 fetch_size
Definition: logging.c:21
#define InvalidBuffer
Definition: buf.h:25
void classifyConditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, List **remote_conds, List **local_conds)
Definition: deparse.c:186
int set_transmission_modes(void)
List * list_copy(const List *oldlist)
Definition: list.c:1160
Definition: nodes.h:508
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
ForeignTable * GetForeignTable(Oid relid)
Definition: foreign.c:216
int IntervalStyle
Definition: globals.c:108
#define MemSet(start, val, len)
Definition: c.h:852
AttrNumber varattno
Definition: primnodes.h:146
static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
bool canSetTag
Definition: plannodes.h:203
CmdType operation
Definition: execnodes.h:1150
List * list_concat(List *list1, List *list2)
Definition: list.c:321
int32 * atttypmods
Definition: funcapi.h:47
PathKey * make_canonical_pathkey(PlannerInfo *root, EquivalenceClass *eclass, Oid opfamily, int strategy, bool nulls_first)
Definition: pathkeys.c:51
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:28
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1290
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
uint32 BlockNumber
Definition: block.h:31
EquivalenceClass * right_ec
Definition: relation.h:1686
List * retrieved_attrs
Definition: postgres_fdw.c:135
void reservoir_init_selection_state(ReservoirState rs, int n)
Definition: sampling.c:129
List * pull_var_clause(Node *node, int flags)
Definition: var.c:535
List * fdw_scan_tlist
Definition: plannodes.h:558
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:555
double Selectivity
Definition: nodes.h:631
Relation ss_currentRelation
Definition: execnodes.h:1288
EState * state
Definition: execnodes.h:1048
QualCost transCost
Definition: relation.h:62
List * es_range_table
Definition: execnodes.h:372
static void postgresGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
Definition: postgres_fdw.c:482
Form_pg_class rd_rel
Definition: rel.h:113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5950
static int postgresIsForeignRelUpdatable(Relation rel)
UpperRelationKind
Definition: relation.h:71
Definition: primnodes.h:141
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages)
struct ErrorContextCallback * previous
Definition: elog.h:238
#define OidIsValid(objectId)
Definition: c.h:533
Oid * attioparams
Definition: funcapi.h:44
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra)
List * retrieved_attrs
Definition: postgres_fdw.c:178
List * mergeopfamilies
Definition: relation.h:1682
static void postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
int natts
Definition: tupdesc.h:73
CmdType operation
Definition: plannodes.h:554
List * plans
Definition: plannodes.h:207
static TupleTableSlot * postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
RelOptInfo * outerrel
Definition: postgres_fdw.h:91
static void close_cursor(PGconn *conn, unsigned int cursor_number)
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
Cost startup
Definition: relation.h:45
void pull_varattnos(Node *node, Index varno, Bitmapset **varattnos)
Definition: var.c:219
AddForeignUpdateTargets_function AddForeignUpdateTargets
Definition: fdwapi.h:193
Index ri_RangeTableIndex
Definition: execnodes.h:334
static void postgresEndDirectModify(ForeignScanState *node)
#define PVC_INCLUDE_AGGREGATES
Definition: var.h:20
#define USE_ISO_DATES
Definition: miscadmin.h:210
JoinType
Definition: nodes.h:665
List * targetList
Definition: parsenodes.h:131
void deparseInsertSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1421
unsigned int cursor_number
Definition: postgres_fdw.c:139
struct RelOptInfo ** simple_rel_array
Definition: relation.h:176
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1926
ItemPointerData * ItemPointer
Definition: itemptr.h:48
HeapTupleHeader t_data
Definition: htup.h:67
#define HeapTupleSetOid(tuple, oid)
Definition: htup_details.h:698
ErrorContextCallback * error_context_stack
Definition: elog.c:88
void ReleaseConnection(PGconn *conn)
Definition: connection.c:412
RecheckForeignScan_function RecheckForeignScan
Definition: fdwapi.h:209
IterateDirectModify_function IterateDirectModify
Definition: fdwapi.h:203
#define list_make1(x1)
Definition: pg_list.h:133
#define NAMEDATALEN
PlanState ps
Definition: execnodes.h:1287
char * relname
Definition: primnodes.h:67
Value * makeInteger(long i)
Definition: value.c:23
void ExplainPropertyText(const char *qlabel, const char *value, ExplainState *es)
Definition: explain.c:3046
JoinType jointype
Definition: plannodes.h:619
static void postgresEndForeignScan(ForeignScanState *node)
Relids lateral_relids
Definition: relation.h:515
bool defGetBoolean(DefElem *def)
Definition: define.c:111
Cost per_tuple
Definition: relation.h:46
#define TIDOID
Definition: pg_type.h:332
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execQual.c:4266
void pfree(void *pointer)
Definition: mcxt.c:992
MemoryContext es_query_cxt
Definition: execnodes.h:396
void deparseDeleteSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1594
SpecialJoinInfo * sjinfo
Definition: relation.h:2051
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
ForeignScanState * fsstate
Definition: postgres_fdw.c:257
#define linitial(l)
Definition: pg_list.h:110
#define planner_rt_fetch(rti, root)
Definition: relation.h:320
#define ExecEvalExpr(expr, econtext, isNull)
Definition: executor.h:73
#define ERROR
Definition: elog.h:43
PlanState ps
Definition: execnodes.h:1149
GetForeignJoinPaths_function GetForeignJoinPaths
Definition: fdwapi.h:187
const char ** param_values
Definition: postgres_fdw.c:144
bool list_member(const List *list, const void *datum)
Definition: list.c:444
List * list_append_unique_ptr(List *list, void *datum)
Definition: list.c:975
#define lfirst_int(lc)
Definition: pg_list.h:107
void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root)
Definition: costsize.c:3199
PGconn * conn
Definition: streamutil.c:45
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:159
char * defGetString(DefElem *def)
Definition: define.c:49
ItemPointerData t_ctid
Definition: htup_details.h:150
ItemPointerData t_self
Definition: htup.h:65
Selectivity local_conds_sel
Definition: postgres_fdw.h:56