PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
postgres_fdw.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * postgres_fdw.c
 *		  Foreign-data wrapper for remote PostgreSQL servers
 *
 * Portions Copyright (c) 2012-2017, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  contrib/postgres_fdw/postgres_fdw.c
 *
 *-------------------------------------------------------------------------
 */
13 #include "postgres.h"
14 
15 #include "postgres_fdw.h"
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "catalog/pg_class.h"
20 #include "commands/defrem.h"
21 #include "commands/explain.h"
22 #include "commands/vacuum.h"
23 #include "foreign/fdwapi.h"
24 #include "funcapi.h"
25 #include "miscadmin.h"
26 #include "nodes/makefuncs.h"
27 #include "nodes/nodeFuncs.h"
28 #include "optimizer/cost.h"
29 #include "optimizer/clauses.h"
30 #include "optimizer/pathnode.h"
31 #include "optimizer/paths.h"
32 #include "optimizer/planmain.h"
33 #include "optimizer/restrictinfo.h"
34 #include "optimizer/var.h"
35 #include "optimizer/tlist.h"
36 #include "parser/parsetree.h"
37 #include "utils/builtins.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/sampling.h"
43 #include "utils/selfuncs.h"
44 
PG_MODULE_MAGIC;

/* Default CPU cost to start up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	100.0

/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
#define DEFAULT_FDW_TUPLE_COST		0.01

/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
55 
/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum FdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * Because the enumerators are used as positional list indexes, their order
 * is significant and must not be changed casually.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* List of restriction clauses that can be executed remotely */
	FdwScanPrivateRemoteConds,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing join i.e. names of relations being joined and types
	 * of join, added when the scan is join
	 */
	FdwScanPrivateRelations
};
80 
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) Boolean flag showing if the remote query has a RETURNING clause
 * 4) Integer list of attribute numbers retrieved by RETURNING, if any
 *
 * The enumerators are positional list indexes, so their order matches the
 * numbered items above.
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* has-returning flag (as an integer Value node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs
};
102 
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command es_processed
 *
 * The enumerators are positional list indexes, so their order matches the
 * numbered items above.
 */
enum FdwDirectModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as an integer Value node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as an integer Value node) */
	FdwDirectModifyPrivateSetProcessed
};
123 
124 /*
125  * Execution state of a foreign scan using postgres_fdw.
126  */
127 typedef struct PgFdwScanState
128 {
129  Relation rel; /* relcache entry for the foreign table. NULL
130  * for a foreign join scan. */
131  TupleDesc tupdesc; /* tuple descriptor of scan */
132  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
133 
134  /* extracted fdw_private data */
135  char *query; /* text of SELECT command */
136  List *retrieved_attrs; /* list of retrieved attribute numbers */
137 
138  /* for remote query execution */
139  PGconn *conn; /* connection for the scan */
140  unsigned int cursor_number; /* quasi-unique ID for my cursor */
141  bool cursor_exists; /* have we created the cursor? */
142  int numParams; /* number of parameters passed to query */
143  FmgrInfo *param_flinfo; /* output conversion functions for them */
144  List *param_exprs; /* executable expressions for param values */
145  const char **param_values; /* textual values of query parameters */
146 
147  /* for storing result tuples */
148  HeapTuple *tuples; /* array of currently-retrieved tuples */
149  int num_tuples; /* # of tuples in array */
150  int next_tuple; /* index of next one to return */
151 
152  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
153  int fetch_ct_2; /* Min(# of fetches done, 2) */
154  bool eof_reached; /* true if last fetch reached EOF */
155 
156  /* working memory contexts */
157  MemoryContext batch_cxt; /* context holding current batch of tuples */
158  MemoryContext temp_cxt; /* context for per-tuple temporary data */
159 
160  int fetch_size; /* number of tuples per fetch */
162 
163 /*
164  * Execution state of a foreign insert/update/delete operation.
165  */
166 typedef struct PgFdwModifyState
167 {
168  Relation rel; /* relcache entry for the foreign table */
169  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
170 
171  /* for remote query execution */
172  PGconn *conn; /* connection for the scan */
173  char *p_name; /* name of prepared statement, if created */
174 
175  /* extracted fdw_private data */
176  char *query; /* text of INSERT/UPDATE/DELETE command */
177  List *target_attrs; /* list of target attribute numbers */
178  bool has_returning; /* is there a RETURNING clause? */
179  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
180 
181  /* info about parameters for prepared statement */
182  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
183  int p_nums; /* number of parameters to transmit */
184  FmgrInfo *p_flinfo; /* output conversion functions for them */
185 
186  /* working memory context */
187  MemoryContext temp_cxt; /* context for per-tuple temporary data */
189 
190 /*
191  * Execution state of a foreign scan that modifies a foreign table directly.
192  */
194 {
195  Relation rel; /* relcache entry for the foreign table */
196  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
197 
198  /* extracted fdw_private data */
199  char *query; /* text of UPDATE/DELETE command */
200  bool has_returning; /* is there a RETURNING clause? */
201  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
202  bool set_processed; /* do we set the command es_processed? */
203 
204  /* for remote query execution */
205  PGconn *conn; /* connection for the update */
206  int numParams; /* number of parameters passed to query */
207  FmgrInfo *param_flinfo; /* output conversion functions for them */
208  List *param_exprs; /* executable expressions for param values */
209  const char **param_values; /* textual values of query parameters */
210 
211  /* for storing result tuples */
212  PGresult *result; /* result for query */
213  int num_tuples; /* # of result tuples */
214  int next_tuple; /* index of next one to return */
215 
216  /* working memory context */
217  MemoryContext temp_cxt; /* context for per-tuple temporary data */
219 
220 /*
221  * Workspace for analyzing a foreign table.
222  */
223 typedef struct PgFdwAnalyzeState
224 {
225  Relation rel; /* relcache entry for the foreign table */
226  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
227  List *retrieved_attrs; /* attr numbers retrieved by query */
228 
229  /* collected sample rows */
230  HeapTuple *rows; /* array of size targrows */
231  int targrows; /* target # of sample rows */
232  int numrows; /* # of sample rows collected */
233 
234  /* for random sampling */
235  double samplerows; /* # of rows fetched */
236  double rowstoskip; /* # of rows to skip before next sample */
237  ReservoirStateData rstate; /* state for reservoir sampling */
238 
239  /* working memory contexts */
240  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
241  MemoryContext temp_cxt; /* context for per-tuple temporary data */
243 
244 /*
245  * Identify the attribute where data conversion fails.
246  */
247 typedef struct ConversionLocation
248 {
249  Relation rel; /* foreign table's relcache entry. */
250  AttrNumber cur_attno; /* attribute number being processed, or 0 */
251 
252  /*
253  * In case of foreign join push down, fdw_scan_tlist is used to identify
254  * the Var node corresponding to the error location and
255  * fsstate->ss.ps.state gives access to the RTEs of corresponding relation
256  * to get the relation name and attribute name.
257  */
260 
261 /* Callback argument for ec_member_matches_foreign */
262 typedef struct
263 {
264  Expr *current; /* current expr, or NULL if not yet found */
265  List *already_used; /* expressions already dealt with */
267 
/*
 * SQL functions
 */
PG_FUNCTION_INFO_V1(postgres_fdw_handler);

273 /*
274  * FDW callback routines
275  */
276 static void postgresGetForeignRelSize(PlannerInfo *root,
277  RelOptInfo *baserel,
278  Oid foreigntableid);
279 static void postgresGetForeignPaths(PlannerInfo *root,
280  RelOptInfo *baserel,
281  Oid foreigntableid);
283  RelOptInfo *baserel,
284  Oid foreigntableid,
285  ForeignPath *best_path,
286  List *tlist,
287  List *scan_clauses,
288  Plan *outer_plan);
289 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
292 static void postgresEndForeignScan(ForeignScanState *node);
293 static void postgresAddForeignUpdateTargets(Query *parsetree,
294  RangeTblEntry *target_rte,
295  Relation target_relation);
297  ModifyTable *plan,
298  Index resultRelation,
299  int subplan_index);
300 static void postgresBeginForeignModify(ModifyTableState *mtstate,
301  ResultRelInfo *resultRelInfo,
302  List *fdw_private,
303  int subplan_index,
304  int eflags);
306  ResultRelInfo *resultRelInfo,
307  TupleTableSlot *slot,
308  TupleTableSlot *planSlot);
310  ResultRelInfo *resultRelInfo,
311  TupleTableSlot *slot,
312  TupleTableSlot *planSlot);
314  ResultRelInfo *resultRelInfo,
315  TupleTableSlot *slot,
316  TupleTableSlot *planSlot);
317 static void postgresEndForeignModify(EState *estate,
318  ResultRelInfo *resultRelInfo);
320 static bool postgresPlanDirectModify(PlannerInfo *root,
321  ModifyTable *plan,
322  Index resultRelation,
323  int subplan_index);
324 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
326 static void postgresEndDirectModify(ForeignScanState *node);
328  ExplainState *es);
330  ResultRelInfo *rinfo,
331  List *fdw_private,
332  int subplan_index,
333  ExplainState *es);
335  ExplainState *es);
336 static bool postgresAnalyzeForeignTable(Relation relation,
337  AcquireSampleRowsFunc *func,
338  BlockNumber *totalpages);
340  Oid serverOid);
341 static void postgresGetForeignJoinPaths(PlannerInfo *root,
342  RelOptInfo *joinrel,
343  RelOptInfo *outerrel,
344  RelOptInfo *innerrel,
345  JoinType jointype,
346  JoinPathExtraData *extra);
348  TupleTableSlot *slot);
349 static void postgresGetForeignUpperPaths(PlannerInfo *root,
350  UpperRelationKind stage,
351  RelOptInfo *input_rel,
352  RelOptInfo *output_rel);
353 
354 /*
355  * Helper functions
356  */
357 static void estimate_path_cost_size(PlannerInfo *root,
358  RelOptInfo *baserel,
359  List *join_conds,
360  List *pathkeys,
361  double *p_rows, int *p_width,
362  Cost *p_startup_cost, Cost *p_total_cost);
363 static void get_remote_estimate(const char *sql,
364  PGconn *conn,
365  double *rows,
366  int *width,
367  Cost *startup_cost,
368  Cost *total_cost);
369 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
371  void *arg);
372 static void create_cursor(ForeignScanState *node);
373 static void fetch_more_data(ForeignScanState *node);
374 static void close_cursor(PGconn *conn, unsigned int cursor_number);
375 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
376 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
377  ItemPointer tupleid,
378  TupleTableSlot *slot);
379 static void store_returning_result(PgFdwModifyState *fmstate,
380  TupleTableSlot *slot, PGresult *res);
381 static void execute_dml_stmt(ForeignScanState *node);
383 static void prepare_query_params(PlanState *node,
384  List *fdw_exprs,
385  int numParams,
386  FmgrInfo **param_flinfo,
387  List **param_exprs,
388  const char ***param_values);
389 static void process_query_params(ExprContext *econtext,
390  FmgrInfo *param_flinfo,
391  List *param_exprs,
392  const char **param_values);
393 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
394  HeapTuple *rows, int targrows,
395  double *totalrows,
396  double *totaldeadrows);
397 static void analyze_row_processor(PGresult *res, int row,
398  PgFdwAnalyzeState *astate);
400  int row,
401  Relation rel,
402  AttInMetadata *attinmeta,
403  List *retrieved_attrs,
404  ForeignScanState *fsstate,
405  MemoryContext temp_context);
406 static void conversion_error_callback(void *arg);
407 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
408  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
409  JoinPathExtraData *extra);
410 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel);
412  RelOptInfo *rel);
415  Path *epq_path);
416 static void add_foreign_grouping_paths(PlannerInfo *root,
417  RelOptInfo *input_rel,
418  RelOptInfo *grouped_rel);
419 
420 
421 /*
422  * Foreign-data wrapper handler function: return a struct with pointers
423  * to my callback routines.
424  */
425 Datum
427 {
428  FdwRoutine *routine = makeNode(FdwRoutine);
429 
430  /* Functions for scanning foreign tables */
438 
439  /* Functions for updating foreign tables */
452 
453  /* Function for EvalPlanQual rechecks */
455  /* Support functions for EXPLAIN */
459 
460  /* Support functions for ANALYZE */
462 
463  /* Support functions for IMPORT FOREIGN SCHEMA */
465 
466  /* Support functions for join push-down */
468 
469  /* Support functions for upper relation push-down */
471 
472  PG_RETURN_POINTER(routine);
473 }
474 
475 /*
476  * postgresGetForeignRelSize
477  * Estimate # of rows and width of the result of the scan
478  *
479  * We should consider the effect of all baserestrictinfo clauses here, but
480  * not any join clauses.
481  */
482 static void
484  RelOptInfo *baserel,
485  Oid foreigntableid)
486 {
487  PgFdwRelationInfo *fpinfo;
488  ListCell *lc;
489  RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
490  const char *namespace;
491  const char *relname;
492  const char *refname;
493 
494  /*
495  * We use PgFdwRelationInfo to pass various information to subsequent
496  * functions.
497  */
498  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
499  baserel->fdw_private = (void *) fpinfo;
500 
501  /* Base foreign tables need to be pushed down always. */
502  fpinfo->pushdown_safe = true;
503 
504  /* Look up foreign-table catalog info. */
505  fpinfo->table = GetForeignTable(foreigntableid);
506  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
507 
508  /*
509  * Extract user-settable option values. Note that per-table setting of
510  * use_remote_estimate overrides per-server setting.
511  */
512  fpinfo->use_remote_estimate = false;
515  fpinfo->shippable_extensions = NIL;
516  fpinfo->fetch_size = 100;
517 
518  foreach(lc, fpinfo->server->options)
519  {
520  DefElem *def = (DefElem *) lfirst(lc);
521 
522  if (strcmp(def->defname, "use_remote_estimate") == 0)
523  fpinfo->use_remote_estimate = defGetBoolean(def);
524  else if (strcmp(def->defname, "fdw_startup_cost") == 0)
525  fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
526  else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
527  fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
528  else if (strcmp(def->defname, "extensions") == 0)
529  fpinfo->shippable_extensions =
530  ExtractExtensionList(defGetString(def), false);
531  else if (strcmp(def->defname, "fetch_size") == 0)
532  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
533  }
534  foreach(lc, fpinfo->table->options)
535  {
536  DefElem *def = (DefElem *) lfirst(lc);
537 
538  if (strcmp(def->defname, "use_remote_estimate") == 0)
539  fpinfo->use_remote_estimate = defGetBoolean(def);
540  else if (strcmp(def->defname, "fetch_size") == 0)
541  fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
542  }
543 
544  /*
545  * If the table or the server is configured to use remote estimates,
546  * identify which user to do remote access as during planning. This
547  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
548  * permissions, the query would have failed at runtime anyway.
549  */
550  if (fpinfo->use_remote_estimate)
551  {
552  Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
553 
554  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
555  }
556  else
557  fpinfo->user = NULL;
558 
559  /*
560  * Identify which baserestrictinfo clauses can be sent to the remote
561  * server and which can't.
562  */
563  classifyConditions(root, baserel, baserel->baserestrictinfo,
564  &fpinfo->remote_conds, &fpinfo->local_conds);
565 
566  /*
567  * Identify which attributes will need to be retrieved from the remote
568  * server. These include all attrs needed for joins or final output, plus
569  * all attrs used in the local_conds. (Note: if we end up using a
570  * parameterized scan, it's possible that some of the join clauses will be
571  * sent to the remote and thus we wouldn't really need to retrieve the
572  * columns used in them. Doesn't seem worth detecting that case though.)
573  */
574  fpinfo->attrs_used = NULL;
575  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
576  &fpinfo->attrs_used);
577  foreach(lc, fpinfo->local_conds)
578  {
579  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
580 
581  pull_varattnos((Node *) rinfo->clause, baserel->relid,
582  &fpinfo->attrs_used);
583  }
584 
585  /*
586  * Compute the selectivity and cost of the local_conds, so we don't have
587  * to do it over again for each path. The best we can do for these
588  * conditions is to estimate selectivity on the basis of local statistics.
589  */
591  fpinfo->local_conds,
592  baserel->relid,
593  JOIN_INNER,
594  NULL);
595 
596  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
597 
598  /*
599  * Set cached relation costs to some negative value, so that we can detect
600  * when they are set to some sensible costs during one (usually the first)
601  * of the calls to estimate_path_cost_size().
602  */
603  fpinfo->rel_startup_cost = -1;
604  fpinfo->rel_total_cost = -1;
605 
606  /*
607  * If the table or the server is configured to use remote estimates,
608  * connect to the foreign server and execute EXPLAIN to estimate the
609  * number of rows selected by the restriction clauses, as well as the
610  * average row width. Otherwise, estimate using whatever statistics we
611  * have locally, in a way similar to ordinary tables.
612  */
613  if (fpinfo->use_remote_estimate)
614  {
615  /*
616  * Get cost/size estimates with help of remote server. Save the
617  * values in fpinfo so we don't need to do it again to generate the
618  * basic foreign path.
619  */
620  estimate_path_cost_size(root, baserel, NIL, NIL,
621  &fpinfo->rows, &fpinfo->width,
622  &fpinfo->startup_cost, &fpinfo->total_cost);
623 
624  /* Report estimated baserel size to planner. */
625  baserel->rows = fpinfo->rows;
626  baserel->reltarget->width = fpinfo->width;
627  }
628  else
629  {
630  /*
631  * If the foreign table has never been ANALYZEd, it will have relpages
632  * and reltuples equal to zero, which most likely has nothing to do
633  * with reality. We can't do a whole lot about that if we're not
634  * allowed to consult the remote server, but we can use a hack similar
635  * to plancat.c's treatment of empty relations: use a minimum size
636  * estimate of 10 pages, and divide by the column-datatype-based width
637  * estimate to get the corresponding number of tuples.
638  */
639  if (baserel->pages == 0 && baserel->tuples == 0)
640  {
641  baserel->pages = 10;
642  baserel->tuples =
643  (10 * BLCKSZ) / (baserel->reltarget->width +
645  }
646 
647  /* Estimate baserel size as best we can with local statistics. */
648  set_baserel_size_estimates(root, baserel);
649 
650  /* Fill in basically-bogus cost estimates for use later. */
651  estimate_path_cost_size(root, baserel, NIL, NIL,
652  &fpinfo->rows, &fpinfo->width,
653  &fpinfo->startup_cost, &fpinfo->total_cost);
654  }
655 
656  /*
657  * Set the name of relation in fpinfo, while we are constructing it here.
658  * It will be used to build the string describing the join relation in
659  * EXPLAIN output. We can't know whether VERBOSE option is specified or
660  * not, so always schema-qualify the foreign table name.
661  */
662  fpinfo->relation_name = makeStringInfo();
663  namespace = get_namespace_name(get_rel_namespace(foreigntableid));
664  relname = get_rel_name(foreigntableid);
665  refname = rte->eref->aliasname;
666  appendStringInfo(fpinfo->relation_name, "%s.%s",
667  quote_identifier(namespace),
668  quote_identifier(relname));
669  if (*refname && strcmp(refname, relname) != 0)
670  appendStringInfo(fpinfo->relation_name, " %s",
672 
673  /* No outer and inner relations. */
674  fpinfo->make_outerrel_subquery = false;
675  fpinfo->make_innerrel_subquery = false;
676  fpinfo->lower_subquery_rels = NULL;
677  /* Set the relation index. */
678  fpinfo->relation_index = baserel->relid;
679 }
680 
681 /*
682  * get_useful_ecs_for_relation
683  * Determine which EquivalenceClasses might be involved in useful
684  * orderings of this relation.
685  *
686  * This function is in some respects a mirror image of the core function
687  * pathkeys_useful_for_merging: for a regular table, we know what indexes
688  * we have and want to test whether any of them are useful. For a foreign
689  * table, we don't know what indexes are present on the remote side but
690  * want to speculate about which ones we'd like to use if they existed.
691  *
692  * This function returns a list of potentially-useful equivalence classes,
693  * but it does not guarantee that an EquivalenceMember exists which contains
694  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
695  * ft1.x + t1.x = 0, this function will say that the equivalence class
696  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
697  * t1 is local (or on a different server), it will turn out that no useful
698  * ORDER BY clause can be generated. It's not our job to figure that out
699  * here; we're only interested in identifying relevant ECs.
700  */
701 static List *
703 {
704  List *useful_eclass_list = NIL;
705  ListCell *lc;
706  Relids relids;
707 
708  /*
709  * First, consider whether any active EC is potentially useful for a merge
710  * join against this relation.
711  */
712  if (rel->has_eclass_joins)
713  {
714  foreach(lc, root->eq_classes)
715  {
716  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
717 
718  if (eclass_useful_for_merging(root, cur_ec, rel))
719  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
720  }
721  }
722 
723  /*
724  * Next, consider whether there are any non-EC derivable join clauses that
725  * are merge-joinable. If the joininfo list is empty, we can exit
726  * quickly.
727  */
728  if (rel->joininfo == NIL)
729  return useful_eclass_list;
730 
731  /* If this is a child rel, we must use the topmost parent rel to search. */
733  relids = find_childrel_top_parent(root, rel)->relids;
734  else
735  relids = rel->relids;
736 
737  /* Check each join clause in turn. */
738  foreach(lc, rel->joininfo)
739  {
740  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
741 
742  /* Consider only mergejoinable clauses */
743  if (restrictinfo->mergeopfamilies == NIL)
744  continue;
745 
746  /* Make sure we've got canonical ECs. */
747  update_mergeclause_eclasses(root, restrictinfo);
748 
749  /*
750  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
751  * that left_ec and right_ec will be initialized, per comments in
752  * distribute_qual_to_rels.
753  *
754  * We want to identify which side of this merge-joinable clause
755  * contains columns from the relation produced by this RelOptInfo. We
756  * test for overlap, not containment, because there could be extra
757  * relations on either side. For example, suppose we've got something
758  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
759  * A.y = D.y. The input rel might be the joinrel between A and B, and
760  * we'll consider the join clause A.y = D.y. relids contains a
761  * relation not involved in the join class (B) and the equivalence
762  * class for the left-hand side of the clause contains a relation not
763  * involved in the input rel (C). Despite the fact that we have only
764  * overlap and not containment in either direction, A.y is potentially
765  * useful as a sort column.
766  *
767  * Note that it's even possible that relids overlaps neither side of
768  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
769  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
770  * but overlaps neither side of B. In that case, we just skip this
771  * join clause, since it doesn't suggest a useful sort order for this
772  * relation.
773  */
774  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
775  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
776  restrictinfo->right_ec);
777  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
778  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
779  restrictinfo->left_ec);
780  }
781 
782  return useful_eclass_list;
783 }
784 
785 /*
786  * get_useful_pathkeys_for_relation
787  * Determine which orderings of a relation might be useful.
788  *
789  * Getting data in sorted order can be useful either because the requested
790  * order matches the final output ordering for the overall query we're
791  * planning, or because it enables an efficient merge join. Here, we try
792  * to figure out which pathkeys to consider.
793  */
794 static List *
796 {
797  List *useful_pathkeys_list = NIL;
798  List *useful_eclass_list;
800  EquivalenceClass *query_ec = NULL;
801  ListCell *lc;
802 
803  /*
804  * Pushing the query_pathkeys to the remote server is always worth
805  * considering, because it might let us avoid a local sort.
806  */
807  if (root->query_pathkeys)
808  {
809  bool query_pathkeys_ok = true;
810 
811  foreach(lc, root->query_pathkeys)
812  {
813  PathKey *pathkey = (PathKey *) lfirst(lc);
814  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
815  Expr *em_expr;
816 
817  /*
818  * The planner and executor don't have any clever strategy for
819  * taking data sorted by a prefix of the query's pathkeys and
820  * getting it to be sorted by all of those pathkeys. We'll just
821  * end up resorting the entire data set. So, unless we can push
822  * down all of the query pathkeys, forget it.
823  *
824  * is_foreign_expr would detect volatile expressions as well, but
825  * checking ec_has_volatile here saves some cycles.
826  */
827  if (pathkey_ec->ec_has_volatile ||
828  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
829  !is_foreign_expr(root, rel, em_expr))
830  {
831  query_pathkeys_ok = false;
832  break;
833  }
834  }
835 
836  if (query_pathkeys_ok)
837  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
838  }
839 
840  /*
841  * Even if we're not using remote estimates, having the remote side do the
842  * sort generally won't be any worse than doing it locally, and it might
843  * be much better if the remote side can generate data in the right order
844  * without needing a sort at all. However, what we're going to do next is
845  * try to generate pathkeys that seem promising for possible merge joins,
846  * and that's more speculative. A wrong choice might hurt quite a bit, so
847  * bail out if we can't use remote estimates.
848  */
849  if (!fpinfo->use_remote_estimate)
850  return useful_pathkeys_list;
851 
852  /* Get the list of interesting EquivalenceClasses. */
853  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
854 
855  /* Extract unique EC for query, if any, so we don't consider it again. */
856  if (list_length(root->query_pathkeys) == 1)
857  {
858  PathKey *query_pathkey = linitial(root->query_pathkeys);
859 
860  query_ec = query_pathkey->pk_eclass;
861  }
862 
863  /*
864  * As a heuristic, the only pathkeys we consider here are those of length
865  * one. It's surely possible to consider more, but since each one we
866  * choose to consider will generate a round-trip to the remote side, we
867  * need to be a bit cautious here. It would sure be nice to have a local
868  * cache of information about remote index definitions...
869  */
870  foreach(lc, useful_eclass_list)
871  {
872  EquivalenceClass *cur_ec = lfirst(lc);
873  Expr *em_expr;
874  PathKey *pathkey;
875 
876  /* If redundant with what we did above, skip it. */
877  if (cur_ec == query_ec)
878  continue;
879 
880  /* If no pushable expression for this rel, skip it. */
881  em_expr = find_em_expr_for_rel(cur_ec, rel);
882  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
883  continue;
884 
885  /* Looks like we can generate a pathkey, so let's do it. */
886  pathkey = make_canonical_pathkey(root, cur_ec,
887  linitial_oid(cur_ec->ec_opfamilies),
889  false);
890  useful_pathkeys_list = lappend(useful_pathkeys_list,
891  list_make1(pathkey));
892  }
893 
894  return useful_pathkeys_list;
895 }
896 
897 /*
898  * postgresGetForeignPaths
899  * Create possible scan paths for a scan on the foreign table
900  */
901 static void
903  RelOptInfo *baserel,
904  Oid foreigntableid)
905 {
906  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
907  ForeignPath *path;
908  List *ppi_list;
909  ListCell *lc;
910 
911  /*
912  * Create simplest ForeignScan path node and add it to baserel. This path
913  * corresponds to SeqScan path of regular tables (though depending on what
914  * baserestrict conditions we were able to send to remote, there might
915  * actually be an indexscan happening there). We already did all the work
916  * to estimate cost and size of this path.
917  */
918  path = create_foreignscan_path(root, baserel,
919  NULL, /* default pathtarget */
920  fpinfo->rows,
921  fpinfo->startup_cost,
922  fpinfo->total_cost,
923  NIL, /* no pathkeys */
924  NULL, /* no outer rel either */
925  NULL, /* no extra plan */
926  NIL); /* no fdw_private list */
927  add_path(baserel, (Path *) path);
928 
929  /* Add paths with pathkeys */
930  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
931 
932  /*
933  * If we're not using remote estimates, stop here. We have no way to
934  * estimate whether any join clauses would be worth sending across, so
935  * don't bother building parameterized paths.
936  */
937  if (!fpinfo->use_remote_estimate)
938  return;
939 
940  /*
941  * Thumb through all join clauses for the rel to identify which outer
942  * relations could supply one or more safe-to-send-to-remote join clauses.
943  * We'll build a parameterized path for each such outer relation.
944  *
945  * It's convenient to manage this by representing each candidate outer
946  * relation by the ParamPathInfo node for it. We can then use the
947  * ppi_clauses list in the ParamPathInfo node directly as a list of the
948  * interesting join clauses for that rel. This takes care of the
949  * possibility that there are multiple safe join clauses for such a rel,
950  * and also ensures that we account for unsafe join clauses that we'll
951  * still have to enforce locally (since the parameterized-path machinery
952  * insists that we handle all movable clauses).
953  */
954  ppi_list = NIL;
955  foreach(lc, baserel->joininfo)
956  {
957  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
958  Relids required_outer;
959  ParamPathInfo *param_info;
960 
961  /* Check if clause can be moved to this rel */
962  if (!join_clause_is_movable_to(rinfo, baserel))
963  continue;
964 
965  /* See if it is safe to send to remote */
966  if (!is_foreign_expr(root, baserel, rinfo->clause))
967  continue;
968 
969  /* Calculate required outer rels for the resulting path */
970  required_outer = bms_union(rinfo->clause_relids,
971  baserel->lateral_relids);
972  /* We do not want the foreign rel itself listed in required_outer */
973  required_outer = bms_del_member(required_outer, baserel->relid);
974 
975  /*
976  * required_outer probably can't be empty here, but if it were, we
977  * couldn't make a parameterized path.
978  */
979  if (bms_is_empty(required_outer))
980  continue;
981 
982  /* Get the ParamPathInfo */
983  param_info = get_baserel_parampathinfo(root, baserel,
984  required_outer);
985  Assert(param_info != NULL);
986 
987  /*
988  * Add it to list unless we already have it. Testing pointer equality
989  * is OK since get_baserel_parampathinfo won't make duplicates.
990  */
991  ppi_list = list_append_unique_ptr(ppi_list, param_info);
992  }
993 
994  /*
995  * The above scan examined only "generic" join clauses, not those that
996  * were absorbed into EquivalenceClauses. See if we can make anything out
997  * of EquivalenceClauses.
998  */
999  if (baserel->has_eclass_joins)
1000  {
1001  /*
1002  * We repeatedly scan the eclass list looking for column references
1003  * (or expressions) belonging to the foreign rel. Each time we find
1004  * one, we generate a list of equivalence joinclauses for it, and then
1005  * see if any are safe to send to the remote. Repeat till there are
1006  * no more candidate EC members.
1007  */
1009 
1010  arg.already_used = NIL;
1011  for (;;)
1012  {
1013  List *clauses;
1014 
1015  /* Make clauses, skipping any that join to lateral_referencers */
1016  arg.current = NULL;
1018  baserel,
1020  (void *) &arg,
1021  baserel->lateral_referencers);
1022 
1023  /* Done if there are no more expressions in the foreign rel */
1024  if (arg.current == NULL)
1025  {
1026  Assert(clauses == NIL);
1027  break;
1028  }
1029 
1030  /* Scan the extracted join clauses */
1031  foreach(lc, clauses)
1032  {
1033  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1034  Relids required_outer;
1035  ParamPathInfo *param_info;
1036 
1037  /* Check if clause can be moved to this rel */
1038  if (!join_clause_is_movable_to(rinfo, baserel))
1039  continue;
1040 
1041  /* See if it is safe to send to remote */
1042  if (!is_foreign_expr(root, baserel, rinfo->clause))
1043  continue;
1044 
1045  /* Calculate required outer rels for the resulting path */
1046  required_outer = bms_union(rinfo->clause_relids,
1047  baserel->lateral_relids);
1048  required_outer = bms_del_member(required_outer, baserel->relid);
1049  if (bms_is_empty(required_outer))
1050  continue;
1051 
1052  /* Get the ParamPathInfo */
1053  param_info = get_baserel_parampathinfo(root, baserel,
1054  required_outer);
1055  Assert(param_info != NULL);
1056 
1057  /* Add it to list unless we already have it */
1058  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1059  }
1060 
1061  /* Try again, now ignoring the expression we found this time */
1062  arg.already_used = lappend(arg.already_used, arg.current);
1063  }
1064  }
1065 
1066  /*
1067  * Now build a path for each useful outer relation.
1068  */
1069  foreach(lc, ppi_list)
1070  {
1071  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1072  double rows;
1073  int width;
1074  Cost startup_cost;
1075  Cost total_cost;
1076 
1077  /* Get a cost estimate from the remote */
1078  estimate_path_cost_size(root, baserel,
1079  param_info->ppi_clauses, NIL,
1080  &rows, &width,
1081  &startup_cost, &total_cost);
1082 
1083  /*
1084  * ppi_rows currently won't get looked at by anything, but still we
1085  * may as well ensure that it matches our idea of the rowcount.
1086  */
1087  param_info->ppi_rows = rows;
1088 
1089  /* Make the path */
1090  path = create_foreignscan_path(root, baserel,
1091  NULL, /* default pathtarget */
1092  rows,
1093  startup_cost,
1094  total_cost,
1095  NIL, /* no pathkeys */
1096  param_info->ppi_req_outer,
1097  NULL,
1098  NIL); /* no fdw_private list */
1099  add_path(baserel, (Path *) path);
1100  }
1101 }
1102 
1103 /*
1104  * postgresGetForeignPlan
1105  * Create ForeignScan plan node which implements selected best path
1106  */
1107 static ForeignScan *
1109  RelOptInfo *foreignrel,
1110  Oid foreigntableid,
1111  ForeignPath *best_path,
1112  List *tlist,
1113  List *scan_clauses,
1114  Plan *outer_plan)
1115 {
1116  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1117  Index scan_relid;
1118  List *fdw_private;
1119  List *remote_conds = NIL;
1120  List *remote_exprs = NIL;
1121  List *local_exprs = NIL;
1122  List *params_list = NIL;
1123  List *retrieved_attrs;
1124  StringInfoData sql;
1125  ListCell *lc;
1126  List *fdw_scan_tlist = NIL;
1127 
1128  /*
1129  * For base relations, set scan_relid as the relid of the relation. For
1130  * other kinds of relations set it to 0.
1131  */
1132  if (foreignrel->reloptkind == RELOPT_BASEREL ||
1133  foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL)
1134  scan_relid = foreignrel->relid;
1135  else
1136  {
1137  scan_relid = 0;
1138 
1139  /*
1140  * create_scan_plan() and create_foreignscan_plan() pass
1141  * rel->baserestrictinfo + parameterization clauses through
1142  * scan_clauses. For a join rel->baserestrictinfo is NIL and we are
1143  * not considering parameterization right now, so there should be no
1144  * scan_clauses for a joinrel and upper rel either.
1145  */
1146  Assert(!scan_clauses);
1147  }
1148 
1149  /*
1150  * Separate the scan_clauses into those that can be executed remotely and
1151  * those that can't. baserestrictinfo clauses that were previously
1152  * determined to be safe or unsafe by classifyConditions are shown in
1153  * fpinfo->remote_conds and fpinfo->local_conds. Anything else in the
1154  * scan_clauses list will be a join clause, which we have to check for
1155  * remote-safety.
1156  *
1157  * Note: the join clauses we see here should be the exact same ones
1158  * previously examined by postgresGetForeignPaths. Possibly it'd be worth
1159  * passing forward the classification work done then, rather than
1160  * repeating it here.
1161  *
1162  * This code must match "extract_actual_clauses(scan_clauses, false)"
1163  * except for the additional decision about remote versus local execution.
1164  * Note however that we don't strip the RestrictInfo nodes from the
1165  * remote_conds list, since appendWhereClause expects a list of
1166  * RestrictInfos.
1167  */
1168  foreach(lc, scan_clauses)
1169  {
1170  RestrictInfo *rinfo = castNode(RestrictInfo, lfirst(lc));
1171 
1172  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1173  if (rinfo->pseudoconstant)
1174  continue;
1175 
1176  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1177  {
1178  remote_conds = lappend(remote_conds, rinfo);
1179  remote_exprs = lappend(remote_exprs, rinfo->clause);
1180  }
1181  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1182  local_exprs = lappend(local_exprs, rinfo->clause);
1183  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1184  {
1185  remote_conds = lappend(remote_conds, rinfo);
1186  remote_exprs = lappend(remote_exprs, rinfo->clause);
1187  }
1188  else
1189  local_exprs = lappend(local_exprs, rinfo->clause);
1190  }
1191 
1192  if (foreignrel->reloptkind == RELOPT_JOINREL ||
1193  foreignrel->reloptkind == RELOPT_UPPER_REL)
1194  {
1195  /* For a join relation, get the conditions from fdw_private structure */
1196  remote_conds = fpinfo->remote_conds;
1197  local_exprs = fpinfo->local_conds;
1198 
1199  /* Build the list of columns to be fetched from the foreign server. */
1200  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1201 
1202  /*
1203  * Ensure that the outer plan produces a tuple whose descriptor
1204  * matches our scan tuple slot. This is safe because all scans and
1205  * joins support projection, so we never need to insert a Result node.
1206  * Also, remove the local conditions from outer plan's quals, lest
1207  * they will be evaluated twice, once by the local plan and once by
1208  * the scan.
1209  */
1210  if (outer_plan)
1211  {
1212  ListCell *lc;
1213 
1214  /*
1215  * Right now, we only consider grouping and aggregation beyond
1216  * joins. Queries involving aggregates or grouping do not require
1217  * EPQ mechanism, hence should not have an outer plan here.
1218  */
1219  Assert(foreignrel->reloptkind != RELOPT_UPPER_REL);
1220 
1221  outer_plan->targetlist = fdw_scan_tlist;
1222 
1223  foreach(lc, local_exprs)
1224  {
1225  Join *join_plan = (Join *) outer_plan;
1226  Node *qual = lfirst(lc);
1227 
1228  outer_plan->qual = list_delete(outer_plan->qual, qual);
1229 
1230  /*
1231  * For an inner join the local conditions of foreign scan plan
1232  * can be part of the joinquals as well.
1233  */
1234  if (join_plan->jointype == JOIN_INNER)
1235  join_plan->joinqual = list_delete(join_plan->joinqual,
1236  qual);
1237  }
1238  }
1239  }
1240 
1241  /*
1242  * Build the query string to be sent for execution, and identify
1243  * expressions to be sent as parameters.
1244  */
1245  initStringInfo(&sql);
1246  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1247  remote_conds, best_path->path.pathkeys,
1248  false, &retrieved_attrs, &params_list);
1249 
1250  /*
1251  * Build the fdw_private list that will be available to the executor.
1252  * Items in the list must match order in enum FdwScanPrivateIndex.
1253  */
1254  fdw_private = list_make4(makeString(sql.data),
1255  remote_conds,
1256  retrieved_attrs,
1257  makeInteger(fpinfo->fetch_size));
1258  if (foreignrel->reloptkind == RELOPT_JOINREL ||
1259  foreignrel->reloptkind == RELOPT_UPPER_REL)
1260  fdw_private = lappend(fdw_private,
1261  makeString(fpinfo->relation_name->data));
1262 
1263  /*
1264  * Create the ForeignScan node for the given relation.
1265  *
1266  * Note that the remote parameter expressions are stored in the fdw_exprs
1267  * field of the finished plan node; we can't keep them in private state
1268  * because then they wouldn't be subject to later planner processing.
1269  */
1270  return make_foreignscan(tlist,
1271  local_exprs,
1272  scan_relid,
1273  params_list,
1274  fdw_private,
1275  fdw_scan_tlist,
1276  remote_exprs,
1277  outer_plan);
1278 }
1279 
1280 /*
1281  * postgresBeginForeignScan
1282  * Initiate an executor scan of a foreign PostgreSQL table.
1283  */
1284 static void
1286 {
1287  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1288  EState *estate = node->ss.ps.state;
1289  PgFdwScanState *fsstate;
1290  RangeTblEntry *rte;
1291  Oid userid;
1292  ForeignTable *table;
1293  UserMapping *user;
1294  int rtindex;
1295  int numParams;
1296 
1297  /*
1298  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1299  */
1300  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1301  return;
1302 
1303  /*
1304  * We'll save private state in node->fdw_state.
1305  */
1306  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1307  node->fdw_state = (void *) fsstate;
1308 
1309  /*
1310  * Identify which user to do the remote access as. This should match what
1311  * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1312  * lowest-numbered member RTE as a representative; we would get the same
1313  * result from any.
1314  */
1315  if (fsplan->scan.scanrelid > 0)
1316  rtindex = fsplan->scan.scanrelid;
1317  else
1318  rtindex = bms_next_member(fsplan->fs_relids, -1);
1319  rte = rt_fetch(rtindex, estate->es_range_table);
1320  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1321 
1322  /* Get info about foreign table. */
1323  table = GetForeignTable(rte->relid);
1324  user = GetUserMapping(userid, table->serverid);
1325 
1326  /*
1327  * Get connection to the foreign server. Connection manager will
1328  * establish new connection if necessary.
1329  */
1330  fsstate->conn = GetConnection(user, false);
1331 
1332  /* Assign a unique ID for my cursor */
1333  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1334  fsstate->cursor_exists = false;
1335 
1336  /* Get private info created by planner functions. */
1337  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1339  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1341  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1343 
1344  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1345  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1346  "postgres_fdw tuple data",
1348  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1349  "postgres_fdw temporary data",
1351 
1352  /*
1353  * Get info we'll need for converting data fetched from the foreign server
1354  * into local representation and error reporting during that process.
1355  */
1356  if (fsplan->scan.scanrelid > 0)
1357  {
1358  fsstate->rel = node->ss.ss_currentRelation;
1359  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1360  }
1361  else
1362  {
1363  fsstate->rel = NULL;
1364  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1365  }
1366 
1367  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1368 
1369  /*
1370  * Prepare for processing of parameters used in remote query, if any.
1371  */
1372  numParams = list_length(fsplan->fdw_exprs);
1373  fsstate->numParams = numParams;
1374  if (numParams > 0)
1376  fsplan->fdw_exprs,
1377  numParams,
1378  &fsstate->param_flinfo,
1379  &fsstate->param_exprs,
1380  &fsstate->param_values);
1381 }
1382 
1383 /*
1384  * postgresIterateForeignScan
1385  * Retrieve next row from the result set, or clear tuple slot to indicate
1386  * EOF.
1387  */
1388 static TupleTableSlot *
1390 {
1391  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1392  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1393 
1394  /*
1395  * If this is the first call after Begin or ReScan, we need to create the
1396  * cursor on the remote side.
1397  */
1398  if (!fsstate->cursor_exists)
1399  create_cursor(node);
1400 
1401  /*
1402  * Get some more tuples, if we've run out.
1403  */
1404  if (fsstate->next_tuple >= fsstate->num_tuples)
1405  {
1406  /* No point in another fetch if we already detected EOF, though. */
1407  if (!fsstate->eof_reached)
1408  fetch_more_data(node);
1409  /* If we didn't get any tuples, must be end of data. */
1410  if (fsstate->next_tuple >= fsstate->num_tuples)
1411  return ExecClearTuple(slot);
1412  }
1413 
1414  /*
1415  * Return the next tuple.
1416  */
1417  ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
1418  slot,
1419  InvalidBuffer,
1420  false);
1421 
1422  return slot;
1423 }
1424 
1425 /*
1426  * postgresReScanForeignScan
1427  * Restart the scan.
1428  */
1429 static void
1431 {
1432  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1433  char sql[64];
1434  PGresult *res;
1435 
1436  /* If we haven't created the cursor yet, nothing to do. */
1437  if (!fsstate->cursor_exists)
1438  return;
1439 
1440  /*
1441  * If any internal parameters affecting this node have changed, we'd
1442  * better destroy and recreate the cursor. Otherwise, rewinding it should
1443  * be good enough. If we've only fetched zero or one batch, we needn't
1444  * even rewind the cursor, just rescan what we have.
1445  */
1446  if (node->ss.ps.chgParam != NULL)
1447  {
1448  fsstate->cursor_exists = false;
1449  snprintf(sql, sizeof(sql), "CLOSE c%u",
1450  fsstate->cursor_number);
1451  }
1452  else if (fsstate->fetch_ct_2 > 1)
1453  {
1454  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1455  fsstate->cursor_number);
1456  }
1457  else
1458  {
1459  /* Easy: just rescan what we already have in memory, if anything */
1460  fsstate->next_tuple = 0;
1461  return;
1462  }
1463 
1464  /*
1465  * We don't use a PG_TRY block here, so be careful not to throw error
1466  * without releasing the PGresult.
1467  */
1468  res = pgfdw_exec_query(fsstate->conn, sql);
1469  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1470  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1471  PQclear(res);
1472 
1473  /* Now force a fresh FETCH. */
1474  fsstate->tuples = NULL;
1475  fsstate->num_tuples = 0;
1476  fsstate->next_tuple = 0;
1477  fsstate->fetch_ct_2 = 0;
1478  fsstate->eof_reached = false;
1479 }
1480 
1481 /*
1482  * postgresEndForeignScan
1483  * Finish scanning foreign table and dispose objects used for this scan
1484  */
1485 static void
1487 {
1488  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1489 
1490  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1491  if (fsstate == NULL)
1492  return;
1493 
1494  /* Close the cursor if open, to prevent accumulation of cursors */
1495  if (fsstate->cursor_exists)
1496  close_cursor(fsstate->conn, fsstate->cursor_number);
1497 
1498  /* Release remote connection */
1499  ReleaseConnection(fsstate->conn);
1500  fsstate->conn = NULL;
1501 
1502  /* MemoryContexts will be deleted automatically. */
1503 }
1504 
1505 /*
1506  * postgresAddForeignUpdateTargets
1507  * Add resjunk column(s) needed for update/delete on a foreign table
1508  */
1509 static void
1511  RangeTblEntry *target_rte,
1512  Relation target_relation)
1513 {
1514  Var *var;
1515  const char *attrname;
1516  TargetEntry *tle;
1517 
1518  /*
1519  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1520  */
1521 
1522  /* Make a Var representing the desired value */
1523  var = makeVar(parsetree->resultRelation,
1525  TIDOID,
1526  -1,
1527  InvalidOid,
1528  0);
1529 
1530  /* Wrap it in a resjunk TLE with the right name ... */
1531  attrname = "ctid";
1532 
1533  tle = makeTargetEntry((Expr *) var,
1534  list_length(parsetree->targetList) + 1,
1535  pstrdup(attrname),
1536  true);
1537 
1538  /* ... and add it to the query's targetlist */
1539  parsetree->targetList = lappend(parsetree->targetList, tle);
1540 }
1541 
1542 /*
1543  * postgresPlanForeignModify
1544  * Plan an insert/update/delete operation on a foreign table
1545  */
1546 static List *
1548  ModifyTable *plan,
1549  Index resultRelation,
1550  int subplan_index)
1551 {
1552  CmdType operation = plan->operation;
1553  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1554  Relation rel;
1555  StringInfoData sql;
1556  List *targetAttrs = NIL;
1557  List *returningList = NIL;
1558  List *retrieved_attrs = NIL;
1559  bool doNothing = false;
1560 
1561  initStringInfo(&sql);
1562 
1563  /*
1564  * Core code already has some lock on each rel being planned, so we can
1565  * use NoLock here.
1566  */
1567  rel = heap_open(rte->relid, NoLock);
1568 
1569  /*
1570  * In an INSERT, we transmit all columns that are defined in the foreign
1571  * table. In an UPDATE, we transmit only columns that were explicitly
1572  * targets of the UPDATE, so as to avoid unnecessary data transmission.
1573  * (We can't do that for INSERT since we would miss sending default values
1574  * for columns not listed in the source statement.)
1575  */
1576  if (operation == CMD_INSERT)
1577  {
1578  TupleDesc tupdesc = RelationGetDescr(rel);
1579  int attnum;
1580 
1581  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1582  {
1583  Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
1584 
1585  if (!attr->attisdropped)
1586  targetAttrs = lappend_int(targetAttrs, attnum);
1587  }
1588  }
1589  else if (operation == CMD_UPDATE)
1590  {
1591  int col;
1592 
1593  col = -1;
1594  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
1595  {
1596  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1598 
1599  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1600  elog(ERROR, "system-column update is not supported");
1601  targetAttrs = lappend_int(targetAttrs, attno);
1602  }
1603  }
1604 
1605  /*
1606  * Extract the relevant RETURNING list if any.
1607  */
1608  if (plan->returningLists)
1609  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1610 
1611  /*
1612  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1613  * should have already been rejected in the optimizer, as presently there
1614  * is no way to recognize an arbiter index on a foreign table. Only DO
1615  * NOTHING is supported without an inference specification.
1616  */
1617  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1618  doNothing = true;
1619  else if (plan->onConflictAction != ONCONFLICT_NONE)
1620  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1621  (int) plan->onConflictAction);
1622 
1623  /*
1624  * Construct the SQL command string.
1625  */
1626  switch (operation)
1627  {
1628  case CMD_INSERT:
1629  deparseInsertSql(&sql, root, resultRelation, rel,
1630  targetAttrs, doNothing, returningList,
1631  &retrieved_attrs);
1632  break;
1633  case CMD_UPDATE:
1634  deparseUpdateSql(&sql, root, resultRelation, rel,
1635  targetAttrs, returningList,
1636  &retrieved_attrs);
1637  break;
1638  case CMD_DELETE:
1639  deparseDeleteSql(&sql, root, resultRelation, rel,
1640  returningList,
1641  &retrieved_attrs);
1642  break;
1643  default:
1644  elog(ERROR, "unexpected operation: %d", (int) operation);
1645  break;
1646  }
1647 
1648  heap_close(rel, NoLock);
1649 
1650  /*
1651  * Build the fdw_private list that will be available to the executor.
1652  * Items in the list must match enum FdwModifyPrivateIndex, above.
1653  */
1654  return list_make4(makeString(sql.data),
1655  targetAttrs,
1656  makeInteger((retrieved_attrs != NIL)),
1657  retrieved_attrs);
1658 }
1659 
1660 /*
1661  * postgresBeginForeignModify
1662  * Begin an insert/update/delete operation on a foreign table
1663  */
1664 static void
1666  ResultRelInfo *resultRelInfo,
1667  List *fdw_private,
1668  int subplan_index,
1669  int eflags)
1670 {
1671  PgFdwModifyState *fmstate;
1672  EState *estate = mtstate->ps.state;
1673  CmdType operation = mtstate->operation;
1674  Relation rel = resultRelInfo->ri_RelationDesc;
1675  RangeTblEntry *rte;
1676  Oid userid;
1677  ForeignTable *table;
1678  UserMapping *user;
1679  AttrNumber n_params;
1680  Oid typefnoid;
1681  bool isvarlena;
1682  ListCell *lc;
1683 
1684  /*
1685  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1686  * stays NULL.
1687  */
1688  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1689  return;
1690 
1691  /* Begin constructing PgFdwModifyState. */
1692  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1693  fmstate->rel = rel;
1694 
1695  /*
1696  * Identify which user to do the remote access as. This should match what
1697  * ExecCheckRTEPerms() does.
1698  */
1699  rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1700  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1701 
1702  /* Get info about foreign table. */
1703  table = GetForeignTable(RelationGetRelid(rel));
1704  user = GetUserMapping(userid, table->serverid);
1705 
1706  /* Open connection; report that we'll create a prepared statement. */
1707  fmstate->conn = GetConnection(user, true);
1708  fmstate->p_name = NULL; /* prepared statement not made yet */
1709 
1710  /* Deconstruct fdw_private data. */
1711  fmstate->query = strVal(list_nth(fdw_private,
1713  fmstate->target_attrs = (List *) list_nth(fdw_private,
1715  fmstate->has_returning = intVal(list_nth(fdw_private,
1717  fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1719 
1720  /* Create context for per-tuple temp workspace. */
1721  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1722  "postgres_fdw temporary data",
1724 
1725  /* Prepare for input conversion of RETURNING results. */
1726  if (fmstate->has_returning)
1728 
1729  /* Prepare for output conversion of parameters used in prepared stmt. */
1730  n_params = list_length(fmstate->target_attrs) + 1;
1731  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1732  fmstate->p_nums = 0;
1733 
1734  if (operation == CMD_UPDATE || operation == CMD_DELETE)
1735  {
1736  /* Find the ctid resjunk column in the subplan's result */
1737  Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1738 
1740  "ctid");
1741  if (!AttributeNumberIsValid(fmstate->ctidAttno))
1742  elog(ERROR, "could not find junk ctid column");
1743 
1744  /* First transmittable parameter will be ctid */
1745  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1746  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1747  fmstate->p_nums++;
1748  }
1749 
1750  if (operation == CMD_INSERT || operation == CMD_UPDATE)
1751  {
1752  /* Set up for remaining transmittable parameters */
1753  foreach(lc, fmstate->target_attrs)
1754  {
1755  int attnum = lfirst_int(lc);
1756  Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1757 
1758  Assert(!attr->attisdropped);
1759 
1760  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1761  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1762  fmstate->p_nums++;
1763  }
1764  }
1765 
1766  Assert(fmstate->p_nums <= n_params);
1767 
1768  resultRelInfo->ri_FdwState = fmstate;
1769 }
1770 
1771 /*
1772  * postgresExecForeignInsert
1773  * Insert one row into a foreign table
1774  */
1775 static TupleTableSlot *
1777  ResultRelInfo *resultRelInfo,
1778  TupleTableSlot *slot,
1779  TupleTableSlot *planSlot)
1780 {
1781  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1782  const char **p_values;
1783  PGresult *res;
1784  int n_rows;
1785 
1786  /* Set up the prepared statement on the remote server, if we didn't yet */
1787  if (!fmstate->p_name)
1788  prepare_foreign_modify(fmstate);
1789 
1790  /* Convert parameters needed by prepared statement to text form */
1791  p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1792 
1793  /*
1794  * Execute the prepared statement.
1795  */
1796  if (!PQsendQueryPrepared(fmstate->conn,
1797  fmstate->p_name,
1798  fmstate->p_nums,
1799  p_values,
1800  NULL,
1801  NULL,
1802  0))
1803  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1804 
1805  /*
1806  * Get the result, and check for success.
1807  *
1808  * We don't use a PG_TRY block here, so be careful not to throw error
1809  * without releasing the PGresult.
1810  */
1811  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1812  if (PQresultStatus(res) !=
1814  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1815 
1816  /* Check number of rows affected, and fetch RETURNING tuple if any */
1817  if (fmstate->has_returning)
1818  {
1819  n_rows = PQntuples(res);
1820  if (n_rows > 0)
1821  store_returning_result(fmstate, slot, res);
1822  }
1823  else
1824  n_rows = atoi(PQcmdTuples(res));
1825 
1826  /* And clean up */
1827  PQclear(res);
1828 
1829  MemoryContextReset(fmstate->temp_cxt);
1830 
1831  /* Return NULL if nothing was inserted on the remote end */
1832  return (n_rows > 0) ? slot : NULL;
1833 }
1834 
1835 /*
1836  * postgresExecForeignUpdate
1837  * Update one row in a foreign table
1838  */
1839 static TupleTableSlot *
1841  ResultRelInfo *resultRelInfo,
1842  TupleTableSlot *slot,
1843  TupleTableSlot *planSlot)
1844 {
1845  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1846  Datum datum;
1847  bool isNull;
1848  const char **p_values;
1849  PGresult *res;
1850  int n_rows;
1851 
1852  /* Set up the prepared statement on the remote server, if we didn't yet */
1853  if (!fmstate->p_name)
1854  prepare_foreign_modify(fmstate);
1855 
1856  /* Get the ctid that was passed up as a resjunk column */
1857  datum = ExecGetJunkAttribute(planSlot,
1858  fmstate->ctidAttno,
1859  &isNull);
1860  /* shouldn't ever get a null result... */
1861  if (isNull)
1862  elog(ERROR, "ctid is NULL");
1863 
1864  /* Convert parameters needed by prepared statement to text form */
1865  p_values = convert_prep_stmt_params(fmstate,
1866  (ItemPointer) DatumGetPointer(datum),
1867  slot);
1868 
1869  /*
1870  * Execute the prepared statement.
1871  */
1872  if (!PQsendQueryPrepared(fmstate->conn,
1873  fmstate->p_name,
1874  fmstate->p_nums,
1875  p_values,
1876  NULL,
1877  NULL,
1878  0))
1879  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1880 
1881  /*
1882  * Get the result, and check for success.
1883  *
1884  * We don't use a PG_TRY block here, so be careful not to throw error
1885  * without releasing the PGresult.
1886  */
1887  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1888  if (PQresultStatus(res) !=
1890  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1891 
1892  /* Check number of rows affected, and fetch RETURNING tuple if any */
1893  if (fmstate->has_returning)
1894  {
1895  n_rows = PQntuples(res);
1896  if (n_rows > 0)
1897  store_returning_result(fmstate, slot, res);
1898  }
1899  else
1900  n_rows = atoi(PQcmdTuples(res));
1901 
1902  /* And clean up */
1903  PQclear(res);
1904 
1905  MemoryContextReset(fmstate->temp_cxt);
1906 
1907  /* Return NULL if nothing was updated on the remote end */
1908  return (n_rows > 0) ? slot : NULL;
1909 }
1910 
1911 /*
1912  * postgresExecForeignDelete
1913  * Delete one row from a foreign table
 *
 * Sends the prepared remote DELETE, passing the row's ctid (read from the
 * resjunk column fmstate->ctidAttno of planSlot) as its parameter.  When the
 * statement has RETURNING, the result is stored into slot through
 * store_returning_result().  Returns slot if the remote end reports at least
 * one affected row, otherwise NULL.
 *
 * NOTE(review): the embedded numbering of this listing skips 1916 and 1965
 * (presumably the function-name/first-parameter line and the expected
 * PQresultStatus operand); restore them from upstream before compiling.
1914  */
1915 static TupleTableSlot *
1917  ResultRelInfo *resultRelInfo,
1918  TupleTableSlot *slot,
1919  TupleTableSlot *planSlot)
1920 {
1921  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1922  Datum datum;
1923  bool isNull;
1924  const char **p_values;
1925  PGresult *res;
1926  int n_rows;
1927 
1928  /* Set up the prepared statement on the remote server, if we didn't yet */
1929  if (!fmstate->p_name)
1930  prepare_foreign_modify(fmstate);
1931 
1932  /* Get the ctid that was passed up as a resjunk column */
1933  datum = ExecGetJunkAttribute(planSlot,
1934  fmstate->ctidAttno,
1935  &isNull);
1936  /* shouldn't ever get a null result... */
1937  if (isNull)
1938  elog(ERROR, "ctid is NULL");
1939 
1940  /* Convert parameters needed by prepared statement to text form */
1941  p_values = convert_prep_stmt_params(fmstate,
1942  (ItemPointer) DatumGetPointer(datum),
1943  NULL);
1944 
1945  /*
1946  * Execute the prepared statement.
1947  */
1948  if (!PQsendQueryPrepared(fmstate->conn,
1949  fmstate->p_name,
1950  fmstate->p_nums,
1951  p_values,
1952  NULL,
1953  NULL,
1954  0))
1955  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1956 
1957  /*
1958  * Get the result, and check for success.
1959  *
1960  * We don't use a PG_TRY block here, so be careful not to throw error
1961  * without releasing the PGresult.
1962  */
1963  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1964  if (PQresultStatus(res) !=
1966  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1967 
1968  /* Check number of rows affected, and fetch RETURNING tuple if any */
1969  if (fmstate->has_returning)
1970  {
1971  n_rows = PQntuples(res);
1972  if (n_rows > 0)
1973  store_returning_result(fmstate, slot, res);
1974  }
1975  else
/* Without RETURNING, the affected-row count comes from the command tag. */
1976  n_rows = atoi(PQcmdTuples(res));
1977 
1978  /* And clean up */
1979  PQclear(res);
1980 
1981  MemoryContextReset(fmstate->temp_cxt);
1982 
1983  /* Return NULL if nothing was deleted on the remote end */
1984  return (n_rows > 0) ? slot : NULL;
1985 }
1986 
1987 /*
1988  * postgresEndForeignModify
1989  * Finish an insert/update/delete operation on a foreign table
 *
 * If a remote prepared statement was created (fmstate->p_name is set), it is
 * dropped with DEALLOCATE, and a failure is raised as ERROR.  The remote
 * connection is then handed back through ReleaseConnection() and the
 * pointer is cleared.  Does nothing when ri_FdwState is NULL (the EXPLAIN
 * case, per the comment below).
 *
 * NOTE(review): embedded line 1992 is missing from this listing (presumably
 * the function name and its first parameter).
1990  */
1991 static void
1993  ResultRelInfo *resultRelInfo)
1994 {
1995  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1996 
1997  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1998  if (fmstate == NULL)
1999  return;
2000 
2001  /* If we created a prepared statement, destroy it */
2002  if (fmstate->p_name)
2003  {
2004  char sql[64];
2005  PGresult *res;
2006 
2007  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2008 
2009  /*
2010  * We don't use a PG_TRY block here, so be careful not to throw error
2011  * without releasing the PGresult.
2012  */
2013  res = pgfdw_exec_query(fmstate->conn, sql);
2014  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2015  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2016  PQclear(res);
2017  fmstate->p_name = NULL;
2018  }
2019 
2020  /* Release remote connection */
2021  ReleaseConnection(fmstate->conn);
2022  fmstate->conn = NULL;
2023 }
2024 
2025 /*
2026  * postgresIsForeignRelUpdatable
2027  * Determine whether a foreign table supports INSERT, UPDATE and/or
2028  * DELETE.
 *
 * Returns the bitmask (1 << CMD_INSERT) | (1 << CMD_UPDATE) |
 * (1 << CMD_DELETE) when the relation is updatable, otherwise 0.  The
 * "updatable" option is read from the server options first and from the
 * table options second, so a table-level setting overrides a server-level one.
 *
 * NOTE(review): embedded line 2031 (presumably the function name and its
 * Relation parameter) is missing from this listing.
2029  */
2030 static int
2032 {
2033  bool updatable;
2034  ForeignTable *table;
2035  ForeignServer *server;
2036  ListCell *lc;
2037 
2038  /*
2039  * By default, all postgres_fdw foreign tables are assumed updatable. This
2040  * can be overridden by a per-server setting, which in turn can be
2041  * overridden by a per-table setting.
2042  */
2043  updatable = true;
2044 
2045  table = GetForeignTable(RelationGetRelid(rel));
2046  server = GetForeignServer(table->serverid);
2047 
2048  foreach(lc, server->options)
2049  {
2050  DefElem *def = (DefElem *) lfirst(lc);
2051 
2052  if (strcmp(def->defname, "updatable") == 0)
2053  updatable = defGetBoolean(def);
2054  }
2055  foreach(lc, table->options)
2056  {
2057  DefElem *def = (DefElem *) lfirst(lc);
2058 
2059  if (strcmp(def->defname, "updatable") == 0)
2060  updatable = defGetBoolean(def);
2061  }
2062 
2063  /*
2064  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2065  */
2066  return updatable ?
2067  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2068 }
2069 
2070 /*
2071  * postgresRecheckForeignScan
2072  * Execute a local join execution plan for a foreign join
 *
 * For a base-relation scan (scanrelid > 0) there is nothing to do here and
 * true is returned.  For a join, the outer (local join) plan is executed.
 * If it yields no tuple, false is returned; otherwise the tuple is copied
 * into slot and true is returned.
 *
 * NOTE(review): embedded lines 2075, 2078 and 2079 are missing from this
 * listing (presumably the parameter list and the declarations of outerPlan
 * and result).
2073  */
2074 static bool
2076 {
2077  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2080 
2081  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2082  if (scanrelid > 0)
2083  return true;
2084 
2085  Assert(outerPlan != NULL);
2086 
2087  /* Execute a local join execution plan */
2088  result = ExecProcNode(outerPlan);
2089  if (TupIsNull(result))
2090  return false;
2091 
2092  /* Store result in the given slot */
2093  ExecCopySlot(slot, result);
2094 
2095  return true;
2096 }
2097 
2098 /*
2099  * postgresPlanDirectModify
2100  * Consider a direct foreign table modification
2101  *
2102  * Decide whether it is safe to modify a foreign table directly, and if so,
2103  * rewrite subplan accordingly.
 *
 * Returns false and leaves the plan untouched unless all of the following
 * hold: the operation is UPDATE or DELETE; the subplan is a ForeignScan with
 * no locally evaluated quals; it scans a single base relation
 * (scanrelid != 0); and, for UPDATE, every expression assigned to a target
 * column passes is_foreign_expr().  On success, the ForeignScan's operation,
 * fdw_exprs and fdw_private are overwritten to describe the deparsed remote
 * UPDATE/DELETE, and true is returned.
 *
 * NOTE(review): embedded lines 2106, 2171 and 2205 are missing from this
 * listing (presumably the function name with its first parameter, the
 * computation of attno from col, and the index argument of the list_nth()
 * call that fetches remote_conds).
2104  */
2105 static bool
2107  ModifyTable *plan,
2108  Index resultRelation,
2109  int subplan_index)
2110 {
2111  CmdType operation = plan->operation;
2112  Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index);
2113  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
2114  Relation rel;
2115  StringInfoData sql;
2116  ForeignScan *fscan;
2117  List *targetAttrs = NIL;
2118  List *remote_conds;
2119  List *params_list = NIL;
2120  List *returningList = NIL;
2121  List *retrieved_attrs = NIL;
2122 
2123  /*
2124  * Decide whether it is safe to modify a foreign table directly.
2125  */
2126 
2127  /*
2128  * The table modification must be an UPDATE or DELETE.
2129  */
2130  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2131  return false;
2132 
2133  /*
2134  * It's unsafe to modify a foreign table directly if there are any local
2135  * joins needed.
2136  */
2137  if (!IsA(subplan, ForeignScan))
2138  return false;
2139 
2140  /*
2141  * It's unsafe to modify a foreign table directly if there are any quals
2142  * that should be evaluated locally.
2143  */
2144  if (subplan->qual != NIL)
2145  return false;
2146 
2147  /*
2148  * We can't handle an UPDATE or DELETE on a foreign join for now.
2149  */
2150  fscan = (ForeignScan *) subplan;
2151  if (fscan->scan.scanrelid == 0)
2152  return false;
2153 
2154  /*
2155  * It's unsafe to update a foreign table directly, if any expressions to
2156  * assign to the target columns are unsafe to evaluate remotely.
2157  */
2158  if (operation == CMD_UPDATE)
2159  {
2160  RelOptInfo *baserel = root->simple_rel_array[resultRelation];
2161  int col;
2162 
2163  /*
2164  * We transmit only columns that were explicitly targets of the
2165  * UPDATE, so as to avoid unnecessary data transmission.
2166  */
2167  col = -1;
2168  while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
2169  {
2170  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
2172  TargetEntry *tle;
2173 
2174  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2175  elog(ERROR, "system-column update is not supported");
2176 
2177  tle = get_tle_by_resno(subplan->targetlist, attno);
2178 
2179  if (!tle)
2180  elog(ERROR, "attribute number %d not found in subplan targetlist",
2181  attno);
2182 
2183  if (!is_foreign_expr(root, baserel, (Expr *) tle->expr))
2184  return false;
2185 
2186  targetAttrs = lappend_int(targetAttrs, attno);
2187  }
2188  }
2189 
2190  /*
2191  * Ok, rewrite subplan so as to modify the foreign table directly.
2192  */
2193  initStringInfo(&sql);
2194 
2195  /*
2196  * Core code already has some lock on each rel being planned, so we can
2197  * use NoLock here.
2198  */
2199  rel = heap_open(rte->relid, NoLock);
2200 
2201  /*
2202  * Extract the baserestrictinfo clauses that can be evaluated remotely.
2203  */
2204  remote_conds = (List *) list_nth(fscan->fdw_private,
2206 
2207  /*
2208  * Extract the relevant RETURNING list if any.
2209  */
2210  if (plan->returningLists)
2211  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2212 
2213  /*
2214  * Construct the SQL command string.
2215  */
2216  switch (operation)
2217  {
2218  case CMD_UPDATE:
2219  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2220  ((Plan *) fscan)->targetlist,
2221  targetAttrs,
2222  remote_conds, &params_list,
2223  returningList, &retrieved_attrs);
2224  break;
2225  case CMD_DELETE:
2226  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2227  remote_conds, &params_list,
2228  returningList, &retrieved_attrs);
2229  break;
2230  default:
2231  elog(ERROR, "unexpected operation: %d", (int) operation);
2232  break;
2233  }
2234 
2235  /*
2236  * Update the operation info.
2237  */
2238  fscan->operation = operation;
2239 
2240  /*
2241  * Update the fdw_exprs list that will be available to the executor.
2242  */
2243  fscan->fdw_exprs = params_list;
2244 
2245  /*
2246  * Update the fdw_private list that will be available to the executor.
2247  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2248  */
2249  fscan->fdw_private = list_make4(makeString(sql.data),
2250  makeInteger((retrieved_attrs != NIL)),
2251  retrieved_attrs,
2252  makeInteger(plan->canSetTag));
2253 
2254  heap_close(rel, NoLock);
2255  return true;
2256 }
2257 
2258 /*
2259  * postgresBeginDirectModify
2260  * Prepare a direct foreign table modification
 *
 * Allocates a PgFdwDirectModifyState into node->fdw_state and opens a remote
 * connection as the RTE's checkAsUser (or the current user).  It then
 * unpacks the planner's fdw_private list, creates a per-tuple temporary
 * memory context, and sets up RETURNING input conversion and query-parameter
 * processing when they are needed.  Nothing is done for EXPLAIN without
 * ANALYZE, in which case node->fdw_state stays NULL.
 *
 * NOTE(review): embedded lines 2263, 2309, 2311, 2313, 2315, 2320 and 2332
 * are missing from this listing (presumably the signature line, the list_nth
 * index arguments, the memory-context size argument, and the first line of
 * the parameter-preparation call).
2261  */
2262 static void
2264 {
2265  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2266  EState *estate = node->ss.ps.state;
2267  PgFdwDirectModifyState *dmstate;
2268  RangeTblEntry *rte;
2269  Oid userid;
2270  ForeignTable *table;
2271  UserMapping *user;
2272  int numParams;
2273 
2274  /*
2275  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2276  */
2277  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2278  return;
2279 
2280  /*
2281  * We'll save private state in node->fdw_state.
2282  */
2283  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2284  node->fdw_state = (void *) dmstate;
2285 
2286  /*
2287  * Identify which user to do the remote access as. This should match what
2288  * ExecCheckRTEPerms() does.
2289  */
2290  rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
2291  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2292 
2293  /* Get info about foreign table. */
2294  dmstate->rel = node->ss.ss_currentRelation;
2295  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2296  user = GetUserMapping(userid, table->serverid);
2297 
2298  /*
2299  * Get connection to the foreign server. Connection manager will
2300  * establish new connection if necessary.
2301  */
2302  dmstate->conn = GetConnection(user, false);
2303 
2304  /* Initialize state variable */
2305  dmstate->num_tuples = -1; /* -1 means not set yet */
2306 
2307  /* Get private info created by planner functions. */
2308  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2310  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2312  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2314  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2316 
2317  /* Create context for per-tuple temp workspace. */
2318  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2319  "postgres_fdw temporary data",
2321 
2322  /* Prepare for input conversion of RETURNING results. */
2323  if (dmstate->has_returning)
2324  dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
2325 
2326  /*
2327  * Prepare for processing of parameters used in remote query, if any.
2328  */
2329  numParams = list_length(fsplan->fdw_exprs);
2330  dmstate->numParams = numParams;
2331  if (numParams > 0)
2333  fsplan->fdw_exprs,
2334  numParams,
2335  &dmstate->param_flinfo,
2336  &dmstate->param_exprs,
2337  &dmstate->param_values);
2338 }
2339 
2340 /*
2341  * postgresIterateDirectModify
2342  * Execute a direct foreign table modification
 *
 * On the first call (num_tuples == -1), the remote statement is run through
 * execute_dml_stmt().  When the local query has no RETURNING projection, the
 * remote affected-row count is added to estate->es_processed (only if
 * set_processed) and to the EXPLAIN ANALYZE tuple count, and an empty slot
 * is returned.  Otherwise the next RETURNING row is produced by
 * get_returning_data().
 *
 * NOTE(review): embedded lines 2345 and 2347 are missing from this listing
 * (presumably the signature line and the declaration of dmstate).
2343  */
2344 static TupleTableSlot *
2346 {
2348  EState *estate = node->ss.ps.state;
2349  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
2350 
2351  /*
2352  * If this is the first call after Begin, execute the statement.
2353  */
2354  if (dmstate->num_tuples == -1)
2355  execute_dml_stmt(node);
2356 
2357  /*
2358  * If the local query doesn't specify RETURNING, just clear tuple slot.
2359  */
2360  if (!resultRelInfo->ri_projectReturning)
2361  {
2362  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2363  Instrumentation *instr = node->ss.ps.instrument;
2364 
2365  Assert(!dmstate->has_returning);
2366 
2367  /* Increment the command es_processed count if necessary. */
2368  if (dmstate->set_processed)
2369  estate->es_processed += dmstate->num_tuples;
2370 
2371  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2372  if (instr)
2373  instr->tuplecount += dmstate->num_tuples;
2374 
2375  return ExecClearTuple(slot);
2376  }
2377 
2378  /*
2379  * Get the next RETURNING tuple.
2380  */
2381  return get_returning_data(node);
2382 }
2383 
2384 /*
2385  * postgresEndDirectModify
2386  * Finish a direct foreign table modification
 *
 * Frees any PGresult still held in dmstate->result and releases the remote
 * connection.  Does nothing when dmstate is NULL (the EXPLAIN case).
 *
 * NOTE(review): embedded lines 2389 and 2391 are missing from this listing
 * (presumably the signature line and the declaration of dmstate).
2387  */
2388 static void
2390 {
2392 
2393  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2394  if (dmstate == NULL)
2395  return;
2396 
2397  /* Release PGresult */
2398  if (dmstate->result)
2399  PQclear(dmstate->result);
2400 
2401  /* Release remote connection */
2402  ReleaseConnection(dmstate->conn);
2403  dmstate->conn = NULL;
2404 
2405  /* MemoryContext will be deleted automatically. */
2406 }
2407 
2408 /*
2409  * postgresExplainForeignScan
2410  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
 *
 * Emits a "Relations" property when fdw_private is long enough to hold a
 * FdwScanPrivateRelations entry (the join case, per the comment below).
 * Under VERBOSE it also emits the "Remote SQL" text.
 *
 * NOTE(review): embedded line 2413 (presumably the signature line) is
 * missing from this listing.
2411  */
2412 static void
2414 {
2415  List *fdw_private;
2416  char *sql;
2417  char *relations;
2418 
2419  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2420 
2421  /*
2422  * Add names of relation handled by the foreign scan when the scan is a
2423  * join
2424  */
2425  if (list_length(fdw_private) > FdwScanPrivateRelations)
2426  {
2427  relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2428  ExplainPropertyText("Relations", relations, es);
2429  }
2430 
2431  /*
2432  * Add remote query, when VERBOSE option is specified.
2433  */
2434  if (es->verbose)
2435  {
2436  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2437  ExplainPropertyText("Remote SQL", sql, es);
2438  }
2439 }
2440 
2441 /*
2442  * postgresExplainForeignModify
2443  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
 *
 * Under VERBOSE, shows the remote SQL string stored in fdw_private as
 * "Remote SQL"; otherwise it adds nothing.
 *
 * NOTE(review): embedded lines 2446 and 2455 are missing from this listing
 * (presumably the signature line and the list_nth index argument).
2444  */
2445 static void
2447  ResultRelInfo *rinfo,
2448  List *fdw_private,
2449  int subplan_index,
2450  ExplainState *es)
2451 {
2452  if (es->verbose)
2453  {
2454  char *sql = strVal(list_nth(fdw_private,
2456 
2457  ExplainPropertyText("Remote SQL", sql, es);
2458  }
2459 }
2460 
2461 /*
2462  * postgresExplainDirectModify
2463  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2464  * foreign table directly
 *
 * Under VERBOSE, shows the remote SQL stored at the
 * FdwDirectModifyPrivateUpdateSql position of the ForeignScan's fdw_private.
 *
 * NOTE(review): embedded line 2467 (presumably the signature line) is
 * missing from this listing.
2465  */
2466 static void
2468 {
2469  List *fdw_private;
2470  char *sql;
2471 
2472  if (es->verbose)
2473  {
2474  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2475  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2476  ExplainPropertyText("Remote SQL", sql, es);
2477  }
2478 }
2479 
2480 
2481 /*
2482  * estimate_path_cost_size
2483  * Get cost and size estimates for a foreign scan on given foreign relation
2484  * either a base relation or a join between foreign relations or an upper
2485  * relation containing foreign relations.
2486  *
2487  * param_join_conds are the parameterization clauses with outer relations.
2488  * pathkeys specify the expected sort order if any for given path being costed.
2489  *
2490  * The function returns the cost and size estimates in p_row, p_width,
2491  * p_startup_cost and p_total_cost variables.
 *
 * With use_remote_estimate, the numbers come from a remote EXPLAIN, adjusted
 * locally for quals that must be checked locally.  Otherwise they are
 * computed locally for base, join and upper (grouping) relations, and both
 * startup and run costs are scaled by DEFAULT_FDW_SORT_MULTIPLIER when
 * pathkeys are given.  Costs for paths with no pathkeys and no
 * parameterization are cached in fpinfo->rel_startup_cost and
 * fpinfo->rel_total_cost before fdw_startup_cost, fdw_tuple_cost and
 * cpu_tuple_cost are added.
 *
 * NOTE(review): embedded lines 2494 and 2726 are missing from this listing
 * (presumably the function name with its first parameter, and the start of
 * the grouping-expression argument passed to estimate_num_groups()).
2492  */
2493 static void
2495  RelOptInfo *foreignrel,
2496  List *param_join_conds,
2497  List *pathkeys,
2498  double *p_rows, int *p_width,
2499  Cost *p_startup_cost, Cost *p_total_cost)
2500 {
2501  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2502  double rows;
2503  double retrieved_rows;
2504  int width;
2505  Cost startup_cost;
2506  Cost total_cost;
2507  Cost cpu_per_tuple;
2508 
2509  /*
2510  * If the table or the server is configured to use remote estimates,
2511  * connect to the foreign server and execute EXPLAIN to estimate the
2512  * number of rows selected by the restriction+join clauses. Otherwise,
2513  * estimate rows using whatever statistics we have locally, in a way
2514  * similar to ordinary tables.
2515  */
2516  if (fpinfo->use_remote_estimate)
2517  {
2518  List *remote_param_join_conds;
2519  List *local_param_join_conds;
2520  StringInfoData sql;
2521  PGconn *conn;
2522  Selectivity local_sel;
2523  QualCost local_cost;
2524  List *fdw_scan_tlist = NIL;
2525  List *remote_conds;
2526 
2527  /* Required only to be passed to deparseSelectStmtForRel */
2528  List *retrieved_attrs;
2529 
2530  /*
2531  * param_join_conds might contain both clauses that are safe to send
2532  * across, and clauses that aren't.
2533  */
2534  classifyConditions(root, foreignrel, param_join_conds,
2535  &remote_param_join_conds, &local_param_join_conds);
2536 
2537  /* Build the list of columns to be fetched from the foreign server. */
2538  if (foreignrel->reloptkind == RELOPT_JOINREL ||
2539  foreignrel->reloptkind == RELOPT_UPPER_REL)
2540  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2541  else
2542  fdw_scan_tlist = NIL;
2543 
2544  /*
2545  * The complete list of remote conditions includes everything from
2546  * baserestrictinfo plus any extra join_conds relevant to this
2547  * particular path.
2548  */
2549  remote_conds = list_concat(list_copy(remote_param_join_conds),
2550  fpinfo->remote_conds);
2551 
2552  /*
2553  * Construct EXPLAIN query including the desired SELECT, FROM, and
2554  * WHERE clauses. Params and other-relation Vars are replaced by dummy
2555  * values, so don't request params_list.
2556  */
2557  initStringInfo(&sql);
2558  appendStringInfoString(&sql, "EXPLAIN ");
2559  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2560  remote_conds, pathkeys, false,
2561  &retrieved_attrs, NULL);
2562 
2563  /* Get the remote estimate */
2564  conn = GetConnection(fpinfo->user, false);
2565  get_remote_estimate(sql.data, conn, &rows, &width,
2566  &startup_cost, &total_cost);
2567  ReleaseConnection(conn);
2568 
2569  retrieved_rows = rows;
2570 
2571  /* Factor in the selectivity of the locally-checked quals */
2572  local_sel = clauselist_selectivity(root,
2573  local_param_join_conds,
2574  foreignrel->relid,
2575  JOIN_INNER,
2576  NULL);
2577  local_sel *= fpinfo->local_conds_sel;
2578 
2579  rows = clamp_row_est(rows * local_sel);
2580 
2581  /* Add in the eval cost of the locally-checked quals */
2582  startup_cost += fpinfo->local_conds_cost.startup;
2583  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2584  cost_qual_eval(&local_cost, local_param_join_conds, root);
2585  startup_cost += local_cost.startup;
2586  total_cost += local_cost.per_tuple * retrieved_rows;
2587  }
2588  else
2589  {
2590  Cost run_cost = 0;
2591 
2592  /*
2593  * We don't support join conditions in this mode (hence, no
2594  * parameterized paths can be made).
2595  */
2596  Assert(param_join_conds == NIL);
2597 
2598  /*
2599  * Use rows/width estimates made by set_baserel_size_estimates() for
2600  * base foreign relations and set_joinrel_size_estimates() for join
2601  * between foreign relations.
2602  */
2603  rows = foreignrel->rows;
2604  width = foreignrel->reltarget->width;
2605 
2606  /* Back into an estimate of the number of retrieved rows. */
2607  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2608 
2609  /*
2610  * We will come here again and again with different set of pathkeys
2611  * that caller wants to cost. We don't need to calculate the cost of
2612  * bare scan each time. Instead, use the costs if we have cached them
2613  * already.
2614  */
2615  if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2616  {
2617  startup_cost = fpinfo->rel_startup_cost;
2618  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2619  }
2620  else if (foreignrel->reloptkind == RELOPT_JOINREL)
2621  {
2622  PgFdwRelationInfo *fpinfo_i;
2623  PgFdwRelationInfo *fpinfo_o;
2624  QualCost join_cost;
2625  QualCost remote_conds_cost;
2626  double nrows;
2627 
2628  /* For join we expect inner and outer relations set */
2629  Assert(fpinfo->innerrel && fpinfo->outerrel);
2630 
2631  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2632  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2633 
2634  /* Estimate of number of rows in cross product */
2635  nrows = fpinfo_i->rows * fpinfo_o->rows;
2636  /* Clamp retrieved rows estimate to at most size of cross product */
2637  retrieved_rows = Min(retrieved_rows, nrows);
2638 
2639  /*
2640  * The cost of foreign join is estimated as cost of generating
2641  * rows for the joining relations + cost for applying quals on the
2642  * rows.
2643  */
2644 
2645  /*
2646  * Calculate the cost of clauses pushed down to the foreign server
2647  */
2648  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2649  /* Calculate the cost of applying join clauses */
2650  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2651 
2652  /*
2653  * Startup cost includes startup cost of joining relations and the
2654  * startup cost for join and other clauses. We do not include the
2655  * startup cost specific to join strategy (e.g. setting up hash
2656  * tables) since we do not know what strategy the foreign server
2657  * is going to use.
2658  */
2659  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2660  startup_cost += join_cost.startup;
2661  startup_cost += remote_conds_cost.startup;
2662  startup_cost += fpinfo->local_conds_cost.startup;
2663 
2664  /*
2665  * Run time cost includes:
2666  *
2667  * 1. Run time cost (total_cost - startup_cost) of relations being
2668  * joined
2669  *
2670  * 2. Run time cost of applying join clauses on the cross product
2671  * of the joining relations.
2672  *
2673  * 3. Run time cost of applying pushed down other clauses on the
2674  * result of join
2675  *
2676  * 4. Run time cost of applying nonpushable other clauses locally
2677  * on the result fetched from the foreign server.
2678  */
2679  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2680  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2681  run_cost += nrows * join_cost.per_tuple;
2682  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2683  run_cost += nrows * remote_conds_cost.per_tuple;
2684  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2685  }
2686  else if (foreignrel->reloptkind == RELOPT_UPPER_REL)
2687  {
2688  PgFdwRelationInfo *ofpinfo;
2689  PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG];
2690  AggClauseCosts aggcosts;
2691  double input_rows;
2692  int numGroupCols;
2693  double numGroups = 1;
2694 
2695  /*
2696  * This cost model is mixture of costing done for sorted and
2697  * hashed aggregates in cost_agg(). We are not sure which
2698  * strategy will be considered at remote side, thus for
2699  * simplicity, we put all startup related costs in startup_cost
2700  * and all finalization and run cost are added in total_cost.
2701  *
2702  * Also, core does not care about costing HAVING expressions and
2703  * adding that to the costs. So similarly, here too we are not
2704  * considering remote and local conditions for costing.
2705  */
2706 
2707  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2708 
2709  /* Get rows and width from input rel */
2710  input_rows = ofpinfo->rows;
2711  width = ofpinfo->width;
2712 
2713  /* Collect statistics about aggregates for estimating costs. */
2714  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2715  if (root->parse->hasAggs)
2716  {
2717  get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2718  AGGSPLIT_SIMPLE, &aggcosts);
2719  get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2720  AGGSPLIT_SIMPLE, &aggcosts);
2721  }
2722 
2723  /* Get number of grouping columns and possible number of groups */
2724  numGroupCols = list_length(root->parse->groupClause);
2725  numGroups = estimate_num_groups(root,
2727  fpinfo->grouped_tlist),
2728  input_rows, NULL);
2729 
2730  /*
2731  * Number of rows expected from foreign server will be same as
2732  * that of number of groups.
2733  */
2734  rows = retrieved_rows = numGroups;
2735 
2736  /*-----
2737  * Startup cost includes:
2738  * 1. Startup cost for underneath input relation
2739  * 2. Cost of performing aggregation, per cost_agg()
2740  * 3. Startup cost for PathTarget eval
2741  *-----
2742  */
2743  startup_cost = ofpinfo->rel_startup_cost;
2744  startup_cost += aggcosts.transCost.startup;
2745  startup_cost += aggcosts.transCost.per_tuple * input_rows;
2746  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2747  startup_cost += ptarget->cost.startup;
2748 
2749  /*-----
2750  * Run time cost includes:
2751  * 1. Run time cost of underneath input relation
2752  * 2. Run time cost of performing aggregation, per cost_agg()
2753  * 3. PathTarget eval cost for each output row
2754  *-----
2755  */
2756  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2757  run_cost += aggcosts.finalCost * numGroups;
2758  run_cost += cpu_tuple_cost * numGroups;
2759  run_cost += ptarget->cost.per_tuple * numGroups;
2760  }
2761  else
2762  {
2763  /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2764  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2765 
2766  /*
2767  * Cost as though this were a seqscan, which is pessimistic. We
2768  * effectively imagine the local_conds are being evaluated
2769  * remotely, too.
2770  */
2771  startup_cost = 0;
2772  run_cost = 0;
2773  run_cost += seq_page_cost * foreignrel->pages;
2774 
2775  startup_cost += foreignrel->baserestrictcost.startup;
2776  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2777  run_cost += cpu_per_tuple * foreignrel->tuples;
2778  }
2779 
2780  /*
2781  * Without remote estimates, we have no real way to estimate the cost
2782  * of generating sorted output. It could be free if the query plan
2783  * the remote side would have chosen generates properly-sorted output
2784  * anyway, but in most cases it will cost something. Estimate a value
2785  * high enough that we won't pick the sorted path when the ordering
2786  * isn't locally useful, but low enough that we'll err on the side of
2787  * pushing down the ORDER BY clause when it's useful to do so.
2788  */
2789  if (pathkeys != NIL)
2790  {
2791  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2792  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2793  }
2794 
2795  total_cost = startup_cost + run_cost;
2796  }
2797 
2798  /*
2799  * Cache the costs for scans without any pathkeys or parameterization
2800  * before adding the costs for transferring data from the foreign server.
2801  * These costs are useful for costing the join between this relation and
2802  * another foreign relation or to calculate the costs of paths with
2803  * pathkeys for this relation, when the costs can not be obtained from the
2804  * foreign server. This function will be called at least once for every
2805  * foreign relation without pathkeys and parameterization.
2806  */
2807  if (pathkeys == NIL && param_join_conds == NIL)
2808  {
2809  fpinfo->rel_startup_cost = startup_cost;
2810  fpinfo->rel_total_cost = total_cost;
2811  }
2812 
2813  /*
2814  * Add some additional cost factors to account for connection overhead
2815  * (fdw_startup_cost), transferring data across the network
2816  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2817  * (cpu_tuple_cost per retrieved row).
2818  */
2819  startup_cost += fpinfo->fdw_startup_cost;
2820  total_cost += fpinfo->fdw_startup_cost;
2821  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2822  total_cost += cpu_tuple_cost * retrieved_rows;
2823 
2824  /* Return results. */
2825  *p_rows = rows;
2826  *p_width = width;
2827  *p_startup_cost = startup_cost;
2828  *p_total_cost = total_cost;
2829 }
2830 
2831 /*
2832  * Estimate costs of executing a SQL statement remotely.
2833  * The given "sql" must be an EXPLAIN command.
2834  */
2835 static void
2836 get_remote_estimate(const char *sql, PGconn *conn,
2837  double *rows, int *width,
2838  Cost *startup_cost, Cost *total_cost)
2839 {
2840  PGresult *volatile res = NULL;
2841 
2842  /* PGresult must be released before leaving this function. */
2843  PG_TRY();
2844  {
2845  char *line;
2846  char *p;
2847  int n;
2848 
2849  /*
2850  * Execute EXPLAIN remotely.
2851  */
2852  res = pgfdw_exec_query(conn, sql);
2853  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2854  pgfdw_report_error(ERROR, res, conn, false, sql);
2855 
2856  /*
2857  * Extract cost numbers for topmost plan node. Note we search for a
2858  * left paren from the end of the line to avoid being confused by
2859  * other uses of parentheses.
2860  */
2861  line = PQgetvalue(res, 0, 0);
2862  p = strrchr(line, '(');
2863  if (p == NULL)
2864  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2865  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2866  startup_cost, total_cost, rows, width);
2867  if (n != 4)
2868  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2869 
2870  PQclear(res);
2871  res = NULL;
2872  }
2873  PG_CATCH();
2874  {
2875  if (res)
2876  PQclear(res);
2877  PG_RE_THROW();
2878  }
2879  PG_END_TRY();
2880 }
2881 
2882 /*
2883  * Detect whether we want to process an EquivalenceClass member.
2884  *
2885  * This is a callback for use by generate_implied_equalities_for_column.
 *
 * If state->current is already set, only a member whose expression is
 * equal() to it matches.  Otherwise, the first member whose expression is
 * not in state->already_used becomes state->current and is accepted.
 *
 * NOTE(review): embedded lines 2888, 2889 and 2892 are missing from this
 * listing (presumably the function name with its leading parameters and the
 * declaration of state derived from arg).
2886  */
2887 static bool
2890  void *arg)
2891 {
2893  Expr *expr = em->em_expr;
2894 
2895  /*
2896  * If we've identified what we're processing in the current scan, we only
2897  * want to match that expression.
2898  */
2899  if (state->current != NULL)
2900  return equal(expr, state->current);
2901 
2902  /*
2903  * Otherwise, ignore anything we've already processed.
2904  */
2905  if (list_member(state->already_used, expr))
2906  return false;
2907 
2908  /* This is the new target to process. */
2909  state->current = expr;
2910  return true;
2911 }
2912 
2913 /*
2914  * Create cursor for node's query with current parameter values.
 *
 * Parameter values are converted to text in the per-tuple memory context.
 * "DECLARE c<cursor_number> CURSOR FOR <query>" is then sent with
 * PQsendQueryParams(), and on success the scan state is reset so that the
 * cursor is marked as existing and no tuples have been fetched yet.
 *
 * NOTE(review): embedded lines 2917 and 2924 are missing from this listing
 * (presumably the signature line and the declaration of buf).
2915  */
2916 static void
2918 {
2919  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2920  ExprContext *econtext = node->ss.ps.ps_ExprContext;
2921  int numParams = fsstate->numParams;
2922  const char **values = fsstate->param_values;
2923  PGconn *conn = fsstate->conn;
2925  PGresult *res;
2926 
2927  /*
2928  * Construct array of query parameter values in text format. We do the
2929  * conversions in the short-lived per-tuple context, so as not to cause a
2930  * memory leak over repeated scans.
2931  */
2932  if (numParams > 0)
2933  {
2934  MemoryContext oldcontext;
2935 
2936  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2937 
2938  process_query_params(econtext,
2939  fsstate->param_flinfo,
2940  fsstate->param_exprs,
2941  values);
2942 
2943  MemoryContextSwitchTo(oldcontext);
2944  }
2945 
2946  /* Construct the DECLARE CURSOR command */
2947  initStringInfo(&buf);
2948  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
2949  fsstate->cursor_number, fsstate->query);
2950 
2951  /*
2952  * Notice that we pass NULL for paramTypes, thus forcing the remote server
2953  * to infer types for all parameters. Since we explicitly cast every
2954  * parameter (see deparse.c), the "inference" is trivial and will produce
2955  * the desired result. This allows us to avoid assuming that the remote
2956  * server has the same OIDs we do for the parameters' types.
2957  */
2958  if (!PQsendQueryParams(conn, buf.data, numParams,
2959  NULL, values, NULL, NULL, 0))
2960  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2961 
2962  /*
2963  * Get the result, and check for success.
2964  *
2965  * We don't use a PG_TRY block here, so be careful not to throw error
2966  * without releasing the PGresult.
2967  */
2968  res = pgfdw_get_result(conn, buf.data);
2969  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2970  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
2971  PQclear(res);
2972 
2973  /* Mark the cursor as created, and show no tuples have been retrieved */
2974  fsstate->cursor_exists = true;
2975  fsstate->tuples = NULL;
2976  fsstate->num_tuples = 0;
2977  fsstate->next_tuple = 0;
2978  fsstate->fetch_ct_2 = 0;
2979  fsstate->eof_reached = false;
2980 
2981  /* Clean up */
2982  pfree(buf.data);
2983 }
2984 
2985 /*
2986  * Fetch some more rows from the node's cursor.
 *
 * Discards the previous batch by resetting batch_cxt, then runs
 * "FETCH <fetch_size> FROM c<cursor_number>" and converts each returned row
 * into fsstate->tuples[]; that array is palloc'd in batch_cxt.
 * fetch_ct_2 saturates at 2.  eof_reached is set when fewer rows than
 * requested arrive.  A PG_TRY block ensures the PGresult is cleared if an
 * error is raised.
 *
 * NOTE(review): embedded lines 2989 and 3030 are missing from this listing
 * (presumably the signature line and the name of the row-to-tuple
 * conversion function).
2987  */
2988 static void
2990 {
2991  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2992  PGresult *volatile res = NULL;
2993  MemoryContext oldcontext;
2994 
2995  /*
2996  * We'll store the tuples in the batch_cxt. First, flush the previous
2997  * batch.
2998  */
2999  fsstate->tuples = NULL;
3000  MemoryContextReset(fsstate->batch_cxt);
3001  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3002 
3003  /* PGresult must be released before leaving this function. */
3004  PG_TRY();
3005  {
3006  PGconn *conn = fsstate->conn;
3007  char sql[64];
3008  int numrows;
3009  int i;
3010 
3011  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3012  fsstate->fetch_size, fsstate->cursor_number);
3013 
3014  res = pgfdw_exec_query(conn, sql);
3015  /* On error, report the original query, not the FETCH. */
3016  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3017  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3018 
3019  /* Convert the data into HeapTuples */
3020  numrows = PQntuples(res);
3021  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3022  fsstate->num_tuples = numrows;
3023  fsstate->next_tuple = 0;
3024 
3025  for (i = 0; i < numrows; i++)
3026  {
3027  Assert(IsA(node->ss.ps.plan, ForeignScan));
3028 
3029  fsstate->tuples[i] =
3031  fsstate->rel,
3032  fsstate->attinmeta,
3033  fsstate->retrieved_attrs,
3034  node,
3035  fsstate->temp_cxt);
3036  }
3037 
3038  /* Update fetch_ct_2 */
3039  if (fsstate->fetch_ct_2 < 2)
3040  fsstate->fetch_ct_2++;
3041 
3042  /* Must be EOF if we didn't get as many tuples as we asked for. */
3043  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3044 
3045  PQclear(res);
3046  res = NULL;
3047  }
3048  PG_CATCH();
3049  {
3050  if (res)
3051  PQclear(res);
3052  PG_RE_THROW();
3053  }
3054  PG_END_TRY();
3055 
3056  MemoryContextSwitchTo(oldcontext);
3057 }
3058 
3059 /*
3060  * Force assorted GUC parameters to settings that ensure that we'll output
3061  * data values in a form that is unambiguous to the remote server.
3062  *
3063  * This is rather expensive and annoying to do once per row, but there's
3064  * little choice if we want to be sure values are transmitted accurately;
3065  * we can't leave the settings in place between rows for fear of affecting
3066  * user-visible computations.
3067  *
3068  * We use the equivalent of a function SET option to allow the settings to
3069  * persist only until the caller calls reset_transmission_modes(). If an
3070  * error is thrown in between, guc.c will take care of undoing the settings.
3071  *
3072  * The return value is the nestlevel that must be passed to
3073  * reset_transmission_modes() to undo things.
3074  */
3075 int
3077 {
3078  int nestlevel = NewGUCNestLevel();
3079 
3080  /*
3081  * The values set here should match what pg_dump does. See also
3082  * configure_remote_session in connection.c.
3083  */
3084  if (DateStyle != USE_ISO_DATES)
3085  (void) set_config_option("datestyle", "ISO",
3087  GUC_ACTION_SAVE, true, 0, false);
3089  (void) set_config_option("intervalstyle", "postgres",
3091  GUC_ACTION_SAVE, true, 0, false);
3092  if (extra_float_digits < 3)
3093  (void) set_config_option("extra_float_digits", "3",
3095  GUC_ACTION_SAVE, true, 0, false);
3096 
3097  return nestlevel;
3098 }
3099 
3100 /*
3101  * Undo the effects of set_transmission_modes().
3102  */
3103 void
3105 {
3106  AtEOXact_GUC(true, nestlevel);
3107 }
3108 
3109 /*
3110  * Utility routine to close a cursor.
3111  */
3112 static void
3114 {
3115  char sql[64];
3116  PGresult *res;
3117 
3118  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3119 
3120  /*
3121  * We don't use a PG_TRY block here, so be careful not to throw error
3122  * without releasing the PGresult.
3123  */
3124  res = pgfdw_exec_query(conn, sql);
3125  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3126  pgfdw_report_error(ERROR, res, conn, true, sql);
3127  PQclear(res);
3128 }
3129 
3130 /*
3131  * prepare_foreign_modify
3132  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
3133  */
3134 static void
3136 {
3137  char prep_name[NAMEDATALEN];
3138  char *p_name;
3139  PGresult *res;
3140 
3141  /* Construct name we'll use for the prepared statement. */
3142  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
3143  GetPrepStmtNumber(fmstate->conn));
3144  p_name = pstrdup(prep_name);
3145 
3146  /*
3147  * We intentionally do not specify parameter types here, but leave the
3148  * remote server to derive them by default. This avoids possible problems
3149  * with the remote server using different type OIDs than we do. All of
3150  * the prepared statements we use in this module are simple enough that
3151  * the remote server will make the right choices.
3152  */
3153  if (!PQsendPrepare(fmstate->conn,
3154  p_name,
3155  fmstate->query,
3156  0,
3157  NULL))
3158  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
3159 
3160  /*
3161  * Get the result, and check for success.
3162  *
3163  * We don't use a PG_TRY block here, so be careful not to throw error
3164  * without releasing the PGresult.
3165  */
3166  res = pgfdw_get_result(fmstate->conn, fmstate->query);
3167  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3168  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
3169  PQclear(res);
3170 
3171  /* This action shows that the prepare has been done. */
3172  fmstate->p_name = p_name;
3173 }
3174 
3175 /*
3176  * convert_prep_stmt_params
3177  * Create array of text strings representing parameter values
3178  *
3179  * tupleid is ctid to send, or NULL if none
3180  * slot is slot to get remaining parameters from, or NULL if none
3181  *
3182  * Data is constructed in temp_cxt; caller should reset that after use.
3183  */
3184 static const char **
3186  ItemPointer tupleid,
3187  TupleTableSlot *slot)
3188 {
3189  const char **p_values;
3190  int pindex = 0;
3191  MemoryContext oldcontext;
3192 
3193  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3194 
3195  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3196 
3197  /* 1st parameter should be ctid, if it's in use */
3198  if (tupleid != NULL)
3199  {
3200  /* don't need set_transmission_modes for TID output */
3201  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3202  PointerGetDatum(tupleid));
3203  pindex++;
3204  }
3205 
3206  /* get following parameters from slot */
3207  if (slot != NULL && fmstate->target_attrs != NIL)
3208  {
3209  int nestlevel;
3210  ListCell *lc;
3211 
3212  nestlevel = set_transmission_modes();
3213 
3214  foreach(lc, fmstate->target_attrs)
3215  {
3216  int attnum = lfirst_int(lc);
3217  Datum value;
3218  bool isnull;
3219 
3220  value = slot_getattr(slot, attnum, &isnull);
3221  if (isnull)
3222  p_values[pindex] = NULL;
3223  else
3224  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3225  value);
3226  pindex++;
3227  }
3228 
3229  reset_transmission_modes(nestlevel);
3230  }
3231 
3232  Assert(pindex == fmstate->p_nums);
3233 
3234  MemoryContextSwitchTo(oldcontext);
3235 
3236  return p_values;
3237 }
3238 
3239 /*
3240  * store_returning_result
3241  * Store the result of a RETURNING clause
3242  *
3243  * On error, be sure to release the PGresult on the way out. Callers do not
3244  * have PG_TRY blocks to ensure this happens.
3245  */
3246 static void
3248  TupleTableSlot *slot, PGresult *res)
3249 {
3250  PG_TRY();
3251  {
3252  HeapTuple newtup;
3253 
3254  newtup = make_tuple_from_result_row(res, 0,
3255  fmstate->rel,
3256  fmstate->attinmeta,
3257  fmstate->retrieved_attrs,
3258  NULL,
3259  fmstate->temp_cxt);
3260  /* tuple will be deleted when it is cleared from the slot */
3261  ExecStoreTuple(newtup, slot, InvalidBuffer, true);
3262  }
3263  PG_CATCH();
3264  {
3265  if (res)
3266  PQclear(res);
3267  PG_RE_THROW();
3268  }
3269  PG_END_TRY();
3270 }
3271 
3272 /*
3273  * Execute a direct UPDATE/DELETE statement.
3274  */
3275 static void
3277 {
3279  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3280  int numParams = dmstate->numParams;
3281  const char **values = dmstate->param_values;
3282 
3283  /*
3284  * Construct array of query parameter values in text format.
3285  */
3286  if (numParams > 0)
3287  process_query_params(econtext,
3288  dmstate->param_flinfo,
3289  dmstate->param_exprs,
3290  values);
3291 
3292  /*
3293  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3294  * to infer types for all parameters. Since we explicitly cast every
3295  * parameter (see deparse.c), the "inference" is trivial and will produce
3296  * the desired result. This allows us to avoid assuming that the remote
3297  * server has the same OIDs we do for the parameters' types.
3298  */
3299  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3300  NULL, values, NULL, NULL, 0))
3301  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3302 
3303  /*
3304  * Get the result, and check for success.
3305  *
3306  * We don't use a PG_TRY block here, so be careful not to throw error
3307  * without releasing the PGresult.
3308  */
3309  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3310  if (PQresultStatus(dmstate->result) !=
3312  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3313  dmstate->query);
3314 
3315  /* Get the number of rows affected. */
3316  if (dmstate->has_returning)
3317  dmstate->num_tuples = PQntuples(dmstate->result);
3318  else
3319  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3320 }
3321 
3322 /*
3323  * Get the result of a RETURNING clause.
3324  */
3325 static TupleTableSlot *
3327 {
3329  EState *estate = node->ss.ps.state;
3330  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3331  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3332 
3333  Assert(resultRelInfo->ri_projectReturning);
3334 
3335  /* If we didn't get any tuples, must be end of data. */
3336  if (dmstate->next_tuple >= dmstate->num_tuples)
3337  return ExecClearTuple(slot);
3338 
3339  /* Increment the command es_processed count if necessary. */
3340  if (dmstate->set_processed)
3341  estate->es_processed += 1;
3342 
3343  /*
3344  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3345  * tuple. (has_returning is false when the local query is of the form
3346  * "UPDATE/DELETE .. RETURNING 1" for example.)
3347  */
3348  if (!dmstate->has_returning)
3349  ExecStoreAllNullTuple(slot);
3350  else
3351  {
3352  /*
3353  * On error, be sure to release the PGresult on the way out. Callers
3354  * do not have PG_TRY blocks to ensure this happens.
3355  */
3356  PG_TRY();
3357  {
3358  HeapTuple newtup;
3359 
3360  newtup = make_tuple_from_result_row(dmstate->result,
3361  dmstate->next_tuple,
3362  dmstate->rel,
3363  dmstate->attinmeta,
3364  dmstate->retrieved_attrs,
3365  NULL,
3366  dmstate->temp_cxt);
3367  ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3368  }
3369  PG_CATCH();
3370  {
3371  if (dmstate->result)
3372  PQclear(dmstate->result);
3373  PG_RE_THROW();
3374  }
3375  PG_END_TRY();
3376  }
3377  dmstate->next_tuple++;
3378 
3379  /* Make slot available for evaluation of the local query RETURNING list. */
3380  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
3381 
3382  return slot;
3383 }
3384 
3385 /*
3386  * Prepare for processing of parameters used in remote query.
3387  */
3388 static void
3390  List *fdw_exprs,
3391  int numParams,
3392  FmgrInfo **param_flinfo,
3393  List **param_exprs,
3394  const char ***param_values)
3395 {
3396  int i;
3397  ListCell *lc;
3398 
3399  Assert(numParams > 0);
3400 
3401  /* Prepare for output conversion of parameters used in remote query. */
3402  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
3403 
3404  i = 0;
3405  foreach(lc, fdw_exprs)
3406  {
3407  Node *param_expr = (Node *) lfirst(lc);
3408  Oid typefnoid;
3409  bool isvarlena;
3410 
3411  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
3412  fmgr_info(typefnoid, &(*param_flinfo)[i]);
3413  i++;
3414  }
3415 
3416  /*
3417  * Prepare remote-parameter expressions for evaluation. (Note: in
3418  * practice, we expect that all these expressions will be just Params, so
3419  * we could possibly do something more efficient than using the full
3420  * expression-eval machinery for this. But probably there would be little
3421  * benefit, and it'd require postgres_fdw to know more than is desirable
3422  * about Param evaluation.)
3423  */
3424  *param_exprs = ExecInitExprList(fdw_exprs, node);
3425 
3426  /* Allocate buffer for text form of query parameters. */
3427  *param_values = (const char **) palloc0(numParams * sizeof(char *));
3428 }
3429 
3430 /*
3431  * Construct array of query parameter values in text format.
3432  */
3433 static void
3435  FmgrInfo *param_flinfo,
3436  List *param_exprs,
3437  const char **param_values)
3438 {
3439  int nestlevel;
3440  int i;
3441  ListCell *lc;
3442 
3443  nestlevel = set_transmission_modes();
3444 
3445  i = 0;
3446  foreach(lc, param_exprs)
3447  {
3448  ExprState *expr_state = (ExprState *) lfirst(lc);
3449  Datum expr_value;
3450  bool isNull;
3451 
3452  /* Evaluate the parameter expression */
3453  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
3454 
3455  /*
3456  * Get string representation of each parameter value by invoking
3457  * type-specific output function, unless the value is null.
3458  */
3459  if (isNull)
3460  param_values[i] = NULL;
3461  else
3462  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
3463 
3464  i++;
3465  }
3466 
3467  reset_transmission_modes(nestlevel);
3468 }
3469 
3470 /*
3471  * postgresAnalyzeForeignTable
3472  * Test whether analyzing this foreign table is supported
3473  */
3474 static bool
3476  AcquireSampleRowsFunc *func,
3477  BlockNumber *totalpages)
3478 {
3479  ForeignTable *table;
3480  UserMapping *user;
3481  PGconn *conn;
3482  StringInfoData sql;
3483  PGresult *volatile res = NULL;
3484 
3485  /* Return the row-analysis function pointer */
3487 
3488  /*
3489  * Now we have to get the number of pages. It's annoying that the ANALYZE
3490  * API requires us to return that now, because it forces some duplication
3491  * of effort between this routine and postgresAcquireSampleRowsFunc. But
3492  * it's probably not worth redefining that API at this point.
3493  */
3494 
3495  /*
3496  * Get the connection to use. We do the remote access as the table's
3497  * owner, even if the ANALYZE was started by some other user.
3498  */
3499  table = GetForeignTable(RelationGetRelid(relation));
3500  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3501  conn = GetConnection(user, false);
3502 
3503  /*
3504  * Construct command to get page count for relation.
3505  */
3506  initStringInfo(&sql);
3507  deparseAnalyzeSizeSql(&sql, relation);
3508 
3509  /* In what follows, do not risk leaking any PGresults. */
3510  PG_TRY();
3511  {
3512  res = pgfdw_exec_query(conn, sql.data);
3513  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3514  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3515 
3516  if (PQntuples(res) != 1 || PQnfields(res) != 1)
3517  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3518  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
3519 
3520  PQclear(res);
3521  res = NULL;
3522  }
3523  PG_CATCH();
3524  {
3525  if (res)
3526  PQclear(res);
3527  PG_RE_THROW();
3528  }
3529  PG_END_TRY();
3530 
3531  ReleaseConnection(conn);
3532 
3533  return true;
3534 }
3535 
3536 /*
3537  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
3538  *
3539  * We fetch the whole table from the remote side and pick out some sample rows.
3540  *
3541  * Selected rows are returned in the caller-allocated array rows[],
3542  * which must have at least targrows entries.
3543  * The actual number of rows selected is returned as the function result.
3544  * We also count the total number of rows in the table and return it into
3545  * *totalrows. Note that *totaldeadrows is always set to 0.
3546  *
3547  * Note that the returned list of rows is not always in order by physical
3548  * position in the table. Therefore, correlation estimates derived later
3549  * may be meaningless, but it's OK because we don't use the estimates
3550  * currently (the planner only pays attention to correlation for indexscans).
3551  */
3552 static int
3554  HeapTuple *rows, int targrows,
3555  double *totalrows,
3556  double *totaldeadrows)
3557 {
3558  PgFdwAnalyzeState astate;
3559  ForeignTable *table;
3560  ForeignServer *server;
3561  UserMapping *user;
3562  PGconn *conn;
3563  unsigned int cursor_number;
3564  StringInfoData sql;
3565  PGresult *volatile res = NULL;
3566 
3567  /* Initialize workspace state */
3568  astate.rel = relation;
3570 
3571  astate.rows = rows;
3572  astate.targrows = targrows;
3573  astate.numrows = 0;
3574  astate.samplerows = 0;
3575  astate.rowstoskip = -1; /* -1 means not set yet */
3576  reservoir_init_selection_state(&astate.rstate, targrows);
3577 
3578  /* Remember ANALYZE context, and create a per-tuple temp context */
3579  astate.anl_cxt = CurrentMemoryContext;
3581  "postgres_fdw temporary data",
3583 
3584  /*
3585  * Get the connection to use. We do the remote access as the table's
3586  * owner, even if the ANALYZE was started by some other user.
3587  */
3588  table = GetForeignTable(RelationGetRelid(relation));
3589  server = GetForeignServer(table->serverid);
3590  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3591  conn = GetConnection(user, false);
3592 
3593  /*
3594  * Construct cursor that retrieves whole rows from remote.
3595  */
3596  cursor_number = GetCursorNumber(conn);
3597  initStringInfo(&sql);
3598  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
3599  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
3600 
3601  /* In what follows, do not risk leaking any PGresults. */
3602  PG_TRY();
3603  {
3604  res = pgfdw_exec_query(conn, sql.data);
3605  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3606  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3607  PQclear(res);
3608  res = NULL;
3609 
3610  /* Retrieve and process rows a batch at a time. */
3611  for (;;)
3612  {
3613  char fetch_sql[64];
3614  int fetch_size;
3615  int numrows;
3616  int i;
3617  ListCell *lc;
3618 
3619  /* Allow users to cancel long query */
3621 
3622  /*
3623  * XXX possible future improvement: if rowstoskip is large, we
3624  * could issue a MOVE rather than physically fetching the rows,
3625  * then just adjust rowstoskip and samplerows appropriately.
3626  */
3627 
3628  /* The fetch size is arbitrary, but shouldn't be enormous. */
3629  fetch_size = 100;
3630  foreach(lc, server->options)
3631  {
3632  DefElem *def = (DefElem *) lfirst(lc);
3633 
3634  if (strcmp(def->defname, "fetch_size") == 0)
3635  {
3636  fetch_size = strtol(defGetString(def), NULL, 10);
3637  break;
3638  }
3639  }
3640  foreach(lc, table->options)
3641  {
3642  DefElem *def = (DefElem *) lfirst(lc);
3643 
3644  if (strcmp(def->defname, "fetch_size") == 0)
3645  {
3646  fetch_size = strtol(defGetString(def), NULL, 10);
3647  break;
3648  }
3649  }
3650 
3651  /* Fetch some rows */
3652  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
3653  fetch_size, cursor_number);
3654 
3655  res = pgfdw_exec_query(conn, fetch_sql);
3656  /* On error, report the original query, not the FETCH. */
3657  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3658  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3659 
3660  /* Process whatever we got. */
3661  numrows = PQntuples(res);
3662  for (i = 0; i < numrows; i++)
3663  analyze_row_processor(res, i, &astate);
3664 
3665  PQclear(res);
3666  res = NULL;
3667 
3668  /* Must be EOF if we didn't get all the rows requested. */
3669  if (numrows < fetch_size)
3670  break;
3671  }
3672 
3673  /* Close the cursor, just to be tidy. */
3674  close_cursor(conn, cursor_number);
3675  }
3676  PG_CATCH();
3677  {
3678  if (res)
3679  PQclear(res);
3680  PG_RE_THROW();
3681  }
3682  PG_END_TRY();
3683 
3684  ReleaseConnection(conn);
3685 
3686  /* We assume that we have no dead tuple. */
3687  *totaldeadrows = 0.0;
3688 
3689  /* We've retrieved all living tuples from foreign server. */
3690  *totalrows = astate.samplerows;
3691 
3692  /*
3693  * Emit some interesting relation info
3694  */
3695  ereport(elevel,
3696  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
3697  RelationGetRelationName(relation),
3698  astate.samplerows, astate.numrows)));
3699 
3700  return astate.numrows;
3701 }
3702 
3703 /*
3704  * Collect sample rows from the result of query.
3705  * - Use all tuples in sample until target # of samples are collected.
3706  * - Subsequently, replace already-sampled tuples randomly.
3707  */
3708 static void
3710 {
3711  int targrows = astate->targrows;
3712  int pos; /* array index to store tuple in */
3713  MemoryContext oldcontext;
3714 
3715  /* Always increment sample row counter. */
3716  astate->samplerows += 1;
3717 
3718  /*
3719  * Determine the slot where this sample row should be stored. Set pos to
3720  * negative value to indicate the row should be skipped.
3721  */
3722  if (astate->numrows < targrows)
3723  {
3724  /* First targrows rows are always included into the sample */
3725  pos = astate->numrows++;
3726  }
3727  else
3728  {
3729  /*
3730  * Now we start replacing tuples in the sample until we reach the end
3731  * of the relation. Same algorithm as in acquire_sample_rows in
3732  * analyze.c; see Jeff Vitter's paper.
3733  */
3734  if (astate->rowstoskip < 0)
3735  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
3736 
3737  if (astate->rowstoskip <= 0)
3738  {
3739  /* Choose a random reservoir element to replace. */
3740  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
3741  Assert(pos >= 0 && pos < targrows);
3742  heap_freetuple(astate->rows[pos]);
3743  }
3744  else
3745  {
3746  /* Skip this tuple. */
3747  pos = -1;
3748  }
3749 
3750  astate->rowstoskip -= 1;
3751  }
3752 
3753  if (pos >= 0)
3754  {
3755  /*
3756  * Create sample tuple from current result row, and store it in the
3757  * position determined above. The tuple has to be created in anl_cxt.
3758  */
3759  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
3760 
3761  astate->rows[pos] = make_tuple_from_result_row(res, row,
3762  astate->rel,
3763  astate->attinmeta,
3764  astate->retrieved_attrs,
3765  NULL,
3766  astate->temp_cxt);
3767 
3768  MemoryContextSwitchTo(oldcontext);
3769  }
3770 }
3771 
3772 /*
3773  * Import a foreign schema
 *
 * Implements IMPORT FOREIGN SCHEMA.  Returns a List of CREATE FOREIGN TABLE
 * command strings (each pstrdup'd), one per matching table in
 * stmt->remote_schema on the foreign server identified by serverOid.
 *
 * Accepted options in stmt->options:
 *   import_collate (default true), import_default (default false),
 *   import_not_null (default true).
 * Any other option name raises ERROR with ERRCODE_FDW_INVALID_OPTION_NAME.
 * Collations are never imported from remote servers older than 9.1.
 *
 * Raises ERROR with ERRCODE_FDW_SCHEMA_NOT_FOUND if the remote schema does
 * not exist.  Each generated command carries column_name, schema_name and
 * table_name options, so renaming the local objects keeps the link to the
 * remote ones.
3774  */
3775 static List *
3777 {
3778  List *commands = NIL;
3779  bool import_collate = true;
3780  bool import_default = false;
3781  bool import_not_null = true;
3782  ForeignServer *server;
3783  UserMapping *mapping;
3784  PGconn *conn;
3786  PGresult *volatile res = NULL;
3787  int numrows,
3788  i;
3789  ListCell *lc;
3790 
3791  /* Parse statement options */
3792  foreach(lc, stmt->options)
3793  {
3794  DefElem *def = (DefElem *) lfirst(lc);
3795 
3796  if (strcmp(def->defname, "import_collate") == 0)
3797  import_collate = defGetBoolean(def);
3798  else if (strcmp(def->defname, "import_default") == 0)
3799  import_default = defGetBoolean(def);
3800  else if (strcmp(def->defname, "import_not_null") == 0)
3801  import_not_null = defGetBoolean(def);
3802  else
3803  ereport(ERROR,
3804  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
3805  errmsg("invalid option \"%s\"", def->defname)));
3806  }
3807 
3808  /*
3809  * Get connection to the foreign server. Connection manager will
3810  * establish new connection if necessary.
 *
 * The current user's mapping (GetUserId()) is used here.  ANALYZE
 * support elsewhere in this file uses the table owner's mapping instead.
3811  */
3812  server = GetForeignServer(serverOid);
3813  mapping = GetUserMapping(GetUserId(), server->serverid);
3814  conn = GetConnection(mapping, false);
3815 
3816  /* Don't attempt to import collation if remote server hasn't got it */
3817  if (PQserverVersion(conn) < 90100)
3818  import_collate = false;
3819 
3820  /* Create workspace for strings */
3821  initStringInfo(&buf);
3822 
3823  /* In what follows, do not risk leaking any PGresults. */
3824  PG_TRY();
3825  {
3826  /* Check that the schema really exists */
3827  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
3828  deparseStringLiteral(&buf, stmt->remote_schema);
3829 
3830  res = pgfdw_exec_query(conn, buf.data);
3831  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3832  pgfdw_report_error(ERROR, res, conn, false, buf.data);
3833 
3834  if (PQntuples(res) != 1)
3835  ereport(ERROR,
3836  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
3837  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
3838  stmt->remote_schema, server->servername)));
3839 
3840  PQclear(res);
3841  res = NULL;
3842  resetStringInfo(&buf);
3843 
3844  /*
3845  * Fetch all table data from this schema, possibly restricted by
3846  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
3847  * to EXCEPT/LIMIT TO here, because the core code will filter the
3848  * statements we return according to those lists anyway. But it
3849  * should save a few cycles to not process excluded tables in the
3850  * first place.)
3851  *
3852  * Note: because we run the connection with search_path restricted to
3853  * pg_catalog, the format_type() and pg_get_expr() outputs will always
3854  * include a schema name for types/functions in other schemas, which
3855  * is what we want.
3856  */
3857  if (import_collate)
3859  "SELECT relname, "
3860  " attname, "
3861  " format_type(atttypid, atttypmod), "
3862  " attnotnull, "
3863  " pg_get_expr(adbin, adrelid), "
3864  " collname, "
3865  " collnsp.nspname "
3866  "FROM pg_class c "
3867  " JOIN pg_namespace n ON "
3868  " relnamespace = n.oid "
3869  " LEFT JOIN pg_attribute a ON "
3870  " attrelid = c.oid AND attnum > 0 "
3871  " AND NOT attisdropped "
3872  " LEFT JOIN pg_attrdef ad ON "
3873  " adrelid = c.oid AND adnum = attnum "
3874  " LEFT JOIN pg_collation coll ON "
3875  " coll.oid = attcollation "
3876  " LEFT JOIN pg_namespace collnsp ON "
3877  " collnsp.oid = collnamespace ");
3878  else
3880  "SELECT relname, "
3881  " attname, "
3882  " format_type(atttypid, atttypmod), "
3883  " attnotnull, "
3884  " pg_get_expr(adbin, adrelid), "
3885  " NULL, NULL "
3886  "FROM pg_class c "
3887  " JOIN pg_namespace n ON "
3888  " relnamespace = n.oid "
3889  " LEFT JOIN pg_attribute a ON "
3890  " attrelid = c.oid AND attnum > 0 "
3891  " AND NOT attisdropped "
3892  " LEFT JOIN pg_attrdef ad ON "
3893  " adrelid = c.oid AND adnum = attnum ");
3894 
3896  "WHERE c.relkind IN ("
3901  " AND n.nspname = ");
3902  deparseStringLiteral(&buf, stmt->remote_schema);
3903 
3904  /* Apply restrictions for LIMIT TO and EXCEPT */
3905  if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
3907  {
3908  bool first_item = true;
3909 
3910  appendStringInfoString(&buf, " AND c.relname ");
3911  if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
3912  appendStringInfoString(&buf, "NOT ");
3913  appendStringInfoString(&buf, "IN (");
3914 
3915  /* Append list of table names within IN clause */
3916  foreach(lc, stmt->table_list)
3917  {
3918  RangeVar *rv = (RangeVar *) lfirst(lc);
3919 
3920  if (first_item)
3921  first_item = false;
3922  else
3923  appendStringInfoString(&buf, ", ");
3924  deparseStringLiteral(&buf, rv->relname);
3925  }
3926  appendStringInfoChar(&buf, ')');
3927  }
3928 
3929  /* Append ORDER BY at the end of query to ensure output ordering */
3930  appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
3931 
3932  /* Fetch the data */
3933  res = pgfdw_exec_query(conn, buf.data);
3934  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3935  pgfdw_report_error(ERROR, res, conn, false, buf.data);
3936 
3937  /* Process results */
3938  numrows = PQntuples(res);
3939  /* note: incrementation of i happens in inner loop's while() test */
 /*
  * Rows arrive ordered by relname, attnum (see the ORDER BY above), so each
  * table's columns are contiguous.  Each outer iteration consumes all rows
  * of one table and emits one CREATE FOREIGN TABLE command.
  */
3940  for (i = 0; i < numrows;)
3941  {
3942  char *tablename = PQgetvalue(res, i, 0);
3943  bool first_item = true;
3944 
3945  resetStringInfo(&buf);
3946  appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
3947  quote_identifier(tablename));
3948 
3949  /* Scan all rows for this table */
3950  do
3951  {
3952  char *attname;
3953  char *typename;
3954  char *attnotnull;
3955  char *attdefault;
3956  char *collname;
3957  char *collnamespace;
3958 
3959  /* If table has no columns, we'll see nulls here */
 /* (continue in a do/while jumps to the while() test, so i still advances) */
3960  if (PQgetisnull(res, i, 1))
3961  continue;
3962 
3963  attname = PQgetvalue(res, i, 1);
3964  typename = PQgetvalue(res, i, 2);
3965  attnotnull = PQgetvalue(res, i, 3);
3966  attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
3967  PQgetvalue(res, i, 4);
3968  collname = PQgetisnull(res, i, 5) ? (char *) NULL :
3969  PQgetvalue(res, i, 5);
3970  collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
3971  PQgetvalue(res, i, 6);
3972 
3973  if (first_item)
3974  first_item = false;
3975  else
3976  appendStringInfoString(&buf, ",\n");
3977 
3978  /* Print column name and type */
3979  appendStringInfo(&buf, " %s %s",
3980  quote_identifier(attname),
3981  typename);
3982 
3983  /*
3984  * Add column_name option so that renaming the foreign table's
3985  * column doesn't break the association to the underlying
3986  * column.
3987  */
3988  appendStringInfoString(&buf, " OPTIONS (column_name ");
3989  deparseStringLiteral(&buf, attname);
3990  appendStringInfoChar(&buf, ')');
3991 
3992  /* Add COLLATE if needed */
3993  if (import_collate && collname != NULL && collnamespace != NULL)
3994  appendStringInfo(&buf, " COLLATE %s.%s",
3995  quote_identifier(collnamespace),
3996  quote_identifier(collname));
3997 
3998  /* Add DEFAULT if needed */
3999  if (import_default && attdefault != NULL)
4000  appendStringInfo(&buf, " DEFAULT %s", attdefault);
4001 
4002  /* Add NOT NULL if needed */
4003  if (import_not_null && attnotnull[0] == 't')
4004  appendStringInfoString(&buf, " NOT NULL");
4005  }
4006  while (++i < numrows &&
4007  strcmp(PQgetvalue(res, i, 0), tablename) == 0);
4008 
4009  /*
4010  * Add server name and table-level options. We specify remote
4011  * schema and table name as options (the latter to ensure that
4012  * renaming the foreign table doesn't break the association).
4013  */
4014  appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
4015  quote_identifier(server->servername));
4016 
4017  appendStringInfoString(&buf, "schema_name ");
4018  deparseStringLiteral(&buf, stmt->remote_schema);
4019  appendStringInfoString(&buf, ", table_name ");
4020  deparseStringLiteral(&buf, tablename);
4021 
4022  appendStringInfoString(&buf, ");");
4023 
4024  commands = lappend(commands, pstrdup(buf.data));
4025  }
4026 
4027  /* Clean up */
4028  PQclear(res);
4029  res = NULL;
4030  }
4031  PG_CATCH();
4032  {
4033  if (res)
4034  PQclear(res);
4035  PG_RE_THROW();
4036  }
4037  PG_END_TRY();
4038 
4039  ReleaseConnection(conn);
4040 
4041  return commands;
4042 }
4043 
4044 /*
4045  * Assess whether the join between inner and outer relations can be pushed down
4046  * to the foreign server. As a side effect, save information we obtain in this
4047  * function to PgFdwRelationInfo passed in.
4048  */
4049 static bool
4051  RelOptInfo *outerrel, RelOptInfo *innerrel,
4052  JoinPathExtraData *extra)
4053 {
4054  PgFdwRelationInfo *fpinfo;
4055  PgFdwRelationInfo *fpinfo_o;
4056  PgFdwRelationInfo *fpinfo_i;
4057  ListCell *lc;
4058  List *joinclauses;
4059  List *otherclauses;
4060 
4061  /*
4062  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4063  * Constructing queries representing SEMI and ANTI joins is hard, hence
4064  * not considered right now.
4065  */
4066  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4067  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4068  return false;
4069 
4070  /*
4071  * If either of the joining relations is marked as unsafe to pushdown, the
4072  * join can not be pushed down.
4073  */
4074  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4075  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4076  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4077  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4078  !fpinfo_i || !fpinfo_i->pushdown_safe)
4079  return false;
4080 
4081  /*
4082  * If joining relations have local conditions, those conditions are
4083  * required to be applied before joining the relations. Hence the join can
4084  * not be pushed down.
4085  */
4086  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4087  return false;
4088 
4089  /* Separate restrict list into join quals and quals on join relation */
4090  if (IS_OUTER_JOIN(jointype))
4091  extract_actual_join_clauses(extra->restrictlist, &joinclauses, &otherclauses);
4092  else
4093  {
4094  /*
4095  * Unlike an outer join, for inner join, the join result contains only
4096  * the rows which satisfy join clauses, similar to the other clause.
4097  * Hence all clauses can be treated as other quals. This helps to push
4098  * a join down to the foreign server even if some of its join quals
4099  * are not safe to pushdown.
4100  */
4101  otherclauses = extract_actual_clauses(extra->restrictlist, false);
4102  joinclauses = NIL;
4103  }
4104 
4105  /* Join quals must be safe to push down. */
4106  foreach(lc, joinclauses)
4107  {
4108  Expr *expr = (Expr *) lfirst(lc);
4109 
4110  if (!is_foreign_expr(root, joinrel, expr))
4111  return false;
4112  }
4113 
4114  /*
4115  * deparseExplicitTargetList() isn't smart enough to handle anything other
4116  * than a Var. In particular, if there's some PlaceHolderVar that would
4117  * need to be evaluated within this join tree (because there's an upper
4118  * reference to a quantity that may go to NULL as a result of an outer
4119  * join), then we can't try to push the join down because we'll fail when
4120  * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4121  * needs to be evaluated *at the top* of this join tree is OK, because we
4122  * can do that locally after fetching the results from the remote side.
4123  */
4124  foreach(lc, root->placeholder_list)
4125  {
4126  PlaceHolderInfo *phinfo = lfirst(lc);
4127  Relids relids = joinrel->relids;
4128 
4129  if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4130  bms_nonempty_difference(relids, phinfo->ph_eval_at))
4131  return false;
4132  }
4133 
4134  /* Save the join clauses, for later use. */
4135  fpinfo->joinclauses = joinclauses;
4136 
4137  /*
4138  * Other clauses are applied after the join has been performed and thus
4139  * need not be all pushable. We will push those which can be pushed to
4140  * reduce the number of rows fetched from the foreign server. Rest of them
4141  * will be applied locally after fetching join result. Add them to fpinfo
4142  * so that other joins involving this joinrel will know that this joinrel
4143  * has local clauses.
4144  */
4145  foreach(lc, otherclauses)
4146  {
4147  Expr *expr = (Expr *) lfirst(lc);
4148 
4149  if (!is_foreign_expr(root, joinrel, expr))
4150  fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
4151  else
4152  fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
4153  }
4154 
4155  fpinfo->outerrel = outerrel;
4156  fpinfo->innerrel = innerrel;
4157  fpinfo->jointype = jointype;
4158 
4159  /*
4160  * By default, both the input relations are not required to be deparsed
4161  * as subqueries, but there might be some relations covered by the input
4162  * relations that are required to be deparsed as subqueries, so save the
4163  * relids of those relations for later use by the deparser.
4164  */
4165  fpinfo->make_outerrel_subquery = false;
4166  fpinfo->make_innerrel_subquery = false;
4167  Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
4168  Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
4170  fpinfo_i->lower_subquery_rels);
4171 
4172  /*
4173  * Pull the other remote conditions from the joining relations into join
4174  * clauses or other remote clauses (remote_conds) of this relation
4175  * wherever possible. This avoids building subqueries at every join step.
4176  *
4177  * For an inner join, clauses from both the relations are added to the
4178  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4179  * the outer side are added to remote_conds since those can be evaluated
4180  * after the join is evaluated. The clauses from inner side are added to
4181  * the joinclauses, since they need to be evaluated while constructing the
4182  * join.
4183  *
4184  * For a FULL OUTER JOIN, the other clauses from either relation can not
4185  * be added to the joinclauses or remote_conds, since each relation acts
4186  * as an outer relation for the other.
4187  *
4188  * The joining sides can not have local conditions, thus no need to test
4189  * shippability of the clauses being pulled up.
4190  */
4191  switch (jointype)
4192  {
4193  case JOIN_INNER:
4194  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4195  list_copy(fpinfo_i->remote_conds));
4196  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4197  list_copy(fpinfo_o->remote_conds));
4198  break;
4199 
4200  case JOIN_LEFT:
4201  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4202  list_copy(fpinfo_i->remote_conds));
4203  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4204  list_copy(fpinfo_o->remote_conds));
4205  break;
4206 
4207  case JOIN_RIGHT:
4208  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4209  list_copy(fpinfo_o->remote_conds));
4210  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4211  list_copy(fpinfo_i->remote_conds));
4212  break;
4213 
4214  case JOIN_FULL:
4215 
4216  /*
4217  * In this case, if any of the input relations has conditions,
4218  * we need to deparse that relation as a subquery so that the
4219  * conditions can be evaluated before the join. Remember it in
4220  * the fpinfo of this relation so that the deparser can take
4221  * appropriate action. Also, save the relids of base relations
4222  * covered by that relation for later use by the deparser.
4223  */
4224  if (fpinfo_o->remote_conds)
4225  {
4226  fpinfo->make_outerrel_subquery = true;
4227  fpinfo->lower_subquery_rels =
4229  outerrel->relids);
4230  }
4231  if (fpinfo_i->remote_conds)
4232  {
4233  fpinfo->make_innerrel_subquery = true;
4234  fpinfo->lower_subquery_rels =
4236  innerrel->relids);
4237  }
4238  break;
4239 
4240  default:
4241  /* Should not happen, we have just check this above */
4242  elog(ERROR, "unsupported join type %d", jointype);
4243  }
4244 
4245  /*
4246  * For an inner join, all restrictions can be treated alike. Treating the
4247  * pushed down conditions as join conditions allows a top level full outer
4248  * join to be deparsed without requiring subqueries.
4249  */
4250  if (jointype == JOIN_INNER)
4251  {
4252  Assert(!fpinfo->joinclauses);
4253  fpinfo->joinclauses = fpinfo->remote_conds;
4254  fpinfo->remote_conds = NIL;
4255  }
4256 
4257  /* Mark that this join can be pushed down safely */
4258  fpinfo->pushdown_safe = true;
4259 
4260  /*
4261  * If user is willing to estimate cost for a scan of either of the joining
4262  * relations using EXPLAIN, he intends to estimate scans on that relation
4263  * more accurately. Then, it makes sense to estimate the cost of the join
4264  * with that relation more accurately using EXPLAIN.
4265  */
4266  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4267  fpinfo_i->use_remote_estimate;
4268 
4269  /* Get user mapping */
4270  if (fpinfo->use_remote_estimate)
4271  {
4272  if (fpinfo_o->use_remote_estimate)
4273  fpinfo->user = fpinfo_o->user;
4274  else
4275  fpinfo->user = fpinfo_i->user;
4276  }
4277  else
4278  fpinfo->user = NULL;
4279 
4280  /* Get foreign server */
4281  fpinfo->server = fpinfo_o->server;
4282 
4283  /*
4284  * Since both the joining relations come from the same server, the server
4285  * level options should have same value for both the relations. Pick from
4286  * any side.
4287  */
4288  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4289  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4290 
4291  /*
4292  * Set cached relation costs to some negative value, so that we can detect
4293  * when they are set to some sensible costs, during one (usually the
4294  * first) of the calls to estimate_path_cost_size().
4295  */
4296  fpinfo->rel_startup_cost = -1;
4297  fpinfo->rel_total_cost = -1;
4298 
4299  /*
4300  * Set fetch size to maximum of the joining sides, since we are expecting
4301  * the rows returned by the join to be proportional to the relation sizes.
4302  */
4303  if (fpinfo_o->fetch_size > fpinfo_i->fetch_size)
4304  fpinfo->fetch_size = fpinfo_o->fetch_size;
4305  else
4306  fpinfo->fetch_size = fpinfo_i->fetch_size;
4307 
4308  /*
4309  * Set the string describing this join relation to be used in EXPLAIN
4310  * output of corresponding ForeignScan.
4311  */
4312  fpinfo->relation_name = makeStringInfo();
4313  appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4314  fpinfo_o->relation_name->data,
4315  get_jointype_name(fpinfo->jointype),
4316  fpinfo_i->relation_name->data);
4317 
4318  /*
4319  * Set the relation index. This is defined as the position of this
4320  * joinrel in the join_rel_list list plus the length of the rtable list.
4321  * Note that since this joinrel is at the end of the join_rel_list list
4322  * when we are called, we can get the position by list_length.
4323  */
4324  Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
4325  fpinfo->relation_index =
4327 
4328  return true;
4329 }
4330 
4331 static void
4333  Path *epq_path)
4334 {
4335  List *useful_pathkeys_list = NIL; /* List of all pathkeys */
4336  ListCell *lc;
4337 
4338  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4339 
4340  /* Create one path for each set of pathkeys we found above. */
4341  foreach(lc, useful_pathkeys_list)
4342  {
4343  double rows;
4344  int width;
4345  Cost startup_cost;
4346  Cost total_cost;
4347  List *useful_pathkeys = lfirst(lc);
4348 
4349  estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4350  &rows, &width, &startup_cost, &total_cost);
4351 
4352  add_path(rel, (Path *)
4353  create_foreignscan_path(root, rel,
4354  NULL,
4355  rows,
4356  startup_cost,
4357  total_cost,
4358  useful_pathkeys,
4359  NULL,
4360  epq_path,
4361  NIL));
4362  }
4363 }
4364 
4365 /*
4366  * postgresGetForeignJoinPaths
4367  * Add possible ForeignPath to joinrel, if join is safe to push down.
4368  */
4369 static void
4371  RelOptInfo *joinrel,
4372  RelOptInfo *outerrel,
4373  RelOptInfo *innerrel,
4374  JoinType jointype,
4375  JoinPathExtraData *extra)
4376 {
4377  PgFdwRelationInfo *fpinfo;
4378  ForeignPath *joinpath;
4379  double rows;
4380  int width;
4381  Cost startup_cost;
4382  Cost total_cost;
4383  Path *epq_path; /* Path to create plan to be executed when
4384  * EvalPlanQual gets triggered. */
4385 
4386  /*
4387  * Skip if this join combination has been considered already.
4388  */
4389  if (joinrel->fdw_private)
4390  return;
4391 
4392  /*
4393  * Create unfinished PgFdwRelationInfo entry which is used to indicate
4394  * that the join relation is already considered, so that we won't waste
4395  * time in judging safety of join pushdown and adding the same paths again
4396  * if found safe. Once we know that this join can be pushed down, we fill
4397  * the entry.
4398  */
4399  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4400  fpinfo->pushdown_safe = false;
4401  joinrel->fdw_private = fpinfo;
4402  /* attrs_used is only for base relations. */
4403  fpinfo->attrs_used = NULL;
4404 
4405  /*
4406  * If there is a possibility that EvalPlanQual will be executed, we need
4407  * to be able to reconstruct the row using scans of the base relations.
4408  * GetExistingLocalJoinPath will find a suitable path for this purpose in
4409  * the path list of the joinrel, if one exists. We must be careful to
4410  * call it before adding any ForeignPath, since the ForeignPath might
4411  * dominate the only suitable local path available. We also do it before
4412  * reconstruct the row for EvalPlanQual(). Find an alternative local path
4413  * calling foreign_join_ok(), since that function updates fpinfo and marks
4414  * it as pushable if the join is found to be pushable.
4415  */
4416  if (root->parse->commandType == CMD_DELETE ||
4417  root->parse->commandType == CMD_UPDATE ||
4418  root->rowMarks)
4419  {
4420  epq_path = GetExistingLocalJoinPath(joinrel);
4421  if (!epq_path)
4422  {
4423  elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
4424  return;
4425  }
4426  }
4427  else
4428  epq_path = NULL;
4429 
4430  if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
4431  {
4432  /* Free path required for EPQ if we copied one; we don't need it now */
4433  if (epq_path)
4434  pfree(epq_path);
4435  return;
4436  }
4437 
4438  /*
4439  * Compute the selectivity and cost of the local_conds, so we don't have
4440  * to do it over again for each path. The best we can do for these
4441  * conditions is to estimate selectivity on the basis of local statistics.
4442  * The local conditions are applied after the join has been computed on
4443  * the remote side like quals in WHERE clause, so pass jointype as
4444  * JOIN_INNER.
4445  */
4446  fpinfo->local_conds_sel = clauselist_selectivity(root,
4447  fpinfo->local_conds,
4448  0,
4449  JOIN_INNER,
4450  NULL);
4451  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
4452 
4453  /*
4454  * If we are going to estimate costs locally, estimate the join clause
4455  * selectivity here while we have special join info.
4456  */
4457  if (!fpinfo->use_remote_estimate)
4458  fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
4459  0, fpinfo->jointype,
4460  extra->sjinfo);
4461 
4462  /* Estimate costs for bare join relation */
4463  estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
4464  &width, &startup_cost, &total_cost);
4465  /* Now update this information in the joinrel */
4466  joinrel->rows = rows;
4467  joinrel->reltarget->width = width;
4468  fpinfo->rows = rows;
4469  fpinfo->width = width;
4470  fpinfo->startup_cost = startup_cost;
4471  fpinfo->total_cost = total_cost;
4472 
4473  /*
4474  * Create a new join path and add it to the joinrel which represents a
4475  * join between foreign tables.
4476  */
4477  joinpath = create_foreignscan_path(root,
4478  joinrel,
4479  NULL, /* default pathtarget */
4480  rows,
4481  startup_cost,
4482  total_cost,
4483  NIL, /* no pathkeys */
4484  NULL, /* no required_outer */
4485  epq_path,
4486  NIL); /* no fdw_private */
4487 
4488  /* Add generated path into joinrel by add_path(). */
4489  add_path(joinrel, (Path *) joinpath);
4490 
4491  /* Consider pathkeys for the join relation */
4492  add_paths_with_pathkeys_for_rel(root, joinrel, epq_path);
4493 
4494  /* XXX Consider parameterized paths for the join relation */
4495 }
4496 
4497 /*
4498  * Assess whether the aggregation, grouping and having operations can be pushed
4499  * down to the foreign server. As a side effect, save information we obtain in
4500  * this function to PgFdwRelationInfo of the input relation.
4501  */
4502 static bool
4504 {
4505  Query *query = root->parse;
4506  PathTarget *grouping_target;
4507  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
4508  PgFdwRelationInfo *ofpinfo;
4509  List *aggvars;
4510  ListCell *lc;
4511  int i;
4512  List *tlist = NIL;
4513 
4514  /* Grouping Sets are not pushable */
4515  if (query->groupingSets)
4516  return false;
4517 
4518  /* Get the fpinfo of the underlying scan relation. */
4519  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
4520 
4521  /*
4522  * If underneath input relation has any local conditions, those conditions
4523  * are required to be applied before performing aggregation. Hence the
4524  * aggregate cannot be pushed down.
4525  */
4526  if (ofpinfo->local_conds)
4527  return false;
4528 
4529  /*
4530  * The targetlist expected from this node and the targetlist pushed down
4531  * to the foreign server may be different. The latter requires
4532  * sortgrouprefs to be set to push down GROUP BY clause, but should not
4533  * have those arising from ORDER BY clause. These sortgrouprefs may be
4534  * different from those in the plan's targetlist. Use a copy of path
4535  * target to record the new sortgrouprefs.
4536  */
4537  grouping_target = copy_pathtarget(root->upper_targets[UPPERREL_GROUP_AGG]);
4538 
4539  /*
4540  * Evaluate grouping targets and check whether they are safe to push down
4541  * to the foreign side. All GROUP BY expressions will be part of the
4542  * grouping target and thus there is no need to evaluate it separately.
4543  * While doing so, add required expressions into target list which can
4544  * then be used to pass to foreign server.
4545  */
4546  i = 0;
4547  foreach(lc, grouping_target->exprs)
4548  {
4549  Expr *expr = (Expr *) lfirst(lc);
4550  Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
4551  ListCell *l;
4552 
4553  /* Check whether this expression is part of GROUP BY clause */
4554  if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
4555  {
4556  /*
4557  * If any of the GROUP BY expression is not shippable we can not
4558  * push down aggregation to the foreign server.
4559  */
4560  if (!is_foreign_expr(root, grouped_rel, expr))
4561  return false;
4562 
4563  /* Pushable, add to tlist */
4564  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4565  }
4566  else
4567  {
4568  /* Check entire expression whether it is pushable or not */
4569  if (is_foreign_expr(root, grouped_rel, expr))
4570  {
4571  /* Pushable, add to tlist */
4572  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4573  }
4574  else
4575  {
4576  /*
4577  * If we have sortgroupref set, then it means that we have an
4578  * ORDER BY entry pointing to this expression. Since we are
4579  * not pushing ORDER BY with GROUP BY, clear it.
4580  */
4581  if (sgref)
4582  grouping_target->sortgrouprefs[i] = 0;
4583 
4584  /* Not matched exactly, pull the var with aggregates then */
4585  aggvars = pull_var_clause((Node *) expr,
4587 
4588  if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
4589  return false;
4590 
4591  /*
4592  * Add aggregates, if any, into the targetlist. Plain var
4593  * nodes should be either same as some GROUP BY expression or
4594  * part of some GROUP BY expression. In later case, the query
4595  * cannot refer plain var nodes without the surrounding
4596  * expression. In both the cases, they are already part of
4597  * the targetlist and thus no need to add them again. In fact
4598  * adding pulled plain var nodes in SELECT clause will cause
4599  * an error on the foreign server if they are not same as some
4600  * GROUP BY expression.
4601  */
4602  foreach(l, aggvars)
4603  {
4604  Expr *expr = (Expr *) lfirst(l);
4605 
4606  if (IsA(expr, Aggref))
4607  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4608  }
4609  }
4610  }
4611 
4612  i++;
4613  }
4614 
4615  /*
4616  * Classify the pushable and non-pushable having clauses and save them in
4617  * remote_conds and local_conds of the grouped rel's fpinfo.
4618  */
4619  if (root->hasHavingQual && query->havingQual)
4620  {
4621  ListCell *lc;
4622 
4623  foreach(lc, (List *) query->havingQual)
4624  {
4625  Expr *expr = (Expr *) lfirst(lc);
4626 
4627  if (!is_foreign_expr(root, grouped_rel, expr))
4628  fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
4629  else
4630  fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
4631  }
4632  }
4633 
4634  /*
4635  * If there are any local conditions, pull Vars and aggregates from it and
4636  * check whether they are safe to pushdown or not.
4637  */
4638  if (fpinfo->local_conds)
4639  {
4640  ListCell *lc;
4641  List *aggvars = pull_var_clause((Node *) fpinfo->local_conds,
4643 
4644  foreach(lc, aggvars)
4645  {
4646  Expr *expr = (Expr *) lfirst(lc);
4647 
4648  /*
4649  * If aggregates within local conditions are not safe to push
4650  * down, then we cannot push down the query. Vars are already
4651  * part of GROUP BY clause which are checked above, so no need to
4652  * access them again here.
4653  */
4654  if (IsA(expr, Aggref))
4655  {
4656  if (!is_foreign_expr(root, grouped_rel, expr))
4657  return false;
4658 
4659  tlist = add_to_flat_tlist(tlist, aggvars);
4660  }
4661  }
4662  }
4663 
4664  /* Transfer any sortgroupref data to the replacement tlist */
4665  apply_pathtarget_labeling_to_tlist(tlist, grouping_target);
4666 
4667  /* Store generated targetlist */
4668  fpinfo->grouped_tlist = tlist;
4669 
4670  /* Safe to pushdown */
4671  fpinfo->pushdown_safe = true;
4672 
4673  /*
4674  * If user is willing to estimate cost for a scan using EXPLAIN, he
4675  * intends to estimate scans on that relation more accurately. Then, it
4676  * makes sense to estimate the cost of the grouping on that relation more
4677  * accurately using EXPLAIN.
4678  */
4679  fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate;
4680 
4681  /* Copy startup and tuple cost as is from underneath input rel's fpinfo */
4682  fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost;
4683  fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost;
4684 
4685  /*
4686  * Set cached relation costs to some negative value, so that we can detect
4687  * when they are set to some sensible costs, during one (usually the
4688  * first) of the calls to estimate_path_cost_size().
4689  */
4690  fpinfo->rel_startup_cost = -1;
4691  fpinfo->rel_total_cost = -1;
4692 
4693  /* Set fetch size same as that of underneath input rel's fpinfo */
4694  fpinfo->fetch_size = ofpinfo->fetch_size;
4695 
4696  /*
4697  * Set the string describing this grouped relation to be used in EXPLAIN
4698  * output of corresponding ForeignScan.
4699  */
4700  fpinfo->relation_name = makeStringInfo();
4701  appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
4702  ofpinfo->relation_name->data);
4703 
4704  return true;
4705 }
4706 
4707 /*
4708  * postgresGetForeignUpperPaths
4709  * Add paths for post-join operations like aggregation, grouping etc. if
4710  * corresponding operations are safe to push down.
4711  *
4712  * Right now, we only support aggregate, grouping and having clause pushdown.
4713  */
4714 static void
4716  RelOptInfo *input_rel, RelOptInfo *output_rel)
4717 {
4718  PgFdwRelationInfo *fpinfo;
4719 
4720  /*
4721  * If input rel is not safe to pushdown, then simply return as we cannot
4722  * perform any post-join operations on the foreign server.
4723  */
4724  if (!input_rel->fdw_private ||
4725  !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
4726  return;
4727 
4728  /* Ignore stages we don't support; and skip any duplicate calls. */
4729  if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
4730  return;
4731 
4732  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
4733  fpinfo->pushdown_safe = false;
4734  output_rel->fdw_private = fpinfo;
4735 
4736  add_foreign_grouping_paths(root, input_rel, output_rel);
4737 }
4738 
4739 /*
4740  * add_foreign_grouping_paths
4741  * Add foreign path for grouping and/or aggregation.
4742  *
4743  * Given input_rel represents the underlying scan. The paths are added to the
4744  * given grouped_rel.
4745  */
4746 static void
4748  RelOptInfo *grouped_rel)
4749 {
4750  Query *parse = root->parse;
4751  PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
4752  PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
4753  ForeignPath *grouppath;
4754  PathTarget *grouping_target;
4755  double rows;
4756  int width;
4757  Cost startup_cost;
4758  Cost total_cost;
4759 
4760  /* Nothing to be done, if there is no grouping or aggregation required. */
4761  if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
4762  !root->hasHavingQual)
4763  return;
4764 
4765  grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
4766 
4767  /* save the input_rel as outerrel in fpinfo */
4768  fpinfo->outerrel = input_rel;
4769 
4770  /*
4771  * Copy foreign table, foreign server, user mapping, shippable extensions
4772  * etc. details from the input relation's fpinfo.
4773  */
4774  fpinfo->table = ifpinfo->table;
4775  fpinfo->server = ifpinfo->server;
4776  fpinfo->user = ifpinfo->user;
4777  fpinfo->shippable_extensions = ifpinfo->shippable_extensions;
4778 
4779  /* Assess if it is safe to push down aggregation and grouping. */
4780  if (!foreign_grouping_ok(root, grouped_rel))
4781  return;
4782 
4783  /* Estimate the cost of push down */
4784  estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
4785  &width, &startup_cost, &total_cost);
4786 
4787  /* Now update this information in the fpinfo */
4788  fpinfo->rows = rows;
4789  fpinfo->width = width;
4790  fpinfo->startup_cost = startup_cost;
4791  fpinfo->total_cost = total_cost;
4792 
4793  /* Create and add foreign path to the grouping relation. */
4794  grouppath = create_foreignscan_path(root,
4795  grouped_rel,
4796  grouping_target,
4797  rows,
4798  startup_cost,
4799  total_cost,
4800  NIL, /* no pathkeys */
4801  NULL, /* no required_outer */
4802  NULL,
4803  NIL); /* no fdw_private */
4804 
4805  /* Add generated path into grouped_rel by add_path(). */
4806  add_path(grouped_rel, (Path *) grouppath);
4807 }
4808 
4809 /*
4810  * Create a tuple from the specified row of the PGresult.
4811  *
4812  * rel is the local representation of the foreign table, attinmeta is
4813  * conversion data for the rel's tupdesc, and retrieved_attrs is an
4814  * integer list of the table column numbers present in the PGresult.
4815  * temp_context is a working context that can be reset after each tuple.
4816  */
4817 static HeapTuple
4819  int row,
4820  Relation rel,
4821  AttInMetadata *attinmeta,
4822  List *retrieved_attrs,
4823  ForeignScanState *fsstate,
4824  MemoryContext temp_context)
4825 {
4826  HeapTuple tuple;
4827  TupleDesc tupdesc;
4828  Datum *values;
4829  bool *nulls;
4830  ItemPointer ctid = NULL;
4831  Oid oid = InvalidOid;
4832  ConversionLocation errpos;
4833  ErrorContextCallback errcallback;
4834  MemoryContext oldcontext;
4835  ListCell *lc;
4836  int j;
4837 
4838  Assert(row < PQntuples(res));
4839 
4840  /*
4841  * Do the following work in a temp context that we reset after each tuple.
4842  * This cleans up not only the data we have direct access to, but any
4843  * cruft the I/O functions might leak.
4844  */
4845  oldcontext = MemoryContextSwitchTo(temp_context);
4846 
4847  if (rel)
4848  tupdesc = RelationGetDescr(rel);
4849  else
4850  {
4851  PgFdwScanState *fdw_sstate;
4852 
4853  Assert(fsstate);
4854  fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
4855  tupdesc = fdw_sstate->tupdesc;
4856  }
4857 
4858  values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
4859  nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
4860  /* Initialize to nulls for any columns not present in result */
4861  memset(nulls, true, tupdesc->natts * sizeof(bool));
4862 
4863  /*
4864  * Set up and install callback to report where conversion error occurs.
4865  */
4866  errpos.rel = rel;
4867  errpos.cur_attno = 0;
4868  errpos.fsstate = fsstate;
4869  errcallback.callback = conversion_error_callback;
4870  errcallback.arg = (void *) &errpos;
4871  errcallback.previous = error_context_stack;
4872  error_context_stack = &errcallback;
4873 
4874  /*
4875  * i indexes columns in the relation, j indexes columns in the PGresult.
4876  */
4877  j = 0;
4878  foreach(lc, retrieved_attrs)
4879  {
4880  int i = lfirst_int(lc);
4881  char *valstr;
4882 
4883  /* fetch next column's textual value */
4884  if (PQgetisnull(res, row, j))
4885  valstr = NULL;
4886  else
4887  valstr = PQgetvalue(res, row, j);
4888 
4889  /*
4890  * convert value to internal representation
4891  *
4892  * Note: we ignore system columns other than ctid and oid in result
4893  */
4894  errpos.cur_attno = i;
4895  if (i > 0)
4896  {
4897  /* ordinary column */
4898  Assert(i <= tupdesc->natts);
4899  nulls[i - 1] = (valstr == NULL);
4900  /* Apply the input function even to nulls, to support domains */
4901  values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
4902  valstr,
4903  attinmeta->attioparams[i - 1],
4904  attinmeta->atttypmods[i - 1]);
4905  }
4906  else if (i == SelfItemPointerAttributeNumber)
4907  {
4908  /* ctid */
4909  if (valstr != NULL)
4910  {
4911  Datum datum;
4912 
4913  datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
4914  ctid = (ItemPointer) DatumGetPointer(datum);
4915  }
4916  }
4917  else if (i == ObjectIdAttributeNumber)
4918  {
4919  /* oid */
4920  if (valstr != NULL)
4921  {
4922  Datum datum;
4923 
4924  datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
4925  oid = DatumGetObjectId(datum);
4926  }
4927  }
4928  errpos.cur_attno = 0;
4929 
4930  j++;
4931  }
4932 
4933  /* Uninstall error context callback. */
4934  error_context_stack = errcallback.previous;
4935 
4936  /*
4937  * Check we got the expected number of columns. Note: j == 0 and
4938  * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
4939  */
4940  if (j > 0 && j != PQnfields(res))
4941  elog(ERROR, "remote query result does not match the foreign table");
4942 
4943  /*
4944  * Build the result tuple in caller's memory context.
4945  */
4946  MemoryContextSwitchTo(oldcontext);
4947 
4948  tuple = heap_form_tuple(tupdesc, values, nulls);
4949 
4950  /*
4951  * If we have a CTID to return, install it in both t_self and t_ctid.
4952  * t_self is the normal place, but if the tuple is converted to a
4953  * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
4954  * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
4955  */
4956  if (ctid)
4957  tuple->t_self = tuple->t_data->t_ctid = *ctid;
4958 
4959  /*
4960  * Stomp on the xmin, xmax, and cmin fields from the tuple created by
4961  * heap_form_tuple. heap_form_tuple actually creates the tuple with
4962  * DatumTupleFields, not HeapTupleFields, but the executor expects
4963  * HeapTupleFields and will happily extract system columns on that
4964  * assumption. If we don't do this then, for example, the tuple length
4965  * ends up in the xmin field, which isn't what we want.
4966  */
4970 
4971  /*
4972  * If we have an OID to return, install it.
4973  */
4974  if (OidIsValid(oid))
4975  HeapTupleSetOid(tuple, oid);
4976 
4977  /* Clean up */
4978  MemoryContextReset(temp_context);
4979 
4980  return tuple;
4981 }
4982 
4983 /*
4984  * Callback function which is called when error occurs during column value
4985  * conversion. Print names of column and relation.
4986  */
4987 static void
4989 {
4990  const char *attname = NULL;
4991  const char *relname = NULL;
4992  bool is_wholerow = false;
4993  ConversionLocation *errpos = (ConversionLocation *) arg;
4994 
4995  if (errpos->rel)
4996  {
4997  /* error occurred in a scan against a foreign table */
4998  TupleDesc tupdesc = RelationGetDescr(errpos->rel);
4999 
5000  if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
5001  attname = NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname);
5002  else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
5003  attname = "ctid";
5004  else if (errpos->cur_attno == ObjectIdAttributeNumber)
5005  attname = "oid";
5006 
5007  relname = RelationGetRelationName(errpos->rel);
5008  }
5009  else
5010  {
5011  /* error occurred in a scan against a foreign join */
5012  ForeignScanState *fsstate = errpos->fsstate;
5013  ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
5014  EState *estate = fsstate->ss.ps.state;
5015  TargetEntry *tle;
5016 
5018  errpos->cur_attno - 1));
5019 
5020  /*
5021  * Target list can have Vars and expressions. For Vars, we can get
5022  * it's relation, however for expressions we can't. Thus for
5023  * expressions, just show generic context message.
5024  */
5025  if (IsA(tle->expr, Var))
5026  {
5027  RangeTblEntry *rte;
5028  Var *var = (Var *) tle->expr;
5029 
5030  rte = rt_fetch(var->varno, estate->es_range_table);
5031 
5032  if (var->varattno == 0)
5033  is_wholerow = true;
5034  else
5035  attname = get_relid_attribute_name(rte->relid, var->varattno);
5036 
5037  relname = get_rel_name(rte->relid);
5038  }
5039  else
5040  errcontext("processing expression at position %d in select list",
5041  errpos->cur_attno);
5042  }
5043 
5044  if (relname)
5045  {
5046  if (is_wholerow)
5047  errcontext("whole-row reference to foreign table \"%s\"", relname);
5048  else if (attname)
5049  errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
5050  }
5051 }
5052 
5053 /*
5054  * Find an equivalence class member expression, all of whose Vars, come from
5055  * the indicated relation.
5056  */
5057 extern Expr *
5059 {
5060  ListCell *lc_em;
5061 
5062  foreach(lc_em, ec->ec_members)
5063  {
5064  EquivalenceMember *em = lfirst(lc_em);
5065 
5066  if (bms_is_subset(em->em_relids, rel->relids))
5067  {
5068  /*
5069  * If there is more than one equivalence member whose Vars are
5070  * taken entirely from this relation, we'll be content to choose
5071  * any one of those.
5072  */
5073  return em->em_expr;
5074  }
5075  }
5076 
5077  /* We didn't find any suitable equivalence class expression */
5078  return NULL;
5079 }
GetForeignPlan_function GetForeignPlan
Definition: fdwapi.h:176
bool has_eclass_joins
Definition: relation.h:556
Value * makeString(char *str)
Definition: value.c:53
BeginForeignScan_function BeginForeignScan
Definition: fdwapi.h:177
GetForeignUpperPaths_function GetForeignUpperPaths
Definition: fdwapi.h:191
ExecForeignDelete_function ExecForeignDelete
Definition: fdwapi.h:199
#define PG_RETURN_POINTER(x)
Definition: fmgr.h:321
RelOptInfo * find_childrel_top_parent(PlannerInfo *root, RelOptInfo *rel)
Definition: relnode.c:976
EndDirectModify_function EndDirectModify
Definition: fdwapi.h:205
#define NIL
Definition: pg_list.h:69
PathTarget * copy_pathtarget(PathTarget *src)
Definition: tlist.c:629
List * rowMarks
Definition: relation.h:255
ScanState ss
Definition: execnodes.h:1470
void deparseUpdateSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1616
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
Datum postgres_fdw_handler(PG_FUNCTION_ARGS)
Definition: postgres_fdw.c:426
static List * get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:795
Definition: fmgr.h:56
List * qual
Definition: plannodes.h:133
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
double estimate_num_groups(PlannerInfo *root, List *groupExprs, double input_rows, List **pgset)
Definition: selfuncs.c:3277
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
FdwModifyPrivateIndex
Definition: postgres_fdw.c:91
Relation ri_RelationDesc
Definition: execnodes.h:373
TupleTableSlot * ExecProcNode(PlanState *node)
Definition: execProcnode.c:392
#define IsA(nodeptr, _type_)
Definition: nodes.h:557
static ForeignScan * postgresGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan)
static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es)
Query * parse
Definition: relation.h:154
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2600
void add_path(RelOptInfo *parent_rel, Path *new_path)
Definition: pathnode.c:412
Relids ph_eval_at
Definition: relation.h:2016
static List * postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
ParamPathInfo * get_baserel_parampathinfo(PlannerInfo *root, RelOptInfo *baserel, Relids required_outer)
Definition: relnode.c:1035
ExplainForeignScan_function ExplainForeignScan
Definition: fdwapi.h:213
RelOptKind reloptkind
Definition: relation.h:491
Index scanrelid
Definition: plannodes.h:316
List * query_pathkeys
Definition: relation.h:261
Instrumentation * instrument
Definition: execnodes.h:806
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1183
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10185
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg)
static TupleTableSlot * postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path)
static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context)
void extract_actual_join_clauses(List *restrictinfo_list, List **joinquals, List **otherquals)
Definition: restrictinfo.c:381
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:512
HeapTuple * rows
Definition: postgres_fdw.c:230
AttrNumber ExecFindJunkAttributeInTlist(List *targetlist, const char *attrName)
Definition: execJunk.c:221
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2946
#define RelationGetDescr(relation)
Definition: rel.h:429
#define DEBUG3
Definition: elog.h:23
Oid GetUserId(void)
Definition: miscinit.c:283
#define ObjectIdAttributeNumber
Definition: sysattr.h:22
AnalyzeForeignTable_function AnalyzeForeignTable
Definition: fdwapi.h:218
#define castNode(_type_, nodeptr)
Definition: nodes.h:575
static void create_cursor(ForeignScanState *node)
static void add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel)
const char ** param_values
Definition: postgres_fdw.c:209
#define PointerGetDatum(X)
Definition: postgres.h:562
char * PQcmdTuples(PGresult *res)
Definition: fe-exec.c:3014
static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values)
List * param_exprs
Definition: postgres_fdw.c:144
int(* AcquireSampleRowsFunc)(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows)
Definition: fdwapi.h:134
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:197
bool eclass_useful_for_merging(PlannerInfo *root, EquivalenceClass *eclass, RelOptInfo *rel)
Definition: equivclass.c:2393
#define DatumGetObjectId(X)
Definition: postgres.h:506
char * pstrdup(const char *in)
Definition: mcxt.c:1077
ExprContext * ps_ExprContext
Definition: execnodes.h:830
static void postgresBeginForeignScan(ForeignScanState *node, int eflags)
double tuples
Definition: relation.h:534
List * baserestrictinfo
Definition: relation.h:549
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:175
List * fdw_exprs
Definition: plannodes.h:577
void get_agg_clause_costs(PlannerInfo *root, Node *clause, AggSplit aggsplit, AggClauseCosts *costs)
Definition: clauses.c:466
PG_FUNCTION_INFO_V1(postgres_fdw_handler)
bool hasAggs
Definition: parsenodes.h:116
Relids clause_relids
Definition: relation.h:1714
AttInMetadata * attinmeta
Definition: postgres_fdw.c:196
StringInfo makeStringInfo(void)
Definition: stringinfo.c:29
#define Min(x, y)
Definition: c.h:806
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:937
ForeignServer * server
Definition: postgres_fdw.h:78
bool pseudoconstant
Definition: relation.h:1707
int resultRelation
Definition: parsenodes.h:113
List * fdw_private
Definition: plannodes.h:578
static List * get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:702
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
Form_pg_attribute * attrs
Definition: tupdesc.h:74
#define RELKIND_MATVIEW
Definition: pg_class.h:165
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IS_OUTER_JOIN(jointype)
Definition: nodes.h:712
double sampler_random_fract(SamplerRandomState randstate)
Definition: sampling.c:238
static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values)
List * groupingSets
Definition: parsenodes.h:139
uint64 fetch_size
Definition: logging.c:21
#define InvalidBuffer
Definition: buf.h:25
void classifyConditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, List **remote_conds, List **local_conds)
Definition: deparse.c:200
int set_transmission_modes(void)
List * list_copy(const List *oldlist)
Definition: list.c:1160
Definition: nodes.h:506
#define strVal(v)
Definition: value.h:54
Relids lower_subquery_rels
Definition: postgres_fdw.h:104
int errcode(int sqlerrcode)
Definition: elog.c:575
ForeignTable * GetForeignTable(Oid relid)
Definition: foreign.c:216
int IntervalStyle
Definition: globals.c:108
#define MemSet(start, val, len)
Definition: c.h:857
AttrNumber varattno
Definition: primnodes.h:168
static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
List * join_rel_list
Definition: relation.h:214
bool canSetTag
Definition: plannodes.h:206
CmdType operation
Definition: execnodes.h:907
List * list_concat(List *list1, List *list2)
Definition: list.c:321
return result
Definition: formatting.c:1618
int32 * atttypmods
Definition: funcapi.h:47
PathKey * make_canonical_pathkey(PlannerInfo *root, EquivalenceClass *eclass, Oid opfamily, int strategy, bool nulls_first)
Definition: pathkeys.c:51
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:28
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1047
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
uint32 BlockNumber
Definition: block.h:31
EquivalenceClass * right_ec
Definition: relation.h:1748
List * retrieved_attrs
Definition: postgres_fdw.c:136
void reservoir_init_selection_state(ReservoirState rs, int n)
Definition: sampling.c:129
List * pull_var_clause(Node *node, int flags)
Definition: var.c:535
List * fdw_scan_tlist
Definition: plannodes.h:579
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:584
double Selectivity
Definition: nodes.h:629
Relation ss_currentRelation
Definition: execnodes.h:1045
EState * state
Definition: execnodes.h:802
QualCost transCost
Definition: relation.h:62
List * es_range_table
Definition: execnodes.h:410
static void postgresGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
Definition: postgres_fdw.c:483
Form_pg_class rd_rel
Definition: rel.h:114
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6001
static int postgresIsForeignRelUpdatable(Relation rel)
UpperRelationKind
Definition: relation.h:71
Definition: primnodes.h:163
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages)
struct ErrorContextCallback * previous
Definition: elog.h:238
#define OidIsValid(objectId)
Definition: c.h:538
Oid * attioparams
Definition: funcapi.h:44
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra)
List * retrieved_attrs
Definition: postgres_fdw.c:179
List * mergeopfamilies
Definition: relation.h:1744
static void postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
int natts
Definition: tupdesc.h:73
CmdType operation
Definition: plannodes.h:575
List * plans
Definition: plannodes.h:212
static TupleTableSlot * postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
RelOptInfo * outerrel
Definition: postgres_fdw.h:91
static void close_cursor(PGconn *conn, unsigned int cursor_number)
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
Cost startup
Definition: relation.h:45
void pull_varattnos(Node *node, Index varno, Bitmapset **varattnos)
Definition: var.c:219
AddForeignUpdateTargets_function AddForeignUpdateTargets
Definition: fdwapi.h:194
Index ri_RangeTableIndex
Definition: execnodes.h:372
static void postgresEndDirectModify(ForeignScanState *node)
#define PVC_INCLUDE_AGGREGATES
Definition: var.h:20
#define USE_ISO_DATES
Definition: miscadmin.h:210
JoinType
Definition: nodes.h:663
List * targetList
Definition: parsenodes.h:131
void deparseInsertSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *returningList, List **retrieved_attrs)
Definition: deparse.c:1553
unsigned int cursor_number
Definition: postgres_fdw.c:140
struct RelOptInfo ** simple_rel_array
Definition: relation.h:178
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1976
ItemPointerData * ItemPointer
Definition: itemptr.h:48
HeapTupleHeader t_data
Definition: htup.h:67
#define HeapTupleSetOid(tuple, oid)
Definition: htup_details.h:698
ErrorContextCallback * error_context_stack
Definition: elog.c:88
void ReleaseConnection(PGconn *conn)
Definition: connection.c:402
RecheckForeignScan_function RecheckForeignScan
Definition: fdwapi.h:210
IterateDirectModify_function IterateDirectModify
Definition: fdwapi.h:204
#define list_make1(x1)
Definition: pg_list.h:133
#define NAMEDATALEN
PlanState ps
Definition: execnodes.h:1044
static struct @114 value
char * relname
Definition: primnodes.h:68
Value * makeInteger(long i)
Definition: value.c:23
void ExplainPropertyText(const char *qlabel, const char *value, ExplainState *es)
Definition: explain.c:3122
JoinType jointype
Definition: plannodes.h:640
static void postgresEndForeignScan(ForeignScanState *node)
Relids lateral_relids
Definition: relation.h:519
bool defGetBoolean(DefElem *def)
Definition: define.c:111
Cost per_tuple
Definition: relation.h:46