PostgreSQL Source Code  git master
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include <limits.h>
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "catalog/pg_opfamily.h"
22 #include "commands/defrem.h"
23 #include "commands/explain.h"
24 #include "commands/vacuum.h"
25 #include "executor/execAsync.h"
26 #include "foreign/fdwapi.h"
27 #include "funcapi.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "nodes/nodeFuncs.h"
31 #include "optimizer/appendinfo.h"
32 #include "optimizer/clauses.h"
33 #include "optimizer/cost.h"
34 #include "optimizer/inherit.h"
35 #include "optimizer/optimizer.h"
36 #include "optimizer/pathnode.h"
37 #include "optimizer/paths.h"
38 #include "optimizer/planmain.h"
39 #include "optimizer/prep.h"
40 #include "optimizer/restrictinfo.h"
41 #include "optimizer/tlist.h"
42 #include "parser/parsetree.h"
43 #include "postgres_fdw.h"
44 #include "storage/latch.h"
45 #include "utils/builtins.h"
46 #include "utils/float.h"
47 #include "utils/guc.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/rel.h"
51 #include "utils/sampling.h"
52 #include "utils/selfuncs.h"
53 
/* Marks this shared library as a loadable PostgreSQL module. */
PG_MODULE_MAGIC;

/* Default CPU cost to start up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST    100.0

/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
#define DEFAULT_FDW_TUPLE_COST      0.01

/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
64 
/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum FdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *      sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 */
enum FdwScanPrivateIndex
{
    /* SQL statement to execute remotely (as a String node) */
    FdwScanPrivateSelectSql,
    /* Integer list of attribute numbers retrieved by the SELECT */
    FdwScanPrivateRetrievedAttrs,
    /* Integer representing the desired fetch_size */
    FdwScanPrivateFetchSize,

    /*
     * String describing join i.e. names of relations being joined and types
     * of join, added when the scan is join
     */
    FdwScanPrivateRelations
};
87 
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *    (NIL for a DELETE)
 * 3) Length till the end of VALUES clause for INSERT
 *    (-1 for a DELETE/UPDATE)
 * 4) Boolean flag showing if the remote query has a RETURNING clause
 * 5) Integer list of attribute numbers retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
    /* SQL statement to execute remotely (as a String node) */
    FdwModifyPrivateUpdateSql,
    /* Integer list of target attribute numbers for INSERT/UPDATE */
    FdwModifyPrivateTargetAttnums,
    /* Length till the end of VALUES clause (as an Integer node) */
    FdwModifyPrivateLen,
    /* has-returning flag (as a Boolean node) */
    FdwModifyPrivateHasReturning,
    /* Integer list of attribute numbers retrieved by RETURNING */
    FdwModifyPrivateRetrievedAttrs
};
113 
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command es_processed
 */
enum FdwDirectModifyPrivateIndex
{
    /* SQL statement to execute remotely (as a String node) */
    FdwDirectModifyPrivateUpdateSql,
    /* has-returning flag (as a Boolean node) */
    FdwDirectModifyPrivateHasReturning,
    /* Integer list of attribute numbers retrieved by RETURNING */
    FdwDirectModifyPrivateRetrievedAttrs,
    /* set-processed flag (as a Boolean node) */
    FdwDirectModifyPrivateSetProcessed
};
134 
135 /*
136  * Execution state of a foreign scan using postgres_fdw.
137  */
138 typedef struct PgFdwScanState
139 {
140  Relation rel; /* relcache entry for the foreign table. NULL
141  * for a foreign join scan. */
142  TupleDesc tupdesc; /* tuple descriptor of scan */
143  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
144 
145  /* extracted fdw_private data */
146  char *query; /* text of SELECT command */
147  List *retrieved_attrs; /* list of retrieved attribute numbers */
148 
149  /* for remote query execution */
150  PGconn *conn; /* connection for the scan */
151  PgFdwConnState *conn_state; /* extra per-connection state */
152  unsigned int cursor_number; /* quasi-unique ID for my cursor */
153  bool cursor_exists; /* have we created the cursor? */
154  int numParams; /* number of parameters passed to query */
155  FmgrInfo *param_flinfo; /* output conversion functions for them */
156  List *param_exprs; /* executable expressions for param values */
157  const char **param_values; /* textual values of query parameters */
158 
159  /* for storing result tuples */
160  HeapTuple *tuples; /* array of currently-retrieved tuples */
161  int num_tuples; /* # of tuples in array */
162  int next_tuple; /* index of next one to return */
163 
164  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
165  int fetch_ct_2; /* Min(# of fetches done, 2) */
166  bool eof_reached; /* true if last fetch reached EOF */
167 
168  /* for asynchronous execution */
169  bool async_capable; /* engage asynchronous-capable logic? */
170 
171  /* working memory contexts */
172  MemoryContext batch_cxt; /* context holding current batch of tuples */
173  MemoryContext temp_cxt; /* context for per-tuple temporary data */
174 
175  int fetch_size; /* number of tuples per fetch */
177 
178 /*
179  * Execution state of a foreign insert/update/delete operation.
180  */
181 typedef struct PgFdwModifyState
182 {
183  Relation rel; /* relcache entry for the foreign table */
184  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
185 
186  /* for remote query execution */
187  PGconn *conn; /* connection for the scan */
188  PgFdwConnState *conn_state; /* extra per-connection state */
189  char *p_name; /* name of prepared statement, if created */
190 
191  /* extracted fdw_private data */
192  char *query; /* text of INSERT/UPDATE/DELETE command */
193  char *orig_query; /* original text of INSERT command */
194  List *target_attrs; /* list of target attribute numbers */
195  int values_end; /* length up to the end of VALUES */
196  int batch_size; /* value of FDW option "batch_size" */
197  bool has_returning; /* is there a RETURNING clause? */
198  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
199 
200  /* info about parameters for prepared statement */
201  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
202  int p_nums; /* number of parameters to transmit */
203  FmgrInfo *p_flinfo; /* output conversion functions for them */
204 
205  /* batch operation stuff */
206  int num_slots; /* number of slots to insert */
207 
208  /* working memory context */
209  MemoryContext temp_cxt; /* context for per-tuple temporary data */
210 
211  /* for update row movement if subplan result rel */
212  struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
213  * created */
215 
216 /*
217  * Execution state of a foreign scan that modifies a foreign table directly.
218  */
220 {
221  Relation rel; /* relcache entry for the foreign table */
222  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
223 
224  /* extracted fdw_private data */
225  char *query; /* text of UPDATE/DELETE command */
226  bool has_returning; /* is there a RETURNING clause? */
227  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
228  bool set_processed; /* do we set the command es_processed? */
229 
230  /* for remote query execution */
231  PGconn *conn; /* connection for the update */
232  PgFdwConnState *conn_state; /* extra per-connection state */
233  int numParams; /* number of parameters passed to query */
234  FmgrInfo *param_flinfo; /* output conversion functions for them */
235  List *param_exprs; /* executable expressions for param values */
236  const char **param_values; /* textual values of query parameters */
237 
238  /* for storing result tuples */
239  PGresult *result; /* result for query */
240  int num_tuples; /* # of result tuples */
241  int next_tuple; /* index of next one to return */
242  Relation resultRel; /* relcache entry for the target relation */
243  AttrNumber *attnoMap; /* array of attnums of input user columns */
244  AttrNumber ctidAttno; /* attnum of input ctid column */
245  AttrNumber oidAttno; /* attnum of input oid column */
246  bool hasSystemCols; /* are there system columns of resultRel? */
247 
248  /* working memory context */
249  MemoryContext temp_cxt; /* context for per-tuple temporary data */
251 
252 /*
253  * Workspace for analyzing a foreign table.
254  */
255 typedef struct PgFdwAnalyzeState
256 {
257  Relation rel; /* relcache entry for the foreign table */
258  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
259  List *retrieved_attrs; /* attr numbers retrieved by query */
260 
261  /* collected sample rows */
262  HeapTuple *rows; /* array of size targrows */
263  int targrows; /* target # of sample rows */
264  int numrows; /* # of sample rows collected */
265 
266  /* for random sampling */
267  double samplerows; /* # of rows fetched */
268  double rowstoskip; /* # of rows to skip before next sample */
269  ReservoirStateData rstate; /* state for reservoir sampling */
270 
271  /* working memory contexts */
272  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
273  MemoryContext temp_cxt; /* context for per-tuple temporary data */
275 
/*
 * This enum describes what's kept in the fdw_private list for a ForeignPath.
 * We store:
 *
 * 1) Boolean flag showing if the remote query has the final sort
 * 2) Boolean flag showing if the remote query has the LIMIT clause
 */
enum FdwPathPrivateIndex
{
    /* has-final-sort flag (as a Boolean node) */
    FdwPathPrivateHasFinalSort,
    /* has-limit flag (as a Boolean node) */
    FdwPathPrivateHasLimit
};
290 
291 /* Struct for extra information passed to estimate_path_cost_size() */
292 typedef struct
293 {
296  bool has_limit;
297  double limit_tuples;
298  int64 count_est;
299  int64 offset_est;
301 
302 /*
303  * Identify the attribute where data conversion fails.
304  */
305 typedef struct ConversionLocation
306 {
307  AttrNumber cur_attno; /* attribute number being processed, or 0 */
308  Relation rel; /* foreign table being processed, or NULL */
309  ForeignScanState *fsstate; /* plan node being processed, or NULL */
311 
312 /* Callback argument for ec_member_matches_foreign */
313 typedef struct
314 {
315  Expr *current; /* current expr, or NULL if not yet found */
316  List *already_used; /* expressions already dealt with */
318 
319 /*
320  * SQL functions
321  */
323 
324 /*
325  * FDW callback routines
326  */
327 static void postgresGetForeignRelSize(PlannerInfo *root,
328  RelOptInfo *baserel,
329  Oid foreigntableid);
330 static void postgresGetForeignPaths(PlannerInfo *root,
331  RelOptInfo *baserel,
332  Oid foreigntableid);
334  RelOptInfo *foreignrel,
335  Oid foreigntableid,
336  ForeignPath *best_path,
337  List *tlist,
338  List *scan_clauses,
339  Plan *outer_plan);
340 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
343 static void postgresEndForeignScan(ForeignScanState *node);
345  Index rtindex,
346  RangeTblEntry *target_rte,
347  Relation target_relation);
349  ModifyTable *plan,
350  Index resultRelation,
351  int subplan_index);
352 static void postgresBeginForeignModify(ModifyTableState *mtstate,
353  ResultRelInfo *resultRelInfo,
354  List *fdw_private,
355  int subplan_index,
356  int eflags);
358  ResultRelInfo *resultRelInfo,
359  TupleTableSlot *slot,
360  TupleTableSlot *planSlot);
362  ResultRelInfo *resultRelInfo,
363  TupleTableSlot **slots,
364  TupleTableSlot **planSlots,
365  int *numSlots);
366 static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
368  ResultRelInfo *resultRelInfo,
369  TupleTableSlot *slot,
370  TupleTableSlot *planSlot);
372  ResultRelInfo *resultRelInfo,
373  TupleTableSlot *slot,
374  TupleTableSlot *planSlot);
375 static void postgresEndForeignModify(EState *estate,
376  ResultRelInfo *resultRelInfo);
377 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
378  ResultRelInfo *resultRelInfo);
379 static void postgresEndForeignInsert(EState *estate,
380  ResultRelInfo *resultRelInfo);
382 static bool postgresPlanDirectModify(PlannerInfo *root,
383  ModifyTable *plan,
384  Index resultRelation,
385  int subplan_index);
386 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
388 static void postgresEndDirectModify(ForeignScanState *node);
390  ExplainState *es);
392  ResultRelInfo *rinfo,
393  List *fdw_private,
394  int subplan_index,
395  ExplainState *es);
397  ExplainState *es);
398 static void postgresExecForeignTruncate(List *rels,
399  DropBehavior behavior,
400  bool restart_seqs);
401 static bool postgresAnalyzeForeignTable(Relation relation,
402  AcquireSampleRowsFunc *func,
403  BlockNumber *totalpages);
405  Oid serverOid);
406 static void postgresGetForeignJoinPaths(PlannerInfo *root,
407  RelOptInfo *joinrel,
408  RelOptInfo *outerrel,
409  RelOptInfo *innerrel,
410  JoinType jointype,
411  JoinPathExtraData *extra);
413  TupleTableSlot *slot);
414 static void postgresGetForeignUpperPaths(PlannerInfo *root,
415  UpperRelationKind stage,
416  RelOptInfo *input_rel,
417  RelOptInfo *output_rel,
418  void *extra);
420 static void postgresForeignAsyncRequest(AsyncRequest *areq);
422 static void postgresForeignAsyncNotify(AsyncRequest *areq);
423 
424 /*
425  * Helper functions
426  */
427 static void estimate_path_cost_size(PlannerInfo *root,
428  RelOptInfo *foreignrel,
429  List *param_join_conds,
430  List *pathkeys,
431  PgFdwPathExtraData *fpextra,
432  double *p_rows, int *p_width,
433  Cost *p_startup_cost, Cost *p_total_cost);
434 static void get_remote_estimate(const char *sql,
435  PGconn *conn,
436  double *rows,
437  int *width,
438  Cost *startup_cost,
439  Cost *total_cost);
441  List *pathkeys,
442  double retrieved_rows,
443  double width,
444  double limit_tuples,
445  Cost *p_startup_cost,
446  Cost *p_run_cost);
447 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
449  void *arg);
450 static void create_cursor(ForeignScanState *node);
451 static void fetch_more_data(ForeignScanState *node);
452 static void close_cursor(PGconn *conn, unsigned int cursor_number,
453  PgFdwConnState *conn_state);
455  RangeTblEntry *rte,
456  ResultRelInfo *resultRelInfo,
457  CmdType operation,
458  Plan *subplan,
459  char *query,
460  List *target_attrs,
461  int values_end,
462  bool has_returning,
463  List *retrieved_attrs);
465  ResultRelInfo *resultRelInfo,
466  CmdType operation,
467  TupleTableSlot **slots,
468  TupleTableSlot **planSlots,
469  int *numSlots);
470 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
471 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
472  ItemPointer tupleid,
473  TupleTableSlot **slots,
474  int numSlots);
475 static void store_returning_result(PgFdwModifyState *fmstate,
476  TupleTableSlot *slot, PGresult *res);
477 static void finish_foreign_modify(PgFdwModifyState *fmstate);
478 static void deallocate_query(PgFdwModifyState *fmstate);
479 static List *build_remote_returning(Index rtindex, Relation rel,
480  List *returningList);
481 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
482 static void execute_dml_stmt(ForeignScanState *node);
484 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
485  List *fdw_scan_tlist,
486  Index rtindex);
488  ResultRelInfo *resultRelInfo,
489  TupleTableSlot *slot,
490  EState *estate);
491 static void prepare_query_params(PlanState *node,
492  List *fdw_exprs,
493  int numParams,
494  FmgrInfo **param_flinfo,
495  List **param_exprs,
496  const char ***param_values);
497 static void process_query_params(ExprContext *econtext,
498  FmgrInfo *param_flinfo,
499  List *param_exprs,
500  const char **param_values);
501 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
502  HeapTuple *rows, int targrows,
503  double *totalrows,
504  double *totaldeadrows);
505 static void analyze_row_processor(PGresult *res, int row,
506  PgFdwAnalyzeState *astate);
507 static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
508 static void fetch_more_data_begin(AsyncRequest *areq);
509 static void complete_pending_request(AsyncRequest *areq);
511  int row,
512  Relation rel,
513  AttInMetadata *attinmeta,
514  List *retrieved_attrs,
515  ForeignScanState *fsstate,
516  MemoryContext temp_context);
517 static void conversion_error_callback(void *arg);
518 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
519  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
520  JoinPathExtraData *extra);
521 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
522  Node *havingQual);
524  RelOptInfo *rel);
527  Path *epq_path, List *restrictlist);
528 static void add_foreign_grouping_paths(PlannerInfo *root,
529  RelOptInfo *input_rel,
530  RelOptInfo *grouped_rel,
531  GroupPathExtraData *extra);
532 static void add_foreign_ordered_paths(PlannerInfo *root,
533  RelOptInfo *input_rel,
534  RelOptInfo *ordered_rel);
535 static void add_foreign_final_paths(PlannerInfo *root,
536  RelOptInfo *input_rel,
537  RelOptInfo *final_rel,
538  FinalPathExtraData *extra);
539 static void apply_server_options(PgFdwRelationInfo *fpinfo);
540 static void apply_table_options(PgFdwRelationInfo *fpinfo);
541 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
542  const PgFdwRelationInfo *fpinfo_o,
543  const PgFdwRelationInfo *fpinfo_i);
544 static int get_batch_size_option(Relation rel);
545 
546 
547 /*
548  * Foreign-data wrapper handler function: return a struct with pointers
549  * to my callback routines.
550  */
551 Datum
553 {
554  FdwRoutine *routine = makeNode(FdwRoutine);
555 
556  /* Functions for scanning foreign tables */
564 
565  /* Functions for updating foreign tables */
582 
583  /* Function for EvalPlanQual rechecks */
585  /* Support functions for EXPLAIN */
589 
590  /* Support function for TRUNCATE */
592 
593  /* Support functions for ANALYZE */
595 
596  /* Support functions for IMPORT FOREIGN SCHEMA */
598 
599  /* Support functions for join push-down */
601 
602  /* Support functions for upper relation push-down */
604 
605  /* Support functions for asynchronous execution */
610 
611  PG_RETURN_POINTER(routine);
612 }
613 
614 /*
615  * postgresGetForeignRelSize
616  * Estimate # of rows and width of the result of the scan
617  *
618  * We should consider the effect of all baserestrictinfo clauses here, but
619  * not any join clauses.
620  */
621 static void
623  RelOptInfo *baserel,
624  Oid foreigntableid)
625 {
626  PgFdwRelationInfo *fpinfo;
627  ListCell *lc;
628 
629  /*
630  * We use PgFdwRelationInfo to pass various information to subsequent
631  * functions.
632  */
633  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
634  baserel->fdw_private = (void *) fpinfo;
635 
636  /* Base foreign tables need to be pushed down always. */
637  fpinfo->pushdown_safe = true;
638 
639  /* Look up foreign-table catalog info. */
640  fpinfo->table = GetForeignTable(foreigntableid);
641  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
642 
643  /*
644  * Extract user-settable option values. Note that per-table settings of
645  * use_remote_estimate, fetch_size and async_capable override per-server
646  * settings of them, respectively.
647  */
648  fpinfo->use_remote_estimate = false;
651  fpinfo->shippable_extensions = NIL;
652  fpinfo->fetch_size = 100;
653  fpinfo->async_capable = false;
654 
655  apply_server_options(fpinfo);
656  apply_table_options(fpinfo);
657 
658  /*
659  * If the table or the server is configured to use remote estimates,
660  * identify which user to do remote access as during planning. This
661  * should match what ExecCheckPermissions() does. If we fail due to lack
662  * of permissions, the query would have failed at runtime anyway.
663  */
664  if (fpinfo->use_remote_estimate)
665  {
666  Oid userid;
667 
668  userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId();
669  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
670  }
671  else
672  fpinfo->user = NULL;
673 
674  /*
675  * Identify which baserestrictinfo clauses can be sent to the remote
676  * server and which can't.
677  */
678  classifyConditions(root, baserel, baserel->baserestrictinfo,
679  &fpinfo->remote_conds, &fpinfo->local_conds);
680 
681  /*
682  * Identify which attributes will need to be retrieved from the remote
683  * server. These include all attrs needed for joins or final output, plus
684  * all attrs used in the local_conds. (Note: if we end up using a
685  * parameterized scan, it's possible that some of the join clauses will be
686  * sent to the remote and thus we wouldn't really need to retrieve the
687  * columns used in them. Doesn't seem worth detecting that case though.)
688  */
689  fpinfo->attrs_used = NULL;
690  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
691  &fpinfo->attrs_used);
692  foreach(lc, fpinfo->local_conds)
693  {
694  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
695 
696  pull_varattnos((Node *) rinfo->clause, baserel->relid,
697  &fpinfo->attrs_used);
698  }
699 
700  /*
701  * Compute the selectivity and cost of the local_conds, so we don't have
702  * to do it over again for each path. The best we can do for these
703  * conditions is to estimate selectivity on the basis of local statistics.
704  */
706  fpinfo->local_conds,
707  baserel->relid,
708  JOIN_INNER,
709  NULL);
710 
711  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
712 
713  /*
714  * Set # of retrieved rows and cached relation costs to some negative
715  * value, so that we can detect when they are set to some sensible values,
716  * during one (usually the first) of the calls to estimate_path_cost_size.
717  */
718  fpinfo->retrieved_rows = -1;
719  fpinfo->rel_startup_cost = -1;
720  fpinfo->rel_total_cost = -1;
721 
722  /*
723  * If the table or the server is configured to use remote estimates,
724  * connect to the foreign server and execute EXPLAIN to estimate the
725  * number of rows selected by the restriction clauses, as well as the
726  * average row width. Otherwise, estimate using whatever statistics we
727  * have locally, in a way similar to ordinary tables.
728  */
729  if (fpinfo->use_remote_estimate)
730  {
731  /*
732  * Get cost/size estimates with help of remote server. Save the
733  * values in fpinfo so we don't need to do it again to generate the
734  * basic foreign path.
735  */
736  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
737  &fpinfo->rows, &fpinfo->width,
738  &fpinfo->startup_cost, &fpinfo->total_cost);
739 
740  /* Report estimated baserel size to planner. */
741  baserel->rows = fpinfo->rows;
742  baserel->reltarget->width = fpinfo->width;
743  }
744  else
745  {
746  /*
747  * If the foreign table has never been ANALYZEd, it will have
748  * reltuples < 0, meaning "unknown". We can't do much if we're not
749  * allowed to consult the remote server, but we can use a hack similar
750  * to plancat.c's treatment of empty relations: use a minimum size
751  * estimate of 10 pages, and divide by the column-datatype-based width
752  * estimate to get the corresponding number of tuples.
753  */
754  if (baserel->tuples < 0)
755  {
756  baserel->pages = 10;
757  baserel->tuples =
758  (10 * BLCKSZ) / (baserel->reltarget->width +
760  }
761 
762  /* Estimate baserel size as best we can with local statistics. */
763  set_baserel_size_estimates(root, baserel);
764 
765  /* Fill in basically-bogus cost estimates for use later. */
766  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
767  &fpinfo->rows, &fpinfo->width,
768  &fpinfo->startup_cost, &fpinfo->total_cost);
769  }
770 
771  /*
772  * fpinfo->relation_name gets the numeric rangetable index of the foreign
773  * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
774  * human-readable string at that time.)
775  */
776  fpinfo->relation_name = psprintf("%u", baserel->relid);
777 
778  /* No outer and inner relations. */
779  fpinfo->make_outerrel_subquery = false;
780  fpinfo->make_innerrel_subquery = false;
781  fpinfo->lower_subquery_rels = NULL;
782  /* Set the relation index. */
783  fpinfo->relation_index = baserel->relid;
784 }
785 
786 /*
787  * get_useful_ecs_for_relation
788  * Determine which EquivalenceClasses might be involved in useful
789  * orderings of this relation.
790  *
791  * This function is in some respects a mirror image of the core function
792  * pathkeys_useful_for_merging: for a regular table, we know what indexes
793  * we have and want to test whether any of them are useful. For a foreign
794  * table, we don't know what indexes are present on the remote side but
795  * want to speculate about which ones we'd like to use if they existed.
796  *
797  * This function returns a list of potentially-useful equivalence classes,
798  * but it does not guarantee that an EquivalenceMember exists which contains
799  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
800  * ft1.x + t1.x = 0, this function will say that the equivalence class
801  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
802  * t1 is local (or on a different server), it will turn out that no useful
803  * ORDER BY clause can be generated. It's not our job to figure that out
804  * here; we're only interested in identifying relevant ECs.
805  */
806 static List *
808 {
809  List *useful_eclass_list = NIL;
810  ListCell *lc;
811  Relids relids;
812 
813  /*
814  * First, consider whether any active EC is potentially useful for a merge
815  * join against this relation.
816  */
817  if (rel->has_eclass_joins)
818  {
819  foreach(lc, root->eq_classes)
820  {
821  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
822 
823  if (eclass_useful_for_merging(root, cur_ec, rel))
824  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
825  }
826  }
827 
828  /*
829  * Next, consider whether there are any non-EC derivable join clauses that
830  * are merge-joinable. If the joininfo list is empty, we can exit
831  * quickly.
832  */
833  if (rel->joininfo == NIL)
834  return useful_eclass_list;
835 
836  /* If this is a child rel, we must use the topmost parent rel to search. */
837  if (IS_OTHER_REL(rel))
838  {
840  relids = rel->top_parent_relids;
841  }
842  else
843  relids = rel->relids;
844 
845  /* Check each join clause in turn. */
846  foreach(lc, rel->joininfo)
847  {
848  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
849 
850  /* Consider only mergejoinable clauses */
851  if (restrictinfo->mergeopfamilies == NIL)
852  continue;
853 
854  /* Make sure we've got canonical ECs. */
855  update_mergeclause_eclasses(root, restrictinfo);
856 
857  /*
858  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
859  * that left_ec and right_ec will be initialized, per comments in
860  * distribute_qual_to_rels.
861  *
862  * We want to identify which side of this merge-joinable clause
863  * contains columns from the relation produced by this RelOptInfo. We
864  * test for overlap, not containment, because there could be extra
865  * relations on either side. For example, suppose we've got something
866  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
867  * A.y = D.y. The input rel might be the joinrel between A and B, and
868  * we'll consider the join clause A.y = D.y. relids contains a
869  * relation not involved in the join class (B) and the equivalence
870  * class for the left-hand side of the clause contains a relation not
871  * involved in the input rel (C). Despite the fact that we have only
872  * overlap and not containment in either direction, A.y is potentially
873  * useful as a sort column.
874  *
875  * Note that it's even possible that relids overlaps neither side of
876  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
877  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
878  * but overlaps neither side of B. In that case, we just skip this
879  * join clause, since it doesn't suggest a useful sort order for this
880  * relation.
881  */
882  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
883  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
884  restrictinfo->right_ec);
885  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
886  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
887  restrictinfo->left_ec);
888  }
889 
890  return useful_eclass_list;
891 }
892 
893 /*
894  * get_useful_pathkeys_for_relation
895  * Determine which orderings of a relation might be useful.
896  *
897  * Getting data in sorted order can be useful either because the requested
898  * order matches the final output ordering for the overall query we're
899  * planning, or because it enables an efficient merge join. Here, we try
900  * to figure out which pathkeys to consider.
901  */
902 static List *
904 {
905  List *useful_pathkeys_list = NIL; /* result: each element is itself a list of PathKeys */
906  List *useful_eclass_list;
907  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
908  EquivalenceClass *query_ec = NULL; /* set only when there is exactly one query pathkey */
909  ListCell *lc;
910 
911  /*
912  * Pushing the query_pathkeys to the remote server is always worth
913  * considering, because it might let us avoid a local sort.
914  */
915  fpinfo->qp_is_pushdown_safe = false; /* becomes true below only if every query pathkey passes */
916  if (root->query_pathkeys)
917  {
918  bool query_pathkeys_ok = true;
919 
920  foreach(lc, root->query_pathkeys)
921  {
922  PathKey *pathkey = (PathKey *) lfirst(lc);
923 
924  /*
925  * The planner and executor don't have any clever strategy for
926  * taking data sorted by a prefix of the query's pathkeys and
927  * getting it to be sorted by all of those pathkeys. We'll just
928  * end up resorting the entire data set. So, unless we can push
929  * down all of the query pathkeys, forget it.
930  */
931  if (!is_foreign_pathkey(root, rel, pathkey))
932  {
933  query_pathkeys_ok = false;
934  break;
935  }
936  }
937 
938  if (query_pathkeys_ok)
939  {
940  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
941  fpinfo->qp_is_pushdown_safe = true; /* the complete query ordering was accepted */
942  }
943  }
944 
945  /*
946  * Even if we're not using remote estimates, having the remote side do the
947  * sort generally won't be any worse than doing it locally, and it might
948  * be much better if the remote side can generate data in the right order
949  * without needing a sort at all. However, what we're going to do next is
950  * try to generate pathkeys that seem promising for possible merge joins,
951  * and that's more speculative. A wrong choice might hurt quite a bit, so
952  * bail out if we can't use remote estimates.
953  */
954  if (!fpinfo->use_remote_estimate)
955  return useful_pathkeys_list; /* holds at most the query_pathkeys entry at this point */
956 
957  /* Get the list of interesting EquivalenceClasses. */
958  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
959 
960  /* Extract unique EC for query, if any, so we don't consider it again. */
961  if (list_length(root->query_pathkeys) == 1)
962  {
963  PathKey *query_pathkey = linitial(root->query_pathkeys);
964 
965  query_ec = query_pathkey->pk_eclass;
966  }
967 
968  /*
969  * As a heuristic, the only pathkeys we consider here are those of length
970  * one. It's surely possible to consider more, but since each one we
971  * choose to consider will generate a round-trip to the remote side, we
972  * need to be a bit cautious here. It would sure be nice to have a local
973  * cache of information about remote index definitions...
974  */
975  foreach(lc, useful_eclass_list)
976  {
977  EquivalenceClass *cur_ec = lfirst(lc);
978  PathKey *pathkey;
979 
980  /* If redundant with what we did above, skip it. */
981  if (cur_ec == query_ec)
982  continue;
983 
984  /* Can't push down the sort if the EC's opfamily is not shippable. */
986  OperatorFamilyRelationId, fpinfo))
987  continue;
988 
989  /* If no pushable expression for this rel, skip it. */
990  if (find_em_for_rel(root, cur_ec, rel) == NULL)
991  continue;
992 
993  /* Looks like we can generate a pathkey, so let's do it. */
994  pathkey = make_canonical_pathkey(root, cur_ec,
995  linitial_oid(cur_ec->ec_opfamilies),
997  false);
998  useful_pathkeys_list = lappend(useful_pathkeys_list,
999  list_make1(pathkey)); /* each EC contributes its own one-element pathkey list */
1000  }
1001 
1002  return useful_pathkeys_list; /* may still be NIL if nothing qualified */
1003 }
1004 
1005 /*
1006  * postgresGetForeignPaths
1007  * Create possible scan paths for a scan on the foreign table
 *
 * The function first adds an unparameterized path to baserel, then calls
 * add_paths_with_pathkeys_for_rel(), and finally, only when
 * fpinfo->use_remote_estimate is set, adds one parameterized path for each
 * ParamPathInfo collected from the rel's join clauses and
 * EquivalenceClass-derived clauses.
1008  */
1009 static void
1011  RelOptInfo *baserel,
1012  Oid foreigntableid)
1013 {
1014  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1015  ForeignPath *path;
1016  List *ppi_list; /* distinct ParamPathInfos, one per candidate outer-rel set */
1017  ListCell *lc;
1018 
1019  /*
1020  * Create simplest ForeignScan path node and add it to baserel. This path
1021  * corresponds to SeqScan path of regular tables (though depending on what
1022  * baserestrict conditions we were able to send to remote, there might
1023  * actually be an indexscan happening there). We already did all the work
1024  * to estimate cost and size of this path.
1025  *
1026  * Although this path uses no join clauses, it could still have required
1027  * parameterization due to LATERAL refs in its tlist.
1028  */
1029  path = create_foreignscan_path(root, baserel,
1030  NULL, /* default pathtarget */
1031  fpinfo->rows,
1032  fpinfo->startup_cost,
1033  fpinfo->total_cost,
1034  NIL, /* no pathkeys */
1035  baserel->lateral_relids,
1036  NULL, /* no extra plan */
1037  NIL, /* no fdw_restrictinfo list */
1038  NIL); /* no fdw_private list */
1039  add_path(baserel, (Path *) path);
1040 
1041  /* Add paths with pathkeys */
1042  add_paths_with_pathkeys_for_rel(root, baserel, NULL, NIL);
1043 
1044  /*
1045  * If we're not using remote estimates, stop here. We have no way to
1046  * estimate whether any join clauses would be worth sending across, so
1047  * don't bother building parameterized paths.
1048  */
1049  if (!fpinfo->use_remote_estimate)
1050  return;
1051 
1052  /*
1053  * Thumb through all join clauses for the rel to identify which outer
1054  * relations could supply one or more safe-to-send-to-remote join clauses.
1055  * We'll build a parameterized path for each such outer relation.
1056  *
1057  * It's convenient to manage this by representing each candidate outer
1058  * relation by the ParamPathInfo node for it. We can then use the
1059  * ppi_clauses list in the ParamPathInfo node directly as a list of the
1060  * interesting join clauses for that rel. This takes care of the
1061  * possibility that there are multiple safe join clauses for such a rel,
1062  * and also ensures that we account for unsafe join clauses that we'll
1063  * still have to enforce locally (since the parameterized-path machinery
1064  * insists that we handle all movable clauses).
1065  */
1066  ppi_list = NIL;
1067  foreach(lc, baserel->joininfo)
1068  {
1069  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1070  Relids required_outer;
1071  ParamPathInfo *param_info;
1072 
1073  /* Check if clause can be moved to this rel */
1074  if (!join_clause_is_movable_to(rinfo, baserel))
1075  continue;
1076 
1077  /* See if it is safe to send to remote */
1078  if (!is_foreign_expr(root, baserel, rinfo->clause))
1079  continue;
1080 
1081  /* Calculate required outer rels for the resulting path */
1082  required_outer = bms_union(rinfo->clause_relids,
1083  baserel->lateral_relids);
1084  /* We do not want the foreign rel itself listed in required_outer */
1085  required_outer = bms_del_member(required_outer, baserel->relid);
1086 
1087  /*
1088  * required_outer probably can't be empty here, but if it were, we
1089  * couldn't make a parameterized path.
1090  */
1091  if (bms_is_empty(required_outer))
1092  continue;
1093 
1094  /* Get the ParamPathInfo */
1095  param_info = get_baserel_parampathinfo(root, baserel,
1096  required_outer);
1097  Assert(param_info != NULL);
1098 
1099  /*
1100  * Add it to list unless we already have it. Testing pointer equality
1101  * is OK since get_baserel_parampathinfo won't make duplicates.
1102  */
1103  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1104  }
1105 
1106  /*
1107  * The above scan examined only "generic" join clauses, not those that
1108  * were absorbed into EquivalenceClauses. See if we can make anything out
1109  * of EquivalenceClauses.
1110  */
1111  if (baserel->has_eclass_joins)
1112  {
1113  /*
1114  * We repeatedly scan the eclass list looking for column references
1115  * (or expressions) belonging to the foreign rel. Each time we find
1116  * one, we generate a list of equivalence joinclauses for it, and then
1117  * see if any are safe to send to the remote. Repeat till there are
1118  * no more candidate EC members.
1119  */
1121 
1122  arg.already_used = NIL; /* expressions handled in earlier iterations of the loop below */
1123  for (;;)
1124  {
1125  List *clauses;
1126 
1127  /* Make clauses, skipping any that join to lateral_referencers */
1128  arg.current = NULL; /* still NULL afterwards means no further expression was found */
1130  baserel,
1132  (void *) &arg,
1133  baserel->lateral_referencers);
1134 
1135  /* Done if there are no more expressions in the foreign rel */
1136  if (arg.current == NULL)
1137  {
1138  Assert(clauses == NIL);
1139  break;
1140  }
1141 
1142  /* Scan the extracted join clauses */
1143  foreach(lc, clauses)
1144  {
1145  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1146  Relids required_outer;
1147  ParamPathInfo *param_info;
1148 
1149  /* Check if clause can be moved to this rel */
1150  if (!join_clause_is_movable_to(rinfo, baserel))
1151  continue;
1152 
1153  /* See if it is safe to send to remote */
1154  if (!is_foreign_expr(root, baserel, rinfo->clause))
1155  continue;
1156 
1157  /* Calculate required outer rels for the resulting path */
1158  required_outer = bms_union(rinfo->clause_relids,
1159  baserel->lateral_relids);
1160  required_outer = bms_del_member(required_outer, baserel->relid); /* as in the joininfo loop: drop the foreign rel itself */
1161  if (bms_is_empty(required_outer)) /* nothing left to parameterize by */
1162  continue;
1163 
1164  /* Get the ParamPathInfo */
1165  param_info = get_baserel_parampathinfo(root, baserel,
1166  required_outer);
1167  Assert(param_info != NULL);
1168 
1169  /* Add it to list unless we already have it */
1170  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1171  }
1172 
1173  /* Try again, now ignoring the expression we found this time */
1174  arg.already_used = lappend(arg.already_used, arg.current);
1175  }
1176  }
1177 
1178  /*
1179  * Now build a path for each useful outer relation.
1180  */
1181  foreach(lc, ppi_list)
1182  {
1183  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1184  double rows;
1185  int width; /* filled in by estimate_path_cost_size; not used further in this loop */
1186  Cost startup_cost;
1187  Cost total_cost;
1188 
1189  /* Get a cost estimate from the remote */
1190  estimate_path_cost_size(root, baserel,
1191  param_info->ppi_clauses, NIL, NULL,
1192  &rows, &width,
1193  &startup_cost, &total_cost);
1194 
1195  /*
1196  * ppi_rows currently won't get looked at by anything, but still we
1197  * may as well ensure that it matches our idea of the rowcount.
1198  */
1199  param_info->ppi_rows = rows;
1200 
1201  /* Make the path */
1202  path = create_foreignscan_path(root, baserel,
1203  NULL, /* default pathtarget */
1204  rows,
1205  startup_cost,
1206  total_cost,
1207  NIL, /* no pathkeys */
1208  param_info->ppi_req_outer,
1209  NULL, /* no extra plan */
1210  NIL, /* no fdw_restrictinfo list */
1211  NIL); /* no fdw_private list */
1212  add_path(baserel, (Path *) path);
1213  }
1214 }
1215 
1216 /*
1217  * postgresGetForeignPlan
1218  * Create ForeignScan plan node which implements selected best path
 *
 * Splits the conditions into those sent to the remote server
 * (remote_exprs) and those evaluated locally (local_exprs), deparses the
 * remote SELECT, and returns the node built by make_foreignscan().
1219  */
1220 static ForeignScan *
1222  RelOptInfo *foreignrel,
1223  Oid foreigntableid,
1224  ForeignPath *best_path,
1225  List *tlist,
1226  List *scan_clauses,
1227  Plan *outer_plan)
1228 {
1229  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1230  Index scan_relid;
1231  List *fdw_private;
1232  List *remote_exprs = NIL;
1233  List *local_exprs = NIL;
1234  List *params_list = NIL;
1235  List *fdw_scan_tlist = NIL; /* only built for join/upper rels, see below */
1236  List *fdw_recheck_quals = NIL; /* only set for simple rels, see below */
1237  List *retrieved_attrs;
1238  StringInfoData sql;
1239  bool has_final_sort = false;
1240  bool has_limit = false;
1241  ListCell *lc;
1242 
1243  /*
1244  * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1245  */
1246  if (best_path->fdw_private)
1247  {
1248  has_final_sort = boolVal(list_nth(best_path->fdw_private,
1250  has_limit = boolVal(list_nth(best_path->fdw_private,
1252  }
1253 
1254  if (IS_SIMPLE_REL(foreignrel))
1255  {
1256  /*
1257  * For base relations, set scan_relid as the relid of the relation.
1258  */
1259  scan_relid = foreignrel->relid;
1260 
1261  /*
1262  * In a base-relation scan, we must apply the given scan_clauses.
1263  *
1264  * Separate the scan_clauses into those that can be executed remotely
1265  * and those that can't. baserestrictinfo clauses that were
1266  * previously determined to be safe or unsafe by classifyConditions
1267  * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1268  * else in the scan_clauses list will be a join clause, which we have
1269  * to check for remote-safety.
1270  *
1271  * Note: the join clauses we see here should be the exact same ones
1272  * previously examined by postgresGetForeignPaths. Possibly it'd be
1273  * worth passing forward the classification work done then, rather
1274  * than repeating it here.
1275  *
1276  * This code must match "extract_actual_clauses(scan_clauses, false)"
1277  * except for the additional decision about remote versus local
1278  * execution.
1279  */
1280  foreach(lc, scan_clauses)
1281  {
1282  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1283 
1284  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1285  if (rinfo->pseudoconstant)
1286  continue;
1287 
1288  if (list_member_ptr(fpinfo->remote_conds, rinfo)) /* already classified as remote-safe */
1289  remote_exprs = lappend(remote_exprs, rinfo->clause);
1290  else if (list_member_ptr(fpinfo->local_conds, rinfo)) /* already classified as local-only */
1291  local_exprs = lappend(local_exprs, rinfo->clause);
1292  else if (is_foreign_expr(root, foreignrel, rinfo->clause)) /* not pre-classified: test it now */
1293  remote_exprs = lappend(remote_exprs, rinfo->clause);
1294  else
1295  local_exprs = lappend(local_exprs, rinfo->clause);
1296  }
1297 
1298  /*
1299  * For a base-relation scan, we have to support EPQ recheck, which
1300  * should recheck all the remote quals.
1301  */
1302  fdw_recheck_quals = remote_exprs;
1303  }
1304  else
1305  {
1306  /*
1307  * Join relation or upper relation - set scan_relid to 0.
1308  */
1309  scan_relid = 0;
1310 
1311  /*
1312  * For a join rel, baserestrictinfo is NIL and we are not considering
1313  * parameterization right now, so there should be no scan_clauses for
1314  * a joinrel or an upper rel either.
1315  */
1316  Assert(!scan_clauses);
1317 
1318  /*
1319  * Instead we get the conditions to apply from the fdw_private
1320  * structure.
1321  */
1322  remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1323  local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1324 
1325  /*
1326  * We leave fdw_recheck_quals empty in this case, since we never need
1327  * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1328  * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1329  * If we're planning an upperrel (ie, remote grouping or aggregation)
1330  * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1331  * allowed, and indeed we *can't* put the remote clauses into
1332  * fdw_recheck_quals because the unaggregated Vars won't be available
1333  * locally.
1334  */
1335 
1336  /* Build the list of columns to be fetched from the foreign server. */
1337  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1338 
1339  /*
1340  * Ensure that the outer plan produces a tuple whose descriptor
1341  * matches our scan tuple slot. Also, remove the local conditions
1342  * from outer plan's quals, lest they be evaluated twice, once by the
1343  * local plan and once by the scan.
1344  */
1345  if (outer_plan)
1346  {
1347  /*
1348  * Right now, we only consider grouping and aggregation beyond
1349  * joins. Queries involving aggregates or grouping do not require
1350  * EPQ mechanism, hence should not have an outer plan here.
1351  */
1352  Assert(!IS_UPPER_REL(foreignrel));
1353 
1354  /*
1355  * First, update the plan's qual list if possible. In some cases
1356  * the quals might be enforced below the topmost plan level, in
1357  * which case we'll fail to remove them; it's not worth working
1358  * harder than this.
1359  */
1360  foreach(lc, local_exprs)
1361  {
1362  Node *qual = lfirst(lc);
1363 
1364  outer_plan->qual = list_delete(outer_plan->qual, qual);
1365 
1366  /*
1367  * For an inner join the local conditions of foreign scan plan
1368  * can be part of the joinquals as well. (They might also be
1369  * in the mergequals or hashquals, but we can't touch those
1370  * without breaking the plan.)
1371  */
1372  if (IsA(outer_plan, NestLoop) ||
1373  IsA(outer_plan, MergeJoin) ||
1374  IsA(outer_plan, HashJoin))
1375  {
1376  Join *join_plan = (Join *) outer_plan;
1377 
1378  if (join_plan->jointype == JOIN_INNER) /* joinquals of other join types are left alone */
1379  join_plan->joinqual = list_delete(join_plan->joinqual,
1380  qual);
1381  }
1382  }
1383 
1384  /*
1385  * Now fix the subplan's tlist --- this might result in inserting
1386  * a Result node atop the plan tree.
1387  */
1388  outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1389  best_path->path.parallel_safe);
1390  }
1391  }
1392 
1393  /*
1394  * Build the query string to be sent for execution, and identify
1395  * expressions to be sent as parameters.
 *
 * For a simple rel fdw_scan_tlist is still NIL at this point; it is only
 * built in the join/upper-rel branch above.
1396  */
1397  initStringInfo(&sql);
1398  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1399  remote_exprs, best_path->path.pathkeys,
1400  has_final_sort, has_limit, false,
1401  &retrieved_attrs, &params_list);
1402 
1403  /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1404  fpinfo->final_remote_exprs = remote_exprs;
1405 
1406  /*
1407  * Build the fdw_private list that will be available to the executor.
1408  * Items in the list must match order in enum FdwScanPrivateIndex.
1409  */
1410  fdw_private = list_make3(makeString(sql.data),
1411  retrieved_attrs,
1412  makeInteger(fpinfo->fetch_size));
1413  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1414  fdw_private = lappend(fdw_private,
1415  makeString(fpinfo->relation_name)); /* fourth item, present only for join/upper rels */
1416 
1417  /*
1418  * Create the ForeignScan node for the given relation.
1419  *
1420  * Note that the remote parameter expressions are stored in the fdw_exprs
1421  * field of the finished plan node; we can't keep them in private state
1422  * because then they wouldn't be subject to later planner processing.
1423  */
1424  return make_foreignscan(tlist,
1425  local_exprs,
1426  scan_relid,
1427  params_list,
1428  fdw_private,
1429  fdw_scan_tlist,
1430  fdw_recheck_quals,
1431  outer_plan);
1432 }
1433 
1434 /*
1435  * Construct a tuple descriptor for the scan tuples handled by a foreign join.
 *
 * Returns a descriptor in which attributes of generic RECORD type that
 * correspond to whole-row Vars of plain relations have had atttypid
 * replaced by the relation's composite type; all other attributes are left
 * untouched.
1436  */
1437 static TupleDesc
1439 {
1440  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1441  EState *estate = node->ss.ps.state;
1442  TupleDesc tupdesc;
1443 
1444  /*
1445  * The core code has already set up a scan tuple slot based on
1446  * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough,
1447  * but there's one case where it isn't. If we have any whole-row row
1448  * identifier Vars, they may have vartype RECORD, and we need to replace
1449  * that with the associated table's actual composite type. This ensures
1450  * that when we read those ROW() expression values from the remote server,
1451  * we can convert them to a composite type the local server knows.
1452  */
1454  for (int i = 0; i < tupdesc->natts; i++)
1455  {
1456  Form_pg_attribute att = TupleDescAttr(tupdesc, i);
1457  Var *var;
1458  RangeTblEntry *rte;
1459  Oid reltype;
1460 
1461  /* Nothing to do if it's not a generic RECORD attribute */
1462  if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1463  continue;
1464 
1465  /*
1466  * If we can't identify the referenced table, do nothing. This'll
1467  * likely lead to failure later, but perhaps we can muddle through.
1468  */
1469  var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
1470  i)->expr; /* tlist entry i is taken to describe attribute i */
1471  if (!IsA(var, Var) || var->varattno != 0) /* only whole-row Vars (varattno 0) qualify */
1472  continue;
1473  rte = list_nth(estate->es_range_table, var->varno - 1); /* varno - 1 converts the range-table index to a list position */
1474  if (rte->rtekind != RTE_RELATION)
1475  continue;
1476  reltype = get_rel_type_id(rte->relid);
1477  if (!OidIsValid(reltype))
1478  continue;
1479  att->atttypid = reltype;
1480  /* shouldn't need to change anything else */
1481  }
1482  return tupdesc;
1483 }
1484 
1485 /*
1486  * postgresBeginForeignScan
1487  * Initiate an executor scan of a foreign PostgreSQL table.
 *
 * Allocates a PgFdwScanState, stores it in node->fdw_state, obtains a
 * connection, and unpacks the planner's fdw_private items. The remote
 * cursor is not created here (cursor_exists is set to false).
1488  */
1489 static void
1491 {
1492  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1493  EState *estate = node->ss.ps.state;
1494  PgFdwScanState *fsstate;
1495  RangeTblEntry *rte;
1496  Oid userid;
1497  ForeignTable *table;
1498  UserMapping *user;
1499  int rtindex;
1500  int numParams;
1501 
1502  /*
1503  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1504  */
1505  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1506  return;
1507 
1508  /*
1509  * We'll save private state in node->fdw_state.
1510  */
1511  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); /* zero-filled: fields not assigned below start as 0/false/NULL */
1512  node->fdw_state = (void *) fsstate;
1513 
1514  /*
1515  * Identify which user to do the remote access as. This should match what
1516  * ExecCheckPermissions() does.
1517  */
1518  userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
1519  if (fsplan->scan.scanrelid > 0)
1520  rtindex = fsplan->scan.scanrelid;
1521  else
1522  rtindex = bms_next_member(fsplan->fs_base_relids, -1); /* no single scan rel: take the first member of fs_base_relids */
1523  rte = exec_rt_fetch(rtindex, estate);
1524 
1525  /* Get info about foreign table. */
1526  table = GetForeignTable(rte->relid);
1527  user = GetUserMapping(userid, table->serverid);
1528 
1529  /*
1530  * Get connection to the foreign server. Connection manager will
1531  * establish new connection if necessary.
1532  */
1533  fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
1534 
1535  /* Assign a unique ID for my cursor */
1536  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1537  fsstate->cursor_exists = false;
1538 
1539  /* Get private info created by planner functions. */
1540  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1542  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1544  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1546 
1547  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1548  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1549  "postgres_fdw tuple data",
1551  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1552  "postgres_fdw temporary data",
1554 
1555  /*
1556  * Get info we'll need for converting data fetched from the foreign server
1557  * into local representation and error reporting during that process.
1558  */
1559  if (fsplan->scan.scanrelid > 0)
1560  {
1561  fsstate->rel = node->ss.ss_currentRelation;
1562  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1563  }
1564  else
1565  {
1566  fsstate->rel = NULL; /* scanrelid is 0 here, so there is no single relation */
1567  fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
1568  }
1569 
1570  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1571 
1572  /*
1573  * Prepare for processing of parameters used in remote query, if any.
1574  */
1575  numParams = list_length(fsplan->fdw_exprs);
1576  fsstate->numParams = numParams;
1577  if (numParams > 0)
1579  fsplan->fdw_exprs,
1580  numParams,
1581  &fsstate->param_flinfo,
1582  &fsstate->param_exprs,
1583  &fsstate->param_values);
1584 
1585  /* Set the async-capable flag */
1586  fsstate->async_capable = node->ss.ps.async_capable;
1587 }
1588 
1589 /*
1590  * postgresIterateForeignScan
1591  * Retrieve next row from the result set, or clear tuple slot to indicate
1592  * EOF.
 *
 * Returns the node's scan tuple slot, either holding the next buffered
 * tuple or cleared when no tuple is available.
1593  */
1594 static TupleTableSlot *
1596 {
1597  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1598  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1599 
1600  /*
1601  * In sync mode, if this is the first call after Begin or ReScan, we need
1602  * to create the cursor on the remote side. In async mode, we would have
1603  * already created the cursor before we get here, even if this is the
1604  * first call after Begin or ReScan.
1605  */
1606  if (!fsstate->cursor_exists)
1607  create_cursor(node);
1608 
1609  /*
1610  * Get some more tuples, if we've run out.
1611  */
1612  if (fsstate->next_tuple >= fsstate->num_tuples) /* current batch is used up (or empty) */
1613  {
1614  /* In async mode, just clear tuple slot. */
1615  if (fsstate->async_capable)
1616  return ExecClearTuple(slot);
1617  /* No point in another fetch if we already detected EOF, though. */
1618  if (!fsstate->eof_reached)
1619  fetch_more_data(node);
1620  /* If we didn't get any tuples, must be end of data. */
1621  if (fsstate->next_tuple >= fsstate->num_tuples)
1622  return ExecClearTuple(slot);
1623  }
1624 
1625  /*
1626  * Return the next tuple.
1627  */
1628  ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++], /* post-increment advances the read position in the batch */
1629  slot,
1630  false); /* shouldFree = false: the slot does not take ownership of the tuple */
1631 
1632  return slot;
1633 }
1634 
1635 /*
1636  * postgresReScanForeignScan
1637  * Restart the scan.
 *
 * Depending on state this closes the remote cursor, rewinds it, or merely
 * resets the read position within the tuples already held in memory.
1638  */
1639 static void
1641 {
1642  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1643  char sql[64]; /* holds one of the short CLOSE / MOVE commands formatted below */
1644  PGresult *res;
1645 
1646  /* If we haven't created the cursor yet, nothing to do. */
1647  if (!fsstate->cursor_exists)
1648  return;
1649 
1650  /*
1651  * If the node is async-capable, and an asynchronous fetch for it has
1652  * begun, the asynchronous fetch might not have yet completed. Check if
1653  * the node is async-capable, and an asynchronous fetch for it is still in
1654  * progress; if so, complete the asynchronous fetch before restarting the
1655  * scan.
1656  */
1657  if (fsstate->async_capable &&
1658  fsstate->conn_state->pendingAreq &&
1659  fsstate->conn_state->pendingAreq->requestee == (PlanState *) node)
1660  fetch_more_data(node);
1661 
1662  /*
1663  * If any internal parameters affecting this node have changed, we'd
1664  * better destroy and recreate the cursor. Otherwise, rewinding it should
1665  * be good enough. If we've only fetched zero or one batch, we needn't
1666  * even rewind the cursor, just rescan what we have.
1667  */
1668  if (node->ss.ps.chgParam != NULL)
1669  {
1670  fsstate->cursor_exists = false; /* cursor is being closed; the flag must reflect that */
1671  snprintf(sql, sizeof(sql), "CLOSE c%u",
1672  fsstate->cursor_number);
1673  }
1674  else if (fsstate->fetch_ct_2 > 1) /* more than one batch fetched: memory no longer holds the start */
1675  {
1676  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1677  fsstate->cursor_number);
1678  }
1679  else
1680  {
1681  /* Easy: just rescan what we already have in memory, if anything */
1682  fsstate->next_tuple = 0;
1683  return;
1684  }
1685 
1686  /*
1687  * We don't use a PG_TRY block here, so be careful not to throw error
1688  * without releasing the PGresult.
1689  */
1690  res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
1692  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1693  PQclear(res);
1694 
1695  /* Now force a fresh FETCH. */
1696  fsstate->tuples = NULL;
1697  fsstate->num_tuples = 0;
1698  fsstate->next_tuple = 0; /* next_tuple >= num_tuples now holds, which is the "need more tuples" condition */
1699  fsstate->fetch_ct_2 = 0;
1700  fsstate->eof_reached = false;
1701 }
1702 
1703 /*
1704  * postgresEndForeignScan
1705  * Finish scanning foreign table and dispose objects used for this scan
 *
 * Closes the remote cursor if one exists and releases the connection; does
 * nothing when node->fdw_state is NULL.
1706  */
1707 static void
1709 {
1710  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1711 
1712  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1713  if (fsstate == NULL)
1714  return;
1715 
1716  /* Close the cursor if open, to prevent accumulation of cursors */
1717  if (fsstate->cursor_exists)
1718  close_cursor(fsstate->conn, fsstate->cursor_number,
1719  fsstate->conn_state);
1720 
1721  /* Release remote connection */
1722  ReleaseConnection(fsstate->conn);
1723  fsstate->conn = NULL; /* drop our reference so it cannot be reused by mistake */
1724 
1725  /* MemoryContexts will be deleted automatically. */
1726 }
1727 
1728 /*
1729  * postgresAddForeignUpdateTargets
1730  * Add resjunk column(s) needed for update/delete on a foreign table
 *
 * Registers a single row-identity Var of type TID under the name "ctid"
 * for the range-table entry identified by rtindex.
1731  */
1732 static void
1734  Index rtindex,
1735  RangeTblEntry *target_rte,
1736  Relation target_relation)
1737 {
1738  Var *var;
1739 
1740  /*
1741  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1742  */
1743 
1744  /* Make a Var representing the desired value */
1745  var = makeVar(rtindex,
1747  TIDOID, /* type of the Var */
1748  -1, /* no type modifier */
1749  InvalidOid, /* no collation */
1750  0); /* levels up: belongs to the current query level */
1751 
1752  /* Register it as a row-identity column needed by this target rel */
1753  add_row_identity_var(root, var, rtindex, "ctid");
1754 }
1755 
1756 /*
1757  * postgresPlanForeignModify
1758  * Plan an insert/update/delete operation on a foreign table
 *
 * Works out which columns to transmit, deparses the remote
 * INSERT/UPDATE/DELETE statement, and returns a five-item list: the SQL
 * string, the target attribute numbers, values_end_len, a boolean telling
 * whether retrieved_attrs is non-empty, and retrieved_attrs itself.
 * Raises an error for any other operation or for an unsupported
 * ON CONFLICT action.
1759  */
1760 static List *
1762  ModifyTable *plan,
1763  Index resultRelation,
1764  int subplan_index)
1765 {
1766  CmdType operation = plan->operation;
1767  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1768  Relation rel;
1769  StringInfoData sql;
1770  List *targetAttrs = NIL; /* stays NIL for DELETE: neither branch below applies */
1771  List *withCheckOptionList = NIL;
1772  List *returningList = NIL;
1773  List *retrieved_attrs = NIL;
1774  bool doNothing = false;
1775  int values_end_len = -1; /* passed by address to deparseInsertSql only; otherwise stays -1 */
1776 
1777  initStringInfo(&sql);
1778 
1779  /*
1780  * Core code already has some lock on each rel being planned, so we can
1781  * use NoLock here.
1782  */
1783  rel = table_open(rte->relid, NoLock);
1784 
1785  /*
1786  * In an INSERT, we transmit all columns that are defined in the foreign
1787  * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1788  * foreign table, we transmit all columns like INSERT; else we transmit
1789  * only columns that were explicitly targets of the UPDATE, so as to avoid
1790  * unnecessary data transmission. (We can't do that for INSERT since we
1791  * would miss sending default values for columns not listed in the source
1792  * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1793  * those triggers might change values for non-target columns, in which
1794  * case we would miss sending changed values for those columns.)
1795  */
1796  if (operation == CMD_INSERT ||
1797  (operation == CMD_UPDATE &&
1798  rel->trigdesc &&
1800  {
1801  TupleDesc tupdesc = RelationGetDescr(rel);
1802  int attnum;
1803 
1804  for (attnum = 1; attnum <= tupdesc->natts; attnum++) /* attribute numbers are 1-based here */
1805  {
1806  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1807 
1808  if (!attr->attisdropped) /* dropped columns are never transmitted */
1809  targetAttrs = lappend_int(targetAttrs, attnum);
1810  }
1811  }
1812  else if (operation == CMD_UPDATE)
1813  {
1814  int col;
1815  RelOptInfo *rel = find_base_rel(root, resultRelation); /* NOTE: shadows the Relation 'rel' opened above within this block */
1816  Bitmapset *allUpdatedCols = get_rel_all_updated_cols(root, rel);
1817 
1818  col = -1;
1819  while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1820  {
1821  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1823 
1824  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1825  elog(ERROR, "system-column update is not supported");
1826  targetAttrs = lappend_int(targetAttrs, attno);
1827  }
1828  }
1829 
1830  /*
1831  * Extract the relevant WITH CHECK OPTION list if any.
1832  */
1833  if (plan->withCheckOptionLists)
1834  withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1835  subplan_index);
1836 
1837  /*
1838  * Extract the relevant RETURNING list if any.
1839  */
1840  if (plan->returningLists)
1841  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1842 
1843  /*
1844  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1845  * should have already been rejected in the optimizer, as presently there
1846  * is no way to recognize an arbiter index on a foreign table. Only DO
1847  * NOTHING is supported without an inference specification.
1848  */
1849  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1850  doNothing = true;
1851  else if (plan->onConflictAction != ONCONFLICT_NONE)
1852  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1853  (int) plan->onConflictAction);
1854 
1855  /*
1856  * Construct the SQL command string.
1857  */
1858  switch (operation)
1859  {
1860  case CMD_INSERT:
1861  deparseInsertSql(&sql, rte, resultRelation, rel,
1862  targetAttrs, doNothing,
1863  withCheckOptionList, returningList,
1864  &retrieved_attrs, &values_end_len);
1865  break;
1866  case CMD_UPDATE:
1867  deparseUpdateSql(&sql, rte, resultRelation, rel,
1868  targetAttrs,
1869  withCheckOptionList, returningList,
1870  &retrieved_attrs);
1871  break;
1872  case CMD_DELETE:
1873  deparseDeleteSql(&sql, rte, resultRelation, rel,
1874  returningList,
1875  &retrieved_attrs);
1876  break;
1877  default:
1878  elog(ERROR, "unexpected operation: %d", (int) operation);
1879  break;
1880  }
1881 
1882  table_close(rel, NoLock); /* NoLock again: table_open above took no lock of its own */
1883 
1884  /*
1885  * Build the fdw_private list that will be available to the executor.
1886  * Items in the list must match enum FdwModifyPrivateIndex, above.
1887  */
1888  return list_make5(makeString(sql.data),
1889  targetAttrs,
1890  makeInteger(values_end_len),
1891  makeBoolean((retrieved_attrs != NIL)), /* true when there are attributes to retrieve */
1892  retrieved_attrs);
1893 }
1894 
1895 /*
1896  * postgresBeginForeignModify
1897  * Begin an insert/update/delete operation on a foreign table
 *
 * Unpacks the fdw_private list, builds the execution state with
 * create_foreign_modify(), and stores it in resultRelInfo->ri_FdwState.
1898  */
1899 static void
1901  ResultRelInfo *resultRelInfo,
1902  List *fdw_private,
1903  int subplan_index,
1904  int eflags)
1905 {
1906  PgFdwModifyState *fmstate;
1907  char *query;
1908  List *target_attrs;
1909  bool has_returning;
1910  int values_end_len;
1911  List *retrieved_attrs;
1912  RangeTblEntry *rte;
1913 
1914  /*
1915  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1916  * stays NULL.
1917  */
1918  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1919  return;
1920 
1921  /* Deconstruct fdw_private data. */
1922  query = strVal(list_nth(fdw_private,
1924  target_attrs = (List *) list_nth(fdw_private,
1926  values_end_len = intVal(list_nth(fdw_private,
1928  has_returning = boolVal(list_nth(fdw_private,
1930  retrieved_attrs = (List *) list_nth(fdw_private,
1932 
1933  /* Find RTE. */
1934  rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1935  mtstate->ps.state);
1936 
1937  /* Construct an execution state. */
1938  fmstate = create_foreign_modify(mtstate->ps.state,
1939  rte,
1940  resultRelInfo,
1941  mtstate->operation,
1942  outerPlanState(mtstate)->plan,
1943  query,
1944  target_attrs,
1945  values_end_len,
1946  has_returning,
1947  retrieved_attrs);
1948 
1949  resultRelInfo->ri_FdwState = fmstate; /* this is where the Exec callbacks find the state again */
1950 }
1951 
1952 /*
1953  * postgresExecForeignInsert
1954  * Insert one row into a foreign table
 *
 * Runs execute_foreign_modify() for a single slot and returns the first
 * slot of its result array, or NULL if it returned no array.
1955  */
1956 static TupleTableSlot *
1958  ResultRelInfo *resultRelInfo,
1959  TupleTableSlot *slot,
1960  TupleTableSlot *planSlot)
1961 {
1962  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1963  TupleTableSlot **rslot;
1964  int numSlots = 1; /* exactly one row: &slot and &planSlot act as one-element arrays */
1965 
1966  /*
1967  * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1968  * postgresBeginForeignInsert())
1969  */
1970  if (fmstate->aux_fmstate)
1971  resultRelInfo->ri_FdwState = fmstate->aux_fmstate; /* temporary swap, undone right after the call */
1972  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1973  &slot, &planSlot, &numSlots);
1974  /* Revert that change */
1975  if (fmstate->aux_fmstate)
1976  resultRelInfo->ri_FdwState = fmstate;
1977 
1978  return rslot ? *rslot : NULL; /* unwrap the single result slot, if any */
1979 }
1980 
1981 /*
1982  * postgresExecForeignBatchInsert
1983  * Insert multiple rows into a foreign table
1984  */
1985 static TupleTableSlot **
1987  ResultRelInfo *resultRelInfo,
1988  TupleTableSlot **slots,
1989  TupleTableSlot **planSlots,
1990  int *numSlots)
1991 {
1992  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1993  TupleTableSlot **rslot;
1994 
1995  /*
1996  * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1997  * postgresBeginForeignInsert())
1998  */
1999  if (fmstate->aux_fmstate)
2000  resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
2001  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
2002  slots, planSlots, numSlots);
2003  /* Revert that change */
2004  if (fmstate->aux_fmstate)
2005  resultRelInfo->ri_FdwState = fmstate;
2006 
2007  return rslot;
2008 }
2009 
2010 /*
2011  * postgresGetForeignModifyBatchSize
2012  * Determine the maximum number of tuples that can be inserted in bulk
2013  *
2014  * Returns the batch size specified for server or table. When batching is not
2015  * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING
2016  * clause), returns 1.
2017  */
2018 static int
2020 {
2021  int batch_size;
2022  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2023 
2024  /* should be called only once */
2025  Assert(resultRelInfo->ri_BatchSize == 0);
2026 
2027  /*
2028  * Should never get called when the insert is being performed on a table
2029  * that is also among the target relations of an UPDATE operation, because
2030  * postgresBeginForeignInsert() currently rejects such insert attempts.
2031  */
2032  Assert(fmstate == NULL || fmstate->aux_fmstate == NULL);
2033 
2034  /*
2035  * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
2036  * the option directly in server/table options. Otherwise just use the
2037  * value we determined earlier.
2038  */
2039  if (fmstate)
2040  batch_size = fmstate->batch_size;
2041  else
2042  batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
2043 
2044  /*
2045  * Disable batching when we have to use RETURNING, there are any
2046  * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any
2047  * WITH CHECK OPTION constraints from parent views.
2048  *
2049  * When there are any BEFORE ROW INSERT triggers on the table, we can't
2050  * support it, because such triggers might query the table we're inserting
2051  * into and act differently if the tuples that have already been processed
2052  * and prepared for insertion are not there.
2053  */
2054  if (resultRelInfo->ri_projectReturning != NULL ||
2055  resultRelInfo->ri_WithCheckOptions != NIL ||
2056  (resultRelInfo->ri_TrigDesc &&
2057  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2058  resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
2059  return 1;
2060 
2061  /*
2062  * If the foreign table has no columns, disable batching as the INSERT
2063  * syntax doesn't allow batching multiple empty rows into a zero-column
2064  * table in a single statement. This is needed for COPY FROM, in which
2065  * case fmstate must be non-NULL.
2066  */
2067  if (fmstate && list_length(fmstate->target_attrs) == 0)
2068  return 1;
2069 
2070  /*
2071  * Otherwise use the batch size specified for server/table. The number of
2072  * parameters in a batch is limited to 65535 (uint16), so make sure we
2073  * don't exceed this limit by using the maximum batch_size possible.
2074  */
2075  if (fmstate && fmstate->p_nums > 0)
2076  batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums);
2077 
2078  return batch_size;
2079 }
2080 
2081 /*
2082  * postgresExecForeignUpdate
2083  * Update one row in a foreign table
2084  */
2085 static TupleTableSlot *
2087  ResultRelInfo *resultRelInfo,
2088  TupleTableSlot *slot,
2089  TupleTableSlot *planSlot)
2090 {
2091  TupleTableSlot **rslot;
2092  int numSlots = 1;
2093 
2094  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
2095  &slot, &planSlot, &numSlots);
2096 
2097  return rslot ? rslot[0] : NULL;
2098 }
2099 
2100 /*
2101  * postgresExecForeignDelete
2102  * Delete one row from a foreign table
2103  */
2104 static TupleTableSlot *
2106  ResultRelInfo *resultRelInfo,
2107  TupleTableSlot *slot,
2108  TupleTableSlot *planSlot)
2109 {
2110  TupleTableSlot **rslot;
2111  int numSlots = 1;
2112 
2113  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
2114  &slot, &planSlot, &numSlots);
2115 
2116  return rslot ? rslot[0] : NULL;
2117 }
2118 
2119 /*
2120  * postgresEndForeignModify
2121  * Finish an insert/update/delete operation on a foreign table
2122  */
2123 static void
2125  ResultRelInfo *resultRelInfo)
2126 {
2127  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2128 
2129  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2130  if (fmstate == NULL)
2131  return;
2132 
2133  /* Destroy the execution state */
2134  finish_foreign_modify(fmstate);
2135 }
2136 
2137 /*
2138  * postgresBeginForeignInsert
2139  * Begin an insert operation on a foreign table
2140  */
2141 static void
2143  ResultRelInfo *resultRelInfo)
2144 {
2145  PgFdwModifyState *fmstate;
2146  ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
2147  EState *estate = mtstate->ps.state;
2148  Index resultRelation;
2149  Relation rel = resultRelInfo->ri_RelationDesc;
2150  RangeTblEntry *rte;
2151  TupleDesc tupdesc = RelationGetDescr(rel);
2152  int attnum;
2153  int values_end_len;
2154  StringInfoData sql;
2155  List *targetAttrs = NIL;
2156  List *retrieved_attrs = NIL;
2157  bool doNothing = false;
2158 
2159  /*
2160  * If the foreign table we are about to insert routed rows into is also an
2161  * UPDATE subplan result rel that will be updated later, proceeding with
2162  * the INSERT will result in the later UPDATE incorrectly modifying those
2163  * routed rows, so prevent the INSERT --- it would be nice if we could
2164  * handle this case; but for now, throw an error for safety.
2165  */
2166  if (plan && plan->operation == CMD_UPDATE &&
2167  (resultRelInfo->ri_usesFdwDirectModify ||
2168  resultRelInfo->ri_FdwState))
2169  ereport(ERROR,
2170  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2171  errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2172  RelationGetRelationName(rel))));
2173 
2174  initStringInfo(&sql);
2175 
2176  /* We transmit all columns that are defined in the foreign table. */
2177  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2178  {
2179  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2180 
2181  if (!attr->attisdropped)
2182  targetAttrs = lappend_int(targetAttrs, attnum);
2183  }
2184 
2185  /* Check if we add the ON CONFLICT clause to the remote query. */
2186  if (plan)
2187  {
2188  OnConflictAction onConflictAction = plan->onConflictAction;
2189 
2190  /* We only support DO NOTHING without an inference specification. */
2191  if (onConflictAction == ONCONFLICT_NOTHING)
2192  doNothing = true;
2193  else if (onConflictAction != ONCONFLICT_NONE)
2194  elog(ERROR, "unexpected ON CONFLICT specification: %d",
2195  (int) onConflictAction);
2196  }
2197 
2198  /*
2199  * If the foreign table is a partition that doesn't have a corresponding
2200  * RTE entry, we need to create a new RTE describing the foreign table for
2201  * use by deparseInsertSql and create_foreign_modify() below, after first
2202  * copying the parent's RTE and modifying some fields to describe the
2203  * foreign partition to work on. However, if this is invoked by UPDATE,
2204  * the existing RTE may already correspond to this partition if it is one
2205  * of the UPDATE subplan target rels; in that case, we can just use the
2206  * existing RTE as-is.
2207  */
2208  if (resultRelInfo->ri_RangeTableIndex == 0)
2209  {
2210  ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2211 
2212  rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
2213  rte = copyObject(rte);
2214  rte->relid = RelationGetRelid(rel);
2215  rte->relkind = RELKIND_FOREIGN_TABLE;
2216 
2217  /*
2218  * For UPDATE, we must use the RT index of the first subplan target
2219  * rel's RTE, because the core code would have built expressions for
2220  * the partition, such as RETURNING, using that RT index as varno of
2221  * Vars contained in those expressions.
2222  */
2223  if (plan && plan->operation == CMD_UPDATE &&
2224  rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2225  resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2226  else
2227  resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2228  }
2229  else
2230  {
2231  resultRelation = resultRelInfo->ri_RangeTableIndex;
2232  rte = exec_rt_fetch(resultRelation, estate);
2233  }
2234 
2235  /* Construct the SQL command string. */
2236  deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2237  resultRelInfo->ri_WithCheckOptions,
2238  resultRelInfo->ri_returningList,
2239  &retrieved_attrs, &values_end_len);
2240 
2241  /* Construct an execution state. */
2242  fmstate = create_foreign_modify(mtstate->ps.state,
2243  rte,
2244  resultRelInfo,
2245  CMD_INSERT,
2246  NULL,
2247  sql.data,
2248  targetAttrs,
2249  values_end_len,
2250  retrieved_attrs != NIL,
2251  retrieved_attrs);
2252 
2253  /*
2254  * If the given resultRelInfo already has PgFdwModifyState set, it means
2255  * the foreign table is an UPDATE subplan result rel; in which case, store
2256  * the resulting state into the aux_fmstate of the PgFdwModifyState.
2257  */
2258  if (resultRelInfo->ri_FdwState)
2259  {
2260  Assert(plan && plan->operation == CMD_UPDATE);
2261  Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2262  ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2263  }
2264  else
2265  resultRelInfo->ri_FdwState = fmstate;
2266 }
2267 
2268 /*
2269  * postgresEndForeignInsert
2270  * Finish an insert operation on a foreign table
2271  */
2272 static void
2274  ResultRelInfo *resultRelInfo)
2275 {
2276  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2277 
2278  Assert(fmstate != NULL);
2279 
2280  /*
2281  * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2282  * postgresBeginForeignInsert())
2283  */
2284  if (fmstate->aux_fmstate)
2285  fmstate = fmstate->aux_fmstate;
2286 
2287  /* Destroy the execution state */
2288  finish_foreign_modify(fmstate);
2289 }
2290 
2291 /*
2292  * postgresIsForeignRelUpdatable
2293  * Determine whether a foreign table supports INSERT, UPDATE and/or
2294  * DELETE.
2295  */
2296 static int
2298 {
2299  bool updatable;
2300  ForeignTable *table;
2301  ForeignServer *server;
2302  ListCell *lc;
2303 
2304  /*
2305  * By default, all postgres_fdw foreign tables are assumed updatable. This
2306  * can be overridden by a per-server setting, which in turn can be
2307  * overridden by a per-table setting.
2308  */
2309  updatable = true;
2310 
2311  table = GetForeignTable(RelationGetRelid(rel));
2312  server = GetForeignServer(table->serverid);
2313 
2314  foreach(lc, server->options)
2315  {
2316  DefElem *def = (DefElem *) lfirst(lc);
2317 
2318  if (strcmp(def->defname, "updatable") == 0)
2319  updatable = defGetBoolean(def);
2320  }
2321  foreach(lc, table->options)
2322  {
2323  DefElem *def = (DefElem *) lfirst(lc);
2324 
2325  if (strcmp(def->defname, "updatable") == 0)
2326  updatable = defGetBoolean(def);
2327  }
2328 
2329  /*
2330  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2331  */
2332  return updatable ?
2333  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2334 }
2335 
2336 /*
2337  * postgresRecheckForeignScan
2338  * Execute a local join execution plan for a foreign join
2339  */
2340 static bool
2342 {
2343  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2345  TupleTableSlot *result;
2346 
2347  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2348  if (scanrelid > 0)
2349  return true;
2350 
2351  Assert(outerPlan != NULL);
2352 
2353  /* Execute a local join execution plan */
2354  result = ExecProcNode(outerPlan);
2355  if (TupIsNull(result))
2356  return false;
2357 
2358  /* Store result in the given slot */
2359  ExecCopySlot(slot, result);
2360 
2361  return true;
2362 }
2363 
2364 /*
2365  * find_modifytable_subplan
2366  * Helper routine for postgresPlanDirectModify to find the
2367  * ModifyTable subplan node that scans the specified RTI.
2368  *
2369  * Returns NULL if the subplan couldn't be identified. That's not a fatal
2370  * error condition, we just abandon trying to do the update directly.
2371  */
2372 static ForeignScan *
2374  ModifyTable *plan,
2375  Index rtindex,
2376  int subplan_index)
2377 {
2378  Plan *subplan = outerPlan(plan);
2379 
2380  /*
2381  * The cases we support are (1) the desired ForeignScan is the immediate
2382  * child of ModifyTable, or (2) it is the subplan_index'th child of an
2383  * Append node that is the immediate child of ModifyTable. There is no
2384  * point in looking further down, as that would mean that local joins are
2385  * involved, so we can't do the update directly.
2386  *
2387  * There could be a Result atop the Append too, acting to compute the
2388  * UPDATE targetlist values. We ignore that here; the tlist will be
2389  * checked by our caller.
2390  *
2391  * In principle we could examine all the children of the Append, but it's
2392  * currently unlikely that the core planner would generate such a plan
2393  * with the children out-of-order. Moreover, such a search risks costing
2394  * O(N^2) time when there are a lot of children.
2395  */
2396  if (IsA(subplan, Append))
2397  {
2398  Append *appendplan = (Append *) subplan;
2399 
2400  if (subplan_index < list_length(appendplan->appendplans))
2401  subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2402  }
2403  else if (IsA(subplan, Result) &&
2404  outerPlan(subplan) != NULL &&
2405  IsA(outerPlan(subplan), Append))
2406  {
2407  Append *appendplan = (Append *) outerPlan(subplan);
2408 
2409  if (subplan_index < list_length(appendplan->appendplans))
2410  subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2411  }
2412 
2413  /* Now, have we got a ForeignScan on the desired rel? */
2414  if (IsA(subplan, ForeignScan))
2415  {
2416  ForeignScan *fscan = (ForeignScan *) subplan;
2417 
2418  if (bms_is_member(rtindex, fscan->fs_base_relids))
2419  return fscan;
2420  }
2421 
2422  return NULL;
2423 }
2424 
2425 /*
2426  * postgresPlanDirectModify
2427  * Consider a direct foreign table modification
2428  *
2429  * Decide whether it is safe to modify a foreign table directly, and if so,
2430  * rewrite subplan accordingly.
2431  */
2432 static bool
2434  ModifyTable *plan,
2435  Index resultRelation,
2436  int subplan_index)
2437 {
2438  CmdType operation = plan->operation;
2439  RelOptInfo *foreignrel;
2440  RangeTblEntry *rte;
2441  PgFdwRelationInfo *fpinfo;
2442  Relation rel;
2443  StringInfoData sql;
2444  ForeignScan *fscan;
2445  List *processed_tlist = NIL;
2446  List *targetAttrs = NIL;
2447  List *remote_exprs;
2448  List *params_list = NIL;
2449  List *returningList = NIL;
2450  List *retrieved_attrs = NIL;
2451 
2452  /*
2453  * Decide whether it is safe to modify a foreign table directly.
2454  */
2455 
2456  /*
2457  * The table modification must be an UPDATE or DELETE.
2458  */
2459  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2460  return false;
2461 
2462  /*
2463  * Try to locate the ForeignScan subplan that's scanning resultRelation.
2464  */
2465  fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index);
2466  if (!fscan)
2467  return false;
2468 
2469  /*
2470  * It's unsafe to modify a foreign table directly if there are any quals
2471  * that should be evaluated locally.
2472  */
2473  if (fscan->scan.plan.qual != NIL)
2474  return false;
2475 
2476  /* Safe to fetch data about the target foreign rel */
2477  if (fscan->scan.scanrelid == 0)
2478  {
2479  foreignrel = find_join_rel(root, fscan->fs_relids);
2480  /* We should have a rel for this foreign join. */
2481  Assert(foreignrel);
2482  }
2483  else
2484  foreignrel = root->simple_rel_array[resultRelation];
2485  rte = root->simple_rte_array[resultRelation];
2486  fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2487 
2488  /*
2489  * It's unsafe to update a foreign table directly, if any expressions to
2490  * assign to the target columns are unsafe to evaluate remotely.
2491  */
2492  if (operation == CMD_UPDATE)
2493  {
2494  ListCell *lc,
2495  *lc2;
2496 
2497  /*
2498  * The expressions of concern are the first N columns of the processed
2499  * targetlist, where N is the length of the rel's update_colnos.
2500  */
2501  get_translated_update_targetlist(root, resultRelation,
2502  &processed_tlist, &targetAttrs);
2503  forboth(lc, processed_tlist, lc2, targetAttrs)
2504  {
2505  TargetEntry *tle = lfirst_node(TargetEntry, lc);
2506  AttrNumber attno = lfirst_int(lc2);
2507 
2508  /* update's new-value expressions shouldn't be resjunk */
2509  Assert(!tle->resjunk);
2510 
2511  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2512  elog(ERROR, "system-column update is not supported");
2513 
2514  if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2515  return false;
2516  }
2517  }
2518 
2519  /*
2520  * Ok, rewrite subplan so as to modify the foreign table directly.
2521  */
2522  initStringInfo(&sql);
2523 
2524  /*
2525  * Core code already has some lock on each rel being planned, so we can
2526  * use NoLock here.
2527  */
2528  rel = table_open(rte->relid, NoLock);
2529 
2530  /*
2531  * Recall the qual clauses that must be evaluated remotely. (These are
2532  * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2533  * doesn't care.)
2534  */
2535  remote_exprs = fpinfo->final_remote_exprs;
2536 
2537  /*
2538  * Extract the relevant RETURNING list if any.
2539  */
2540  if (plan->returningLists)
2541  {
2542  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2543 
2544  /*
2545  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2546  * we fetch from the foreign server any Vars specified in RETURNING
2547  * that refer not only to the target relation but to non-target
2548  * relations. So we'll deparse them into the RETURNING clause of the
2549  * remote query; use a targetlist consisting of them instead, which
2550  * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2551  * node below.
2552  */
2553  if (fscan->scan.scanrelid == 0)
2554  returningList = build_remote_returning(resultRelation, rel,
2555  returningList);
2556  }
2557 
2558  /*
2559  * Construct the SQL command string.
2560  */
2561  switch (operation)
2562  {
2563  case CMD_UPDATE:
2564  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2565  foreignrel,
2566  processed_tlist,
2567  targetAttrs,
2568  remote_exprs, &params_list,
2569  returningList, &retrieved_attrs);
2570  break;
2571  case CMD_DELETE:
2572  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2573  foreignrel,
2574  remote_exprs, &params_list,
2575  returningList, &retrieved_attrs);
2576  break;
2577  default:
2578  elog(ERROR, "unexpected operation: %d", (int) operation);
2579  break;
2580  }
2581 
2582  /*
2583  * Update the operation and target relation info.
2584  */
2585  fscan->operation = operation;
2586  fscan->resultRelation = resultRelation;
2587 
2588  /*
2589  * Update the fdw_exprs list that will be available to the executor.
2590  */
2591  fscan->fdw_exprs = params_list;
2592 
2593  /*
2594  * Update the fdw_private list that will be available to the executor.
2595  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2596  */
2597  fscan->fdw_private = list_make4(makeString(sql.data),
2598  makeBoolean((retrieved_attrs != NIL)),
2599  retrieved_attrs,
2600  makeBoolean(plan->canSetTag));
2601 
2602  /*
2603  * Update the foreign-join-related fields.
2604  */
2605  if (fscan->scan.scanrelid == 0)
2606  {
2607  /* No need for the outer subplan. */
2608  fscan->scan.plan.lefttree = NULL;
2609 
2610  /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2611  if (returningList)
2612  rebuild_fdw_scan_tlist(fscan, returningList);
2613  }
2614 
2615  /*
2616  * Finally, unset the async-capable flag if it is set, as we currently
2617  * don't support asynchronous execution of direct modifications.
2618  */
2619  if (fscan->scan.plan.async_capable)
2620  fscan->scan.plan.async_capable = false;
2621 
2622  table_close(rel, NoLock);
2623  return true;
2624 }
2625 
2626 /*
2627  * postgresBeginDirectModify
2628  * Prepare a direct foreign table modification
2629  */
2630 static void
2632 {
2633  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2634  EState *estate = node->ss.ps.state;
2635  PgFdwDirectModifyState *dmstate;
2636  Index rtindex;
2637  Oid userid;
2638  ForeignTable *table;
2639  UserMapping *user;
2640  int numParams;
2641 
2642  /*
2643  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2644  */
2645  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2646  return;
2647 
2648  /*
2649  * We'll save private state in node->fdw_state.
2650  */
2651  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2652  node->fdw_state = (void *) dmstate;
2653 
2654  /*
2655  * Identify which user to do the remote access as. This should match what
2656  * ExecCheckPermissions() does.
2657  */
2658  userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
2659 
2660  /* Get info about foreign table. */
2661  rtindex = node->resultRelInfo->ri_RangeTableIndex;
2662  if (fsplan->scan.scanrelid == 0)
2663  dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2664  else
2665  dmstate->rel = node->ss.ss_currentRelation;
2666  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2667  user = GetUserMapping(userid, table->serverid);
2668 
2669  /*
2670  * Get connection to the foreign server. Connection manager will
2671  * establish new connection if necessary.
2672  */
2673  dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
2674 
2675  /* Update the foreign-join-related fields. */
2676  if (fsplan->scan.scanrelid == 0)
2677  {
2678  /* Save info about foreign table. */
2679  dmstate->resultRel = dmstate->rel;
2680 
2681  /*
2682  * Set dmstate->rel to NULL to teach get_returning_data() and
2683  * make_tuple_from_result_row() that columns fetched from the remote
2684  * server are described by fdw_scan_tlist of the foreign-scan plan
2685  * node, not the tuple descriptor for the target relation.
2686  */
2687  dmstate->rel = NULL;
2688  }
2689 
2690  /* Initialize state variable */
2691  dmstate->num_tuples = -1; /* -1 means not set yet */
2692 
2693  /* Get private info created by planner functions. */
2694  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2696  dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private,
2698  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2700  dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private,
2702 
2703  /* Create context for per-tuple temp workspace. */
2704  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2705  "postgres_fdw temporary data",
2707 
2708  /* Prepare for input conversion of RETURNING results. */
2709  if (dmstate->has_returning)
2710  {
2711  TupleDesc tupdesc;
2712 
2713  if (fsplan->scan.scanrelid == 0)
2714  tupdesc = get_tupdesc_for_join_scan_tuples(node);
2715  else
2716  tupdesc = RelationGetDescr(dmstate->rel);
2717 
2718  dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2719 
2720  /*
2721  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2722  * initialize a filter to extract an updated/deleted tuple from a scan
2723  * tuple.
2724  */
2725  if (fsplan->scan.scanrelid == 0)
2726  init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2727  }
2728 
2729  /*
2730  * Prepare for processing of parameters used in remote query, if any.
2731  */
2732  numParams = list_length(fsplan->fdw_exprs);
2733  dmstate->numParams = numParams;
2734  if (numParams > 0)
2736  fsplan->fdw_exprs,
2737  numParams,
2738  &dmstate->param_flinfo,
2739  &dmstate->param_exprs,
2740  &dmstate->param_values);
2741 }
2742 
2743 /*
2744  * postgresIterateDirectModify
2745  * Execute a direct foreign table modification
2746  */
2747 static TupleTableSlot *
2749 {
2751  EState *estate = node->ss.ps.state;
2752  ResultRelInfo *resultRelInfo = node->resultRelInfo;
2753 
2754  /*
2755  * If this is the first call after Begin, execute the statement.
2756  */
2757  if (dmstate->num_tuples == -1)
2758  execute_dml_stmt(node);
2759 
2760  /*
2761  * If the local query doesn't specify RETURNING, just clear tuple slot.
2762  */
2763  if (!resultRelInfo->ri_projectReturning)
2764  {
2765  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2766  Instrumentation *instr = node->ss.ps.instrument;
2767 
2768  Assert(!dmstate->has_returning);
2769 
2770  /* Increment the command es_processed count if necessary. */
2771  if (dmstate->set_processed)
2772  estate->es_processed += dmstate->num_tuples;
2773 
2774  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2775  if (instr)
2776  instr->tuplecount += dmstate->num_tuples;
2777 
2778  return ExecClearTuple(slot);
2779  }
2780 
2781  /*
2782  * Get the next RETURNING tuple.
2783  */
2784  return get_returning_data(node);
2785 }
2786 
2787 /*
2788  * postgresEndDirectModify
2789  * Finish a direct foreign table modification
2790  */
2791 static void
2793 {
2795 
2796  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2797  if (dmstate == NULL)
2798  return;
2799 
2800  /* Release PGresult */
2801  PQclear(dmstate->result);
2802 
2803  /* Release remote connection */
2804  ReleaseConnection(dmstate->conn);
2805  dmstate->conn = NULL;
2806 
2807  /* MemoryContext will be deleted automatically. */
2808 }
2809 
2810 /*
2811  * postgresExplainForeignScan
2812  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2813  */
2814 static void
2816 {
2818  List *fdw_private = plan->fdw_private;
2819 
2820  /*
2821  * Identify foreign scans that are really joins or upper relations. The
2822  * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2823  * digit string(s), which are RT indexes, with the correct relation names.
2824  * We do that here, not when the plan is created, because we can't know
2825  * what aliases ruleutils.c will assign at plan creation time.
2826  */
2827  if (list_length(fdw_private) > FdwScanPrivateRelations)
2828  {
2829  StringInfo relations;
2830  char *rawrelations;
2831  char *ptr;
2832  int minrti,
2833  rtoffset;
2834 
2835  rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2836 
2837  /*
2838  * A difficulty with using a string representation of RT indexes is
2839  * that setrefs.c won't update the string when flattening the
2840  * rangetable. To find out what rtoffset was applied, identify the
2841  * minimum RT index appearing in the string and compare it to the
2842  * minimum member of plan->fs_base_relids. (We expect all the relids
2843  * in the join will have been offset by the same amount; the Asserts
2844  * below should catch it if that ever changes.)
2845  */
2846  minrti = INT_MAX;
2847  ptr = rawrelations;
2848  while (*ptr)
2849  {
2850  if (isdigit((unsigned char) *ptr))
2851  {
2852  int rti = strtol(ptr, &ptr, 10);
2853 
2854  if (rti < minrti)
2855  minrti = rti;
2856  }
2857  else
2858  ptr++;
2859  }
2860  rtoffset = bms_next_member(plan->fs_base_relids, -1) - minrti;
2861 
2862  /* Now we can translate the string */
2863  relations = makeStringInfo();
2864  ptr = rawrelations;
2865  while (*ptr)
2866  {
2867  if (isdigit((unsigned char) *ptr))
2868  {
2869  int rti = strtol(ptr, &ptr, 10);
2870  RangeTblEntry *rte;
2871  char *relname;
2872  char *refname;
2873 
2874  rti += rtoffset;
2875  Assert(bms_is_member(rti, plan->fs_base_relids));
2876  rte = rt_fetch(rti, es->rtable);
2877  Assert(rte->rtekind == RTE_RELATION);
2878  /* This logic should agree with explain.c's ExplainTargetRel */
2879  relname = get_rel_name(rte->relid);
2880  if (es->verbose)
2881  {
2882  char *namespace;
2883 
2884  namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid));
2885  appendStringInfo(relations, "%s.%s",
2886  quote_identifier(namespace),
2888  }
2889  else
2890  appendStringInfoString(relations,
2892  refname = (char *) list_nth(es->rtable_names, rti - 1);
2893  if (refname == NULL)
2894  refname = rte->eref->aliasname;
2895  if (strcmp(refname, relname) != 0)
2896  appendStringInfo(relations, " %s",
2897  quote_identifier(refname));
2898  }
2899  else
2900  appendStringInfoChar(relations, *ptr++);
2901  }
2902  ExplainPropertyText("Relations", relations->data, es);
2903  }
2904 
2905  /*
2906  * Add remote query, when VERBOSE option is specified.
2907  */
2908  if (es->verbose)
2909  {
2910  char *sql;
2911 
2912  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2913  ExplainPropertyText("Remote SQL", sql, es);
2914  }
2915 }
2916 
2917 /*
2918  * postgresExplainForeignModify
2919  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2920  */
2921 static void
2923  ResultRelInfo *rinfo,
2924  List *fdw_private,
2925  int subplan_index,
2926  ExplainState *es)
2927 {
2928  if (es->verbose)
2929  {
2930  char *sql = strVal(list_nth(fdw_private,
2932 
2933  ExplainPropertyText("Remote SQL", sql, es);
2934 
2935  /*
2936  * For INSERT we should always have batch size >= 1, but UPDATE and
2937  * DELETE don't support batching so don't show the property.
2938  */
2939  if (rinfo->ri_BatchSize > 0)
2940  ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
2941  }
2942 }
2943 
2944 /*
2945  * postgresExplainDirectModify
2946  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2947  * foreign table directly
2948  */
2949 static void
2951 {
2952  List *fdw_private;
2953  char *sql;
2954 
2955  if (es->verbose)
2956  {
2957  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2958  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2959  ExplainPropertyText("Remote SQL", sql, es);
2960  }
2961 }
2962 
2963 /*
2964  * postgresExecForeignTruncate
2965  * Truncate one or more foreign tables
2966  */
2967 static void
2969  DropBehavior behavior,
2970  bool restart_seqs)
2971 {
2972  Oid serverid = InvalidOid;
2973  UserMapping *user = NULL;
2974  PGconn *conn = NULL;
2975  StringInfoData sql;
2976  ListCell *lc;
2977  bool server_truncatable = true;
2978 
2979  /*
2980  * By default, all postgres_fdw foreign tables are assumed truncatable.
2981  * This can be overridden by a per-server setting, which in turn can be
2982  * overridden by a per-table setting.
2983  */
2984  foreach(lc, rels)
2985  {
2986  ForeignServer *server = NULL;
2987  Relation rel = lfirst(lc);
2989  ListCell *cell;
2990  bool truncatable;
2991 
2992  /*
2993  * First time through, determine whether the foreign server allows
2994  * truncates. Since all specified foreign tables are assumed to belong
2995  * to the same foreign server, this result can be used for other
2996  * foreign tables.
2997  */
2998  if (!OidIsValid(serverid))
2999  {
3000  serverid = table->serverid;
3001  server = GetForeignServer(serverid);
3002 
3003  foreach(cell, server->options)
3004  {
3005  DefElem *defel = (DefElem *) lfirst(cell);
3006 
3007  if (strcmp(defel->defname, "truncatable") == 0)
3008  {
3009  server_truncatable = defGetBoolean(defel);
3010  break;
3011  }
3012  }
3013  }
3014 
3015  /*
3016  * Confirm that all specified foreign tables belong to the same
3017  * foreign server.
3018  */
3019  Assert(table->serverid == serverid);
3020 
3021  /* Determine whether this foreign table allows truncations */
3022  truncatable = server_truncatable;
3023  foreach(cell, table->options)
3024  {
3025  DefElem *defel = (DefElem *) lfirst(cell);
3026 
3027  if (strcmp(defel->defname, "truncatable") == 0)
3028  {
3029  truncatable = defGetBoolean(defel);
3030  break;
3031  }
3032  }
3033 
3034  if (!truncatable)
3035  ereport(ERROR,
3036  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3037  errmsg("foreign table \"%s\" does not allow truncates",
3038  RelationGetRelationName(rel))));
3039  }
3040  Assert(OidIsValid(serverid));
3041 
3042  /*
3043  * Get connection to the foreign server. Connection manager will
3044  * establish new connection if necessary.
3045  */
3046  user = GetUserMapping(GetUserId(), serverid);
3047  conn = GetConnection(user, false, NULL);
3048 
3049  /* Construct the TRUNCATE command string */
3050  initStringInfo(&sql);
3051  deparseTruncateSql(&sql, rels, behavior, restart_seqs);
3052 
3053  /* Issue the TRUNCATE command to remote server */
3054  do_sql_command(conn, sql.data);
3055 
3056  pfree(sql.data);
3057 }
3058 
3059 /*
3060  * estimate_path_cost_size
3061  * Get cost and size estimates for a foreign scan on given foreign relation
3062  * either a base relation or a join between foreign relations or an upper
3063  * relation containing foreign relations.
3064  *
3065  * param_join_conds are the parameterization clauses with outer relations.
3066  * pathkeys specify the expected sort order if any for given path being costed.
3067  * fpextra specifies additional post-scan/join-processing steps such as the
3068  * final sort and the LIMIT restriction.
3069  *
3070  * The function returns the cost and size estimates in p_rows, p_width,
3071  * p_startup_cost and p_total_cost variables.
3072  */
3073 static void
3075  RelOptInfo *foreignrel,
3076  List *param_join_conds,
3077  List *pathkeys,
3078  PgFdwPathExtraData *fpextra,
3079  double *p_rows, int *p_width,
3080  Cost *p_startup_cost, Cost *p_total_cost)
3081 {
3082  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
3083  double rows;
3084  double retrieved_rows;
3085  int width;
3086  Cost startup_cost;
3087  Cost total_cost;
3088 
3089  /* Make sure the core code has set up the relation's reltarget */
3090  Assert(foreignrel->reltarget);
3091 
3092  /*
3093  * If the table or the server is configured to use remote estimates,
3094  * connect to the foreign server and execute EXPLAIN to estimate the
3095  * number of rows selected by the restriction+join clauses. Otherwise,
3096  * estimate rows using whatever statistics we have locally, in a way
3097  * similar to ordinary tables.
3098  */
3099  if (fpinfo->use_remote_estimate)
3100  {
3101  List *remote_param_join_conds;
3102  List *local_param_join_conds;
3103  StringInfoData sql;
3104  PGconn *conn;
3105  Selectivity local_sel;
3106  QualCost local_cost;
3107  List *fdw_scan_tlist = NIL;
3108  List *remote_conds;
3109 
3110  /* Required only to be passed to deparseSelectStmtForRel */
3111  List *retrieved_attrs;
3112 
3113  /*
3114  * param_join_conds might contain both clauses that are safe to send
3115  * across, and clauses that aren't.
3116  */
3117  classifyConditions(root, foreignrel, param_join_conds,
3118  &remote_param_join_conds, &local_param_join_conds);
3119 
3120  /* Build the list of columns to be fetched from the foreign server. */
3121  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
3122  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
3123  else
3124  fdw_scan_tlist = NIL;
3125 
3126  /*
3127  * The complete list of remote conditions includes everything from
3128  * baserestrictinfo plus any extra join_conds relevant to this
3129  * particular path.
3130  */
3131  remote_conds = list_concat(remote_param_join_conds,
3132  fpinfo->remote_conds);
3133 
3134  /*
3135  * Construct EXPLAIN query including the desired SELECT, FROM, and
3136  * WHERE clauses. Params and other-relation Vars are replaced by dummy
3137  * values, so don't request params_list.
3138  */
3139  initStringInfo(&sql);
3140  appendStringInfoString(&sql, "EXPLAIN ");
3141  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
3142  remote_conds, pathkeys,
3143  fpextra ? fpextra->has_final_sort : false,
3144  fpextra ? fpextra->has_limit : false,
3145  false, &retrieved_attrs, NULL);
3146 
3147  /* Get the remote estimate */
3148  conn = GetConnection(fpinfo->user, false, NULL);
3149  get_remote_estimate(sql.data, conn, &rows, &width,
3150  &startup_cost, &total_cost);
3152 
3153  retrieved_rows = rows;
3154 
3155  /* Factor in the selectivity of the locally-checked quals */
3156  local_sel = clauselist_selectivity(root,
3157  local_param_join_conds,
3158  foreignrel->relid,
3159  JOIN_INNER,
3160  NULL);
3161  local_sel *= fpinfo->local_conds_sel;
3162 
3163  rows = clamp_row_est(rows * local_sel);
3164 
3165  /* Add in the eval cost of the locally-checked quals */
3166  startup_cost += fpinfo->local_conds_cost.startup;
3167  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3168  cost_qual_eval(&local_cost, local_param_join_conds, root);
3169  startup_cost += local_cost.startup;
3170  total_cost += local_cost.per_tuple * retrieved_rows;
3171 
3172  /*
3173  * Add in tlist eval cost for each output row. In case of an
3174  * aggregate, some of the tlist expressions such as grouping
3175  * expressions will be evaluated remotely, so adjust the costs.
3176  */
3177  startup_cost += foreignrel->reltarget->cost.startup;
3178  total_cost += foreignrel->reltarget->cost.startup;
3179  total_cost += foreignrel->reltarget->cost.per_tuple * rows;
3180  if (IS_UPPER_REL(foreignrel))
3181  {
3182  QualCost tlist_cost;
3183 
3184  cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
3185  startup_cost -= tlist_cost.startup;
3186  total_cost -= tlist_cost.startup;
3187  total_cost -= tlist_cost.per_tuple * rows;
3188  }
3189  }
3190  else
3191  {
3192  Cost run_cost = 0;
3193 
3194  /*
3195  * We don't support join conditions in this mode (hence, no
3196  * parameterized paths can be made).
3197  */
3198  Assert(param_join_conds == NIL);
3199 
3200  /*
3201  * We will come here again and again with different set of pathkeys or
3202  * additional post-scan/join-processing steps that caller wants to
3203  * cost. We don't need to calculate the cost/size estimates for the
3204  * underlying scan, join, or grouping each time. Instead, use those
3205  * estimates if we have cached them already.
3206  */
3207  if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
3208  {
3209  Assert(fpinfo->retrieved_rows >= 0);
3210 
3211  rows = fpinfo->rows;
3212  retrieved_rows = fpinfo->retrieved_rows;
3213  width = fpinfo->width;
3214  startup_cost = fpinfo->rel_startup_cost;
3215  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
3216 
3217  /*
3218  * If we estimate the costs of a foreign scan or a foreign join
3219  * with additional post-scan/join-processing steps, the scan or
3220  * join costs obtained from the cache wouldn't yet contain the
3221  * eval costs for the final scan/join target, which would've been
3222  * updated by apply_scanjoin_target_to_paths(); add the eval costs
3223  * now.
3224  */
3225  if (fpextra && !IS_UPPER_REL(foreignrel))
3226  {
3227  /* Shouldn't get here unless we have LIMIT */
3228  Assert(fpextra->has_limit);
3229  Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
3230  foreignrel->reloptkind == RELOPT_JOINREL);
3231  startup_cost += foreignrel->reltarget->cost.startup;
3232  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3233  }
3234  }
3235  else if (IS_JOIN_REL(foreignrel))
3236  {
3237  PgFdwRelationInfo *fpinfo_i;
3238  PgFdwRelationInfo *fpinfo_o;
3239  QualCost join_cost;
3240  QualCost remote_conds_cost;
3241  double nrows;
3242 
3243  /* Use rows/width estimates made by the core code. */
3244  rows = foreignrel->rows;
3245  width = foreignrel->reltarget->width;
3246 
3247  /* For join we expect inner and outer relations set */
3248  Assert(fpinfo->innerrel && fpinfo->outerrel);
3249 
3250  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
3251  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
3252 
3253  /* Estimate of number of rows in cross product */
3254  nrows = fpinfo_i->rows * fpinfo_o->rows;
3255 
3256  /*
3257  * Back into an estimate of the number of retrieved rows. Just in
3258  * case this is nuts, clamp to at most nrows.
3259  */
3260  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3261  retrieved_rows = Min(retrieved_rows, nrows);
3262 
3263  /*
3264  * The cost of foreign join is estimated as cost of generating
3265  * rows for the joining relations + cost for applying quals on the
3266  * rows.
3267  */
3268 
3269  /*
3270  * Calculate the cost of clauses pushed down to the foreign server
3271  */
3272  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
3273  /* Calculate the cost of applying join clauses */
3274  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
3275 
3276  /*
3277  * Startup cost includes startup cost of joining relations and the
3278  * startup cost for join and other clauses. We do not include the
3279  * startup cost specific to join strategy (e.g. setting up hash
3280  * tables) since we do not know what strategy the foreign server
3281  * is going to use.
3282  */
3283  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
3284  startup_cost += join_cost.startup;
3285  startup_cost += remote_conds_cost.startup;
3286  startup_cost += fpinfo->local_conds_cost.startup;
3287 
3288  /*
3289  * Run time cost includes:
3290  *
3291  * 1. Run time cost (total_cost - startup_cost) of relations being
3292  * joined
3293  *
3294  * 2. Run time cost of applying join clauses on the cross product
3295  * of the joining relations.
3296  *
3297  * 3. Run time cost of applying pushed down other clauses on the
3298  * result of join
3299  *
3300  * 4. Run time cost of applying nonpushable other clauses locally
3301  * on the result fetched from the foreign server.
3302  */
3303  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
3304  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
3305  run_cost += nrows * join_cost.per_tuple;
3306  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
3307  run_cost += nrows * remote_conds_cost.per_tuple;
3308  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3309 
3310  /* Add in tlist eval cost for each output row */
3311  startup_cost += foreignrel->reltarget->cost.startup;
3312  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3313  }
3314  else if (IS_UPPER_REL(foreignrel))
3315  {
3316  RelOptInfo *outerrel = fpinfo->outerrel;
3317  PgFdwRelationInfo *ofpinfo;
3318  AggClauseCosts aggcosts;
3319  double input_rows;
3320  int numGroupCols;
3321  double numGroups = 1;
3322 
3323  /* The upper relation should have its outer relation set */
3324  Assert(outerrel);
3325  /* and that outer relation should have its reltarget set */
3326  Assert(outerrel->reltarget);
3327 
3328  /*
3329  * This cost model is mixture of costing done for sorted and
3330  * hashed aggregates in cost_agg(). We are not sure which
3331  * strategy will be considered at remote side, thus for
3332  * simplicity, we put all startup related costs in startup_cost
3333  * and all finalization and run cost are added in total_cost.
3334  */
3335 
3336  ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
3337 
3338  /* Get rows from input rel */
3339  input_rows = ofpinfo->rows;
3340 
3341  /* Collect statistics about aggregates for estimating costs. */
3342  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
3343  if (root->parse->hasAggs)
3344  {
3345  get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts);
3346  }
3347 
3348  /* Get number of grouping columns and possible number of groups */
3349  numGroupCols = list_length(root->processed_groupClause);
3350  numGroups = estimate_num_groups(root,
3352  fpinfo->grouped_tlist),
3353  input_rows, NULL, NULL);
3354 
3355  /*
3356  * Get the retrieved_rows and rows estimates. If there are HAVING
3357  * quals, account for their selectivity.
3358  */
3359  if (root->hasHavingQual)
3360  {
3361  /* Factor in the selectivity of the remotely-checked quals */
3362  retrieved_rows =
3363  clamp_row_est(numGroups *
3365  fpinfo->remote_conds,
3366  0,
3367  JOIN_INNER,
3368  NULL));
3369  /* Factor in the selectivity of the locally-checked quals */
3370  rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
3371  }
3372  else
3373  {
3374  rows = retrieved_rows = numGroups;
3375  }
3376 
3377  /* Use width estimate made by the core code. */
3378  width = foreignrel->reltarget->width;
3379 
3380  /*-----
3381  * Startup cost includes:
3382  * 1. Startup cost for underneath input relation, adjusted for
3383  * tlist replacement by apply_scanjoin_target_to_paths()
3384  * 2. Cost of performing aggregation, per cost_agg()
3385  *-----
3386  */
3387  startup_cost = ofpinfo->rel_startup_cost;
3388  startup_cost += outerrel->reltarget->cost.startup;
3389  startup_cost += aggcosts.transCost.startup;
3390  startup_cost += aggcosts.transCost.per_tuple * input_rows;
3391  startup_cost += aggcosts.finalCost.startup;
3392  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3393 
3394  /*-----
3395  * Run time cost includes:
3396  * 1. Run time cost of underneath input relation, adjusted for
3397  * tlist replacement by apply_scanjoin_target_to_paths()
3398  * 2. Run time cost of performing aggregation, per cost_agg()
3399  *-----
3400  */
3401  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3402  run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3403  run_cost += aggcosts.finalCost.per_tuple * numGroups;
3404  run_cost += cpu_tuple_cost * numGroups;
3405 
3406  /* Account for the eval cost of HAVING quals, if any */
3407  if (root->hasHavingQual)
3408  {
3409  QualCost remote_cost;
3410 
3411  /* Add in the eval cost of the remotely-checked quals */
3412  cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3413  startup_cost += remote_cost.startup;
3414  run_cost += remote_cost.per_tuple * numGroups;
3415  /* Add in the eval cost of the locally-checked quals */
3416  startup_cost += fpinfo->local_conds_cost.startup;
3417  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3418  }
3419 
3420  /* Add in tlist eval cost for each output row */
3421  startup_cost += foreignrel->reltarget->cost.startup;
3422  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3423  }
3424  else
3425  {
3426  Cost cpu_per_tuple;
3427 
3428  /* Use rows/width estimates made by set_baserel_size_estimates. */
3429  rows = foreignrel->rows;
3430  width = foreignrel->reltarget->width;
3431 
3432  /*
3433  * Back into an estimate of the number of retrieved rows. Just in
3434  * case this is nuts, clamp to at most foreignrel->tuples.
3435  */
3436  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3437  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3438 
3439  /*
3440  * Cost as though this were a seqscan, which is pessimistic. We
3441  * effectively imagine the local_conds are being evaluated
3442  * remotely, too.
3443  */
3444  startup_cost = 0;
3445  run_cost = 0;
3446  run_cost += seq_page_cost * foreignrel->pages;
3447 
3448  startup_cost += foreignrel->baserestrictcost.startup;
3449  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3450  run_cost += cpu_per_tuple * foreignrel->tuples;
3451 
3452  /* Add in tlist eval cost for each output row */
3453  startup_cost += foreignrel->reltarget->cost.startup;
3454  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3455  }
3456 
3457  /*
3458  * Without remote estimates, we have no real way to estimate the cost
3459  * of generating sorted output. It could be free if the query plan
3460  * the remote side would have chosen generates properly-sorted output
3461  * anyway, but in most cases it will cost something. Estimate a value
3462  * high enough that we won't pick the sorted path when the ordering
3463  * isn't locally useful, but low enough that we'll err on the side of
3464  * pushing down the ORDER BY clause when it's useful to do so.
3465  */
3466  if (pathkeys != NIL)
3467  {
3468  if (IS_UPPER_REL(foreignrel))
3469  {
3470  Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3471  fpinfo->stage == UPPERREL_GROUP_AGG);
3472  adjust_foreign_grouping_path_cost(root, pathkeys,
3473  retrieved_rows, width,
3474  fpextra->limit_tuples,
3475  &startup_cost, &run_cost);
3476  }
3477  else
3478  {
3479  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3480  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3481  }
3482  }
3483 
3484  total_cost = startup_cost + run_cost;
3485 
3486  /* Adjust the cost estimates if we have LIMIT */
3487  if (fpextra && fpextra->has_limit)
3488  {
3489  adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3490  fpextra->offset_est, fpextra->count_est);
3491  retrieved_rows = rows;
3492  }
3493  }
3494 
3495  /*
3496  * If this includes the final sort step, the given target, which will be
3497  * applied to the resulting path, might have different expressions from
3498  * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3499  * eval costs.
3500  */
3501  if (fpextra && fpextra->has_final_sort &&
3502  fpextra->target != foreignrel->reltarget)
3503  {
3504  QualCost oldcost = foreignrel->reltarget->cost;
3505  QualCost newcost = fpextra->target->cost;
3506 
3507  startup_cost += newcost.startup - oldcost.startup;
3508  total_cost += newcost.startup - oldcost.startup;
3509  total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3510  }
3511 
3512  /*
3513  * Cache the retrieved rows and cost estimates for scans, joins, or
3514  * groupings without any parameterization, pathkeys, or additional
3515  * post-scan/join-processing steps, before adding the costs for
3516  * transferring data from the foreign server. These estimates are useful
3517  * for costing remote joins involving this relation or costing other
3518  * remote operations on this relation such as remote sorts and remote
3519  * LIMIT restrictions, when the costs can not be obtained from the foreign
3520  * server. This function will be called at least once for every foreign
3521  * relation without any parameterization, pathkeys, or additional
3522  * post-scan/join-processing steps.
3523  */
3524  if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3525  {
3526  fpinfo->retrieved_rows = retrieved_rows;
3527  fpinfo->rel_startup_cost = startup_cost;
3528  fpinfo->rel_total_cost = total_cost;
3529  }
3530 
3531  /*
3532  * Add some additional cost factors to account for connection overhead
3533  * (fdw_startup_cost), transferring data across the network
3534  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3535  * (cpu_tuple_cost per retrieved row).
3536  */
3537  startup_cost += fpinfo->fdw_startup_cost;
3538  total_cost += fpinfo->fdw_startup_cost;
3539  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3540  total_cost += cpu_tuple_cost * retrieved_rows;
3541 
3542  /*
3543  * If we have LIMIT, we should prefer performing the restriction remotely
3544  * rather than locally, as the former avoids extra row fetches from the
3545  * remote that the latter might cause. But since the core code doesn't
3546  * account for such fetches when estimating the costs of the local
3547  * restriction (see create_limit_path()), there would be no difference
3548  * between the costs of the local restriction and the costs of the remote
3549  * restriction estimated above if we don't use remote estimates (except
3550  * for the case where the foreignrel is a grouping relation, the given
3551  * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3552  * accounted for in costing the remote restriction). Tweak the costs of
3553  * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3554  * one.
3555  */
3556  if (!fpinfo->use_remote_estimate &&
3557  fpextra && fpextra->has_limit &&
3558  fpextra->limit_tuples > 0 &&
3559  fpextra->limit_tuples < fpinfo->rows)
3560  {
3561  Assert(fpinfo->rows > 0);
3562  total_cost -= (total_cost - startup_cost) * 0.05 *
3563  (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3564  }
3565 
3566  /* Return results. */
3567  *p_rows = rows;
3568  *p_width = width;
3569  *p_startup_cost = startup_cost;
3570  *p_total_cost = total_cost;
3571 }
3572 
3573 /*
3574  * Estimate costs of executing a SQL statement remotely.
3575  * The given "sql" must be an EXPLAIN command.
3576  */
3577 static void
3578 get_remote_estimate(const char *sql, PGconn *conn,
3579  double *rows, int *width,
3580  Cost *startup_cost, Cost *total_cost)
3581 {
3582  PGresult *volatile res = NULL;
3583 
3584  /* PGresult must be released before leaving this function. */
3585  PG_TRY();
3586  {
3587  char *line;
3588  char *p;
3589  int n;
3590 
3591  /*
3592  * Execute EXPLAIN remotely.
3593  */
3594  res = pgfdw_exec_query(conn, sql, NULL);
3596  pgfdw_report_error(ERROR, res, conn, false, sql);
3597 
3598  /*
3599  * Extract cost numbers for topmost plan node. Note we search for a
3600  * left paren from the end of the line to avoid being confused by
3601  * other uses of parentheses.
3602  */
3603  line = PQgetvalue(res, 0, 0);
3604  p = strrchr(line, '(');
3605  if (p == NULL)
3606  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3607  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3608  startup_cost, total_cost, rows, width);
3609  if (n != 4)
3610  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3611  }
3612  PG_FINALLY();
3613  {
3614  PQclear(res);
3615  }
3616  PG_END_TRY();
3617 }
3618 
3619 /*
3620  * Adjust the cost estimates of a foreign grouping path to include the cost of
3621  * generating properly-sorted output.
3622  */
3623 static void
3625  List *pathkeys,
3626  double retrieved_rows,
3627  double width,
3628  double limit_tuples,
3629  Cost *p_startup_cost,
3630  Cost *p_run_cost)
3631 {
3632  /*
3633  * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3634  * side is unlikely to generate properly-sorted output, so it would need
3635  * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3636  * if the GROUP BY clause is sort-able but isn't a superset of the given
3637  * pathkeys, adjust the costs with that function. Otherwise, adjust the
3638  * costs by applying the same heuristic as for the scan or join case.
3639  */
3641  !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3642  {
3643  Path sort_path; /* dummy for result of cost_sort */
3644 
3645  cost_sort(&sort_path,
3646  root,
3647  pathkeys,
3648  *p_startup_cost + *p_run_cost,
3649  retrieved_rows,
3650  width,
3651  0.0,
3652  work_mem,
3653  limit_tuples);
3654 
3655  *p_startup_cost = sort_path.startup_cost;
3656  *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3657  }
3658  else
3659  {
3660  /*
3661  * The default extra cost seems too large for foreign-grouping cases;
3662  * add 1/4th of that default.
3663  */
3664  double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3665  - 1.0) * 0.25;
3666 
3667  *p_startup_cost *= sort_multiplier;
3668  *p_run_cost *= sort_multiplier;
3669  }
3670 }
3671 
3672 /*
3673  * Detect whether we want to process an EquivalenceClass member.
3674  *
3675  * This is a callback for use by generate_implied_equalities_for_column.
3676  */
3677 static bool
3680  void *arg)
3681 {
3683  Expr *expr = em->em_expr;
3684 
3685  /*
3686  * If we've identified what we're processing in the current scan, we only
3687  * want to match that expression.
3688  */
3689  if (state->current != NULL)
3690  return equal(expr, state->current);
3691 
3692  /*
3693  * Otherwise, ignore anything we've already processed.
3694  */
3695  if (list_member(state->already_used, expr))
3696  return false;
3697 
3698  /* This is the new target to process. */
3699  state->current = expr;
3700  return true;
3701 }
3702 
3703 /*
3704  * Create cursor for node's query with current parameter values.
3705  */
3706 static void
3708 {
3709  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3710  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3711  int numParams = fsstate->numParams;
3712  const char **values = fsstate->param_values;
3713  PGconn *conn = fsstate->conn;
3715  PGresult *res;
3716 
3717  /* First, process a pending asynchronous request, if any. */
3718  if (fsstate->conn_state->pendingAreq)
3720 
3721  /*
3722  * Construct array of query parameter values in text format. We do the
3723  * conversions in the short-lived per-tuple context, so as not to cause a
3724  * memory leak over repeated scans.
3725  */
3726  if (numParams > 0)
3727  {
3728  MemoryContext oldcontext;
3729 
3730  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3731 
3732  process_query_params(econtext,
3733  fsstate->param_flinfo,
3734  fsstate->param_exprs,
3735  values);
3736 
3737  MemoryContextSwitchTo(oldcontext);
3738  }
3739 
3740  /* Construct the DECLARE CURSOR command */
3741  initStringInfo(&buf);
3742  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3743  fsstate->cursor_number, fsstate->query);
3744 
3745  /*
3746  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3747  * to infer types for all parameters. Since we explicitly cast every
3748  * parameter (see deparse.c), the "inference" is trivial and will produce
3749  * the desired result. This allows us to avoid assuming that the remote
3750  * server has the same OIDs we do for the parameters' types.
3751  */
3752  if (!PQsendQueryParams(conn, buf.data, numParams,
3753  NULL, values, NULL, NULL, 0))
3754  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3755 
3756  /*
3757  * Get the result, and check for success.
3758  *
3759  * We don't use a PG_TRY block here, so be careful not to throw error
3760  * without releasing the PGresult.
3761  */
3762  res = pgfdw_get_result(conn, buf.data);
3764  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3765  PQclear(res);
3766 
3767  /* Mark the cursor as created, and show no tuples have been retrieved */
3768  fsstate->cursor_exists = true;
3769  fsstate->tuples = NULL;
3770  fsstate->num_tuples = 0;
3771  fsstate->next_tuple = 0;
3772  fsstate->fetch_ct_2 = 0;
3773  fsstate->eof_reached = false;
3774 
3775  /* Clean up */
3776  pfree(buf.data);
3777 }
3778 
3779 /*
3780  * Fetch some more rows from the node's cursor.
3781  */
3782 static void
3784 {
3785  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3786  PGresult *volatile res = NULL;
3787  MemoryContext oldcontext;
3788 
3789  /*
3790  * We'll store the tuples in the batch_cxt. First, flush the previous
3791  * batch.
3792  */
3793  fsstate->tuples = NULL;
3794  MemoryContextReset(fsstate->batch_cxt);
3795  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3796 
3797  /* PGresult must be released before leaving this function. */
3798  PG_TRY();
3799  {
3800  PGconn *conn = fsstate->conn;
3801  int numrows;
3802  int i;
3803 
3804  if (fsstate->async_capable)
3805  {
3806  Assert(fsstate->conn_state->pendingAreq);
3807 
3808  /*
3809  * The query was already sent by an earlier call to
3810  * fetch_more_data_begin. So now we just fetch the result.
3811  */
3812  res = pgfdw_get_result(conn, fsstate->query);
3813  /* On error, report the original query, not the FETCH. */
3815  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3816 
3817  /* Reset per-connection state */
3818  fsstate->conn_state->pendingAreq = NULL;
3819  }
3820  else
3821  {
3822  char sql[64];
3823 
3824  /* This is a regular synchronous fetch. */
3825  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3826  fsstate->fetch_size, fsstate->cursor_number);
3827 
3828  res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
3829  /* On error, report the original query, not the FETCH. */
3831  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3832  }
3833 
3834  /* Convert the data into HeapTuples */
3835  numrows = PQntuples(res);
3836  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3837  fsstate->num_tuples = numrows;
3838  fsstate->next_tuple = 0;
3839 
3840  for (i = 0; i < numrows; i++)
3841  {
3842  Assert(IsA(node->ss.ps.plan, ForeignScan));
3843 
3844  fsstate->tuples[i] =
3846  fsstate->rel,
3847  fsstate->attinmeta,
3848  fsstate->retrieved_attrs,
3849  node,
3850  fsstate->temp_cxt);
3851  }
3852 
3853  /* Update fetch_ct_2 */
3854  if (fsstate->fetch_ct_2 < 2)
3855  fsstate->fetch_ct_2++;
3856 
3857  /* Must be EOF if we didn't get as many tuples as we asked for. */
3858  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3859  }
3860  PG_FINALLY();
3861  {
3862  PQclear(res);
3863  }
3864  PG_END_TRY();
3865 
3866  MemoryContextSwitchTo(oldcontext);
3867 }
3868 
3869 /*
3870  * Force assorted GUC parameters to settings that ensure that we'll output
3871  * data values in a form that is unambiguous to the remote server.
3872  *
3873  * This is rather expensive and annoying to do once per row, but there's
3874  * little choice if we want to be sure values are transmitted accurately;
3875  * we can't leave the settings in place between rows for fear of affecting
3876  * user-visible computations.
3877  *
3878  * We use the equivalent of a function SET option to allow the settings to
3879  * persist only until the caller calls reset_transmission_modes(). If an
3880  * error is thrown in between, guc.c will take care of undoing the settings.
3881  *
3882  * The return value is the nestlevel that must be passed to
3883  * reset_transmission_modes() to undo things.
3884  */
3885 int
3887 {
3888  int nestlevel = NewGUCNestLevel();
3889 
3890  /*
3891  * The values set here should match what pg_dump does. See also
3892  * configure_remote_session in connection.c.
3893  */
3894  if (DateStyle != USE_ISO_DATES)
3895  (void) set_config_option("datestyle", "ISO",
3897  GUC_ACTION_SAVE, true, 0, false);
3899  (void) set_config_option("intervalstyle", "postgres",
3901  GUC_ACTION_SAVE, true, 0, false);
3902  if (extra_float_digits < 3)
3903  (void) set_config_option("extra_float_digits", "3",
3905  GUC_ACTION_SAVE, true, 0, false);
3906 
3907  /*
3908  * In addition force restrictive search_path, in case there are any
3909  * regproc or similar constants to be printed.
3910  */
3911  (void) set_config_option("search_path", "pg_catalog",
3913  GUC_ACTION_SAVE, true, 0, false);
3914 
3915  return nestlevel;
3916 }
3917 
3918 /*
3919  * Undo the effects of set_transmission_modes().
3920  */
3921 void
3923 {
3924  AtEOXact_GUC(true, nestlevel);
3925 }
3926 
3927 /*
3928  * Utility routine to close a cursor.
3929  */
3930 static void
3932  PgFdwConnState *conn_state)
3933 {
3934  char sql[64];
3935  PGresult *res;
3936 
3937  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3938 
3939  /*
3940  * We don't use a PG_TRY block here, so be careful not to throw error
3941  * without releasing the PGresult.
3942  */
3943  res = pgfdw_exec_query(conn, sql, conn_state);
3945  pgfdw_report_error(ERROR, res, conn, true, sql);
3946  PQclear(res);
3947 }
3948 
3949 /*
3950  * create_foreign_modify
3951  * Construct an execution state of a foreign insert/update/delete
3952  * operation
3953  */
3954 static PgFdwModifyState *
3956  RangeTblEntry *rte,
3957  ResultRelInfo *resultRelInfo,
3958  CmdType operation,
3959  Plan *subplan,
3960  char *query,
3961  List *target_attrs,
3962  int values_end,
3963  bool has_returning,
3964  List *retrieved_attrs)
3965 {
3966  PgFdwModifyState *fmstate;
3967  Relation rel = resultRelInfo->ri_RelationDesc;
3968  TupleDesc tupdesc = RelationGetDescr(rel);
3969  Oid userid;
3970  ForeignTable *table;
3971  UserMapping *user;
3972  AttrNumber n_params;
3973  Oid typefnoid;
3974  bool isvarlena;
3975  ListCell *lc;
3976 
3977  /* Begin constructing PgFdwModifyState. */
3978  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3979  fmstate->rel = rel;
3980 
3981  /* Identify which user to do the remote access as. */
3982  userid = ExecGetResultRelCheckAsUser(resultRelInfo, estate);
3983 
3984  /* Get info about foreign table. */
3985  table = GetForeignTable(RelationGetRelid(rel));
3986  user = GetUserMapping(userid, table->serverid);
3987 
3988  /* Open connection; report that we'll create a prepared statement. */
3989  fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
3990  fmstate->p_name = NULL; /* prepared statement not made yet */
3991 
3992  /* Set up remote query information. */
3993  fmstate->query = query;
3994  if (operation == CMD_INSERT)
3995  {
3996  fmstate->query = pstrdup(fmstate->query);
3997  fmstate->orig_query = pstrdup(fmstate->query);
3998  }
3999  fmstate->target_attrs = target_attrs;
4000  fmstate->values_end = values_end;
4001  fmstate->has_returning = has_returning;
4002  fmstate->retrieved_attrs = retrieved_attrs;
4003 
4004  /* Create context for per-tuple temp workspace. */
4005  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
4006  "postgres_fdw temporary data",
4008 
4009  /* Prepare for input conversion of RETURNING results. */
4010  if (fmstate->has_returning)
4011  fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
4012 
4013  /* Prepare for output conversion of parameters used in prepared stmt. */
4014  n_params = list_length(fmstate->target_attrs) + 1;
4015  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
4016  fmstate->p_nums = 0;
4017 
4018  if (operation == CMD_UPDATE || operation == CMD_DELETE)
4019  {
4020  Assert(subplan != NULL);
4021 
4022  /* Find the ctid resjunk column in the subplan's result */
4024  "ctid");
4025  if (!AttributeNumberIsValid(fmstate->ctidAttno))
4026  elog(ERROR, "could not find junk ctid column");
4027 
4028  /* First transmittable parameter will be ctid */
4029  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
4030  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4031  fmstate->p_nums++;
4032  }
4033 
4034  if (operation == CMD_INSERT || operation == CMD_UPDATE)
4035  {
4036  /* Set up for remaining transmittable parameters */
4037  foreach(lc, fmstate->target_attrs)
4038  {
4039  int attnum = lfirst_int(lc);
4040  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4041 
4042  Assert(!attr->attisdropped);
4043 
4044  /* Ignore generated columns; they are set to DEFAULT */
4045  if (attr->attgenerated)
4046  continue;
4047  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
4048  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4049  fmstate->p_nums++;
4050  }
4051  }
4052 
4053  Assert(fmstate->p_nums <= n_params);
4054 
4055  /* Set batch_size from foreign server/table options. */
4056  if (operation == CMD_INSERT)
4057  fmstate->batch_size = get_batch_size_option(rel);
4058 
4059  fmstate->num_slots = 1;
4060 
4061  /* Initialize auxiliary state */
4062  fmstate->aux_fmstate = NULL;
4063 
4064  return fmstate;
4065 }
4066 
4067 /*
4068  * execute_foreign_modify
4069  * Perform foreign-table modification as required, and fetch RETURNING
4070  * result if any. (This is the shared guts of postgresExecForeignInsert,
4071  * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
4072  * postgresExecForeignDelete.)
4073  */
4074 static TupleTableSlot **
4076  ResultRelInfo *resultRelInfo,
4077  CmdType operation,
4078  TupleTableSlot **slots,
4079  TupleTableSlot **planSlots,
4080  int *numSlots)
4081 {
4082  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
4083  ItemPointer ctid = NULL;
4084  const char **p_values;
4085  PGresult *res;
4086  int n_rows;
4087  StringInfoData sql;
4088 
4089  /* The operation should be INSERT, UPDATE, or DELETE */
4090  Assert(operation == CMD_INSERT ||
4091  operation == CMD_UPDATE ||
4092  operation == CMD_DELETE);
4093 
4094  /* First, process a pending asynchronous request, if any. */
4095  if (fmstate->conn_state->pendingAreq)
4097 
4098  /*
4099  * If the existing query was deparsed and prepared for a different number
4100  * of rows, rebuild it for the proper number.
4101  */
4102  if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
4103  {
4104  /* Destroy the prepared statement created previously */
4105  if (fmstate->p_name)
4106  deallocate_query(fmstate);
4107 
4108  /* Build INSERT string with numSlots records in its VALUES clause. */
4109  initStringInfo(&sql);
4110  rebuildInsertSql(&sql, fmstate->rel,
4111  fmstate->orig_query, fmstate->target_attrs,
4112  fmstate->values_end, fmstate->p_nums,
4113  *numSlots - 1);
4114  pfree(fmstate->query);
4115  fmstate->query = sql.data;
4116  fmstate->num_slots = *numSlots;
4117  }
4118 
4119  /* Set up the prepared statement on the remote server, if we didn't yet */
4120  if (!fmstate->p_name)
4121  prepare_foreign_modify(fmstate);
4122 
4123  /*
4124  * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
4125  */
4126  if (operation == CMD_UPDATE || operation == CMD_DELETE)
4127  {
4128  Datum datum;
4129  bool isNull;
4130 
4131  datum = ExecGetJunkAttribute(planSlots[0],
4132  fmstate->ctidAttno,
4133  &isNull);
4134  /* shouldn't ever get a null result... */
4135  if (isNull)
4136  elog(ERROR, "ctid is NULL");
4137  ctid = (ItemPointer) DatumGetPointer(datum);
4138  }
4139 
4140  /* Convert parameters needed by prepared statement to text form */
4141  p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
4142 
4143  /*
4144  * Execute the prepared statement.
4145  */
4146  if (!PQsendQueryPrepared(fmstate->conn,
4147  fmstate->p_name,
4148  fmstate->p_nums * (*numSlots),
4149  p_values,
4150  NULL,
4151  NULL,
4152  0))
4153  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4154 
4155  /*
4156  * Get the result, and check for success.
4157  *
4158  * We don't use a PG_TRY block here, so be careful not to throw error
4159  * without releasing the PGresult.
4160  */
4161  res = pgfdw_get_result(fmstate->conn, fmstate->query);
4162  if (PQresultStatus(res) !=
4164  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4165 
4166  /* Check number of rows affected, and fetch RETURNING tuple if any */
4167  if (fmstate->has_returning)
4168  {
4169  Assert(*numSlots == 1);
4170  n_rows = PQntuples(res);
4171  if (n_rows > 0)
4172  store_returning_result(fmstate, slots[0], res);
4173  }
4174  else
4175  n_rows = atoi(PQcmdTuples(res));
4176 
4177  /* And clean up */
4178  PQclear(res);
4179 
4180  MemoryContextReset(fmstate->temp_cxt);
4181 
4182  *numSlots = n_rows;
4183 
4184  /*
4185  * Return NULL if nothing was inserted/updated/deleted on the remote end
4186  */
4187  return (n_rows > 0) ? slots : NULL;
4188 }
4189 
4190 /*
4191  * prepare_foreign_modify
4192  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
4193  */
4194 static void
4196 {
4197  char prep_name[NAMEDATALEN];
4198  char *p_name;
4199  PGresult *res;
4200 
4201  /*
4202  * The caller would already have processed a pending asynchronous request
4203  * if any, so no need to do it here.
4204  */
4205 
4206  /* Construct name we'll use for the prepared statement. */
4207  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
4208  GetPrepStmtNumber(fmstate->conn));
4209  p_name = pstrdup(prep_name);
4210 
4211  /*
4212  * We intentionally do not specify parameter types here, but leave the
4213  * remote server to derive them by default. This avoids possible problems
4214  * with the remote server using different type OIDs than we do. All of
4215  * the prepared statements we use in this module are simple enough that
4216  * the remote server will make the right choices.
4217  */
4218  if (!PQsendPrepare(fmstate->conn,
4219  p_name,
4220  fmstate->query,
4221  0,
4222  NULL))
4223  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4224 
4225  /*
4226  * Get the result, and check for success.
4227  *
4228  * We don't use a PG_TRY block here, so be careful not to throw error
4229  * without releasing the PGresult.
4230  */
4231  res = pgfdw_get_result(fmstate->conn, fmstate->query);
4233  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4234  PQclear(res);
4235 
4236  /* This action shows that the prepare has been done. */
4237  fmstate->p_name = p_name;
4238 }
4239 
4240 /*
4241  * convert_prep_stmt_params
4242  * Create array of text strings representing parameter values
4243  *
4244  * tupleid is ctid to send, or NULL if none
4245  * slot is slot to get remaining parameters from, or NULL if none
4246  *
4247  * Data is constructed in temp_cxt; caller should reset that after use.
4248  */
4249 static const char **
4251  ItemPointer tupleid,
4252  TupleTableSlot **slots,
4253  int numSlots)
4254 {
4255  const char **p_values;
4256  int i;
4257  int j;
4258  int pindex = 0;
4259  MemoryContext oldcontext;
4260 
4261  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
4262 
4263  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
4264 
4265  /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
4266  Assert(!(tupleid != NULL && numSlots > 1));
4267 
4268  /* 1st parameter should be ctid, if it's in use */
4269  if (tupleid != NULL)
4270  {
4271  Assert(numSlots == 1);
4272  /* don't need set_transmission_modes for TID output */
4273  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
4274  PointerGetDatum(tupleid));
4275  pindex++;
4276  }
4277 
4278  /* get following parameters from slots */
4279  if (slots != NULL && fmstate->target_attrs != NIL)
4280  {
4281  TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
4282  int nestlevel;
4283  ListCell *lc;
4284 
4285  nestlevel = set_transmission_modes();
4286 
4287  for (i = 0; i < numSlots; i++)
4288  {
4289  j = (tupleid != NULL) ? 1 : 0;
4290  foreach(lc, fmstate->target_attrs)
4291  {
4292  int attnum = lfirst_int(lc);
4293  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4294  Datum value;
4295  bool isnull;
4296 
4297  /* Ignore generated columns; they are set to DEFAULT */
4298  if (attr->attgenerated)
4299  continue;
4300  value = slot_getattr(slots[i], attnum, &isnull);
4301  if (isnull)
4302  p_values[pindex] = NULL;
4303  else
4304  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
4305  value);
4306  pindex++;
4307  j++;
4308  }
4309  }
4310 
4311  reset_transmission_modes(nestlevel);
4312  }
4313 
4314  Assert(pindex == fmstate->p_nums * numSlots);
4315 
4316  MemoryContextSwitchTo(oldcontext);
4317 
4318  return p_values;
4319 }
4320 
4321 /*
4322  * store_returning_result
4323  * Store the result of a RETURNING clause
4324  *
4325  * On error, be sure to release the PGresult on the way out. Callers do not
4326  * have PG_TRY blocks to ensure this happens.
4327  */
4328 static void
4330  TupleTableSlot *slot, PGresult *res)
4331 {
4332  PG_TRY();
4333  {
4334  HeapTuple newtup;
4335 
4336  newtup = make_tuple_from_result_row(res, 0,
4337  fmstate->rel,
4338  fmstate->attinmeta,
4339  fmstate->retrieved_attrs,
4340  NULL,
4341  fmstate->temp_cxt);
4342 
4343  /*
4344  * The returning slot will not necessarily be suitable to store
4345  * heaptuples directly, so allow for conversion.
4346  */
4347  ExecForceStoreHeapTuple(newtup, slot, true);
4348  }
4349  PG_CATCH();
4350  {
4351  PQclear(res);
4352  PG_RE_THROW();
4353  }
4354  PG_END_TRY();
4355 }
4356 
4357 /*
4358  * finish_foreign_modify
4359  * Release resources for a foreign insert/update/delete operation
4360  */
4361 static void
4363 {
4364  Assert(fmstate != NULL);
4365 
4366  /* If we created a prepared statement, destroy it */
4367  deallocate_query(fmstate);
4368 
4369  /* Release remote connection */
4370  ReleaseConnection(fmstate->conn);
4371  fmstate->conn = NULL;
4372 }
4373 
4374 /*
4375  * deallocate_query
4376  * Deallocate a prepared statement for a foreign insert/update/delete
4377  * operation
4378  */
4379 static void
4381 {
4382  char sql[64];
4383  PGresult *res;
4384 
4385  /* do nothing if the query is not allocated */
4386  if (!fmstate->p_name)
4387  return;
4388 
4389  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
4390 
4391  /*
4392  * We don't use a PG_TRY block here, so be careful not to throw error
4393  * without releasing the PGresult.
4394  */
4395  res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
4397  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
4398  PQclear(res);
4399  pfree(fmstate->p_name);
4400  fmstate->p_name = NULL;
4401 }
4402 
4403 /*
4404  * build_remote_returning
4405  * Build a RETURNING targetlist of a remote query for performing an
4406  * UPDATE/DELETE .. RETURNING on a join directly
4407  */
4408 static List *
4409 build_remote_returning(Index rtindex, Relation rel, List *returningList)
4410 {
4411  bool have_wholerow = false;
4412  List *tlist = NIL;
4413  List *vars;
4414  ListCell *lc;
4415 
4416  Assert(returningList);
4417 
4418  vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
4419 
4420  /*
4421  * If there's a whole-row reference to the target relation, then we'll
4422  * need all the columns of the relation.
4423  */
4424  foreach(lc, vars)
4425  {
4426  Var *var = (Var *) lfirst(lc);
4427 
4428  if (IsA(var, Var) &&
4429  var->varno == rtindex &&
4430  var->varattno == InvalidAttrNumber)
4431  {
4432  have_wholerow = true;
4433  break;
4434  }
4435  }
4436 
4437  if (have_wholerow)
4438  {
4439  TupleDesc tupdesc = RelationGetDescr(rel);
4440  int i;
4441 
4442  for (i = 1; i <= tupdesc->natts; i++)
4443  {
4444  Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
4445  Var *var;
4446 
4447  /* Ignore dropped attributes. */
4448  if (attr->attisdropped)
4449  continue;
4450 
4451  var = makeVar(rtindex,
4452  i,
4453  attr->atttypid,
4454  attr->atttypmod,
4455  attr->attcollation,
4456  0);
4457 
4458  tlist = lappend(tlist,
4459  makeTargetEntry((Expr *) var,
4460  list_length(tlist) + 1,
4461  NULL,
4462  false));
4463  }
4464  }
4465 
4466  /* Now add any remaining columns to tlist. */
4467  foreach(lc, vars)
4468  {
4469  Var *var = (Var *) lfirst(lc);
4470 
4471  /*
4472  * No need for whole-row references to the target relation. We don't
4473  * need system columns other than ctid and oid either, since those are
4474  * set locally.
4475  */
4476  if (IsA(var, Var) &&
4477  var->varno == rtindex &&
4478  var->varattno <= InvalidAttrNumber &&
4480  continue; /* don't need it */
4481 
4482  if (tlist_member((Expr *) var, tlist))
4483  continue; /* already got it */
4484 
4485  tlist = lappend(tlist,
4486  makeTargetEntry((Expr *) var,
4487  list_length(tlist) + 1,
4488  NULL,
4489  false));
4490  }
4491 
4492  list_free(vars);
4493 
4494  return tlist;
4495 }
4496 
4497 /*
4498  * rebuild_fdw_scan_tlist
4499  * Build new fdw_scan_tlist of given foreign-scan plan node from given
4500  * tlist
4501  *
4502  * There might be columns that the fdw_scan_tlist of the given foreign-scan
4503  * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4504  * have contained resjunk columns such as 'ctid' of the target relation and
4505  * 'wholerow' of non-target relations, but the tlist might not contain them,
4506  * for example. So, adjust the tlist so it contains all the columns specified
4507  * in the fdw_scan_tlist; else setrefs.c will get confused.
4508  */
4509 static void
4511 {
4512  List *new_tlist = tlist;
4513  List *old_tlist = fscan->fdw_scan_tlist;
4514  ListCell *lc;
4515 
4516  foreach(lc, old_tlist)
4517  {
4518  TargetEntry *tle = (TargetEntry *) lfirst(lc);
4519 
4520  if (tlist_member(tle->expr, new_tlist))
4521  continue; /* already got it */
4522 
4523  new_tlist = lappend(new_tlist,
4524  makeTargetEntry(tle->expr,
4525  list_length(new_tlist) + 1,
4526  NULL,
4527  false));
4528  }
4529  fscan->fdw_scan_tlist = new_tlist;
4530 }
4531 
4532 /*
4533  * Execute a direct UPDATE/DELETE statement.
4534  */
4535 static void
4537 {
4539  ExprContext *econtext = node->ss.ps.ps_ExprContext;
4540  int numParams = dmstate->numParams;
4541  const char **values = dmstate->param_values;
4542 
4543  /* First, process a pending asynchronous request, if any. */
4544  if (dmstate->conn_state->pendingAreq)
4546 
4547  /*
4548  * Construct array of query parameter values in text format.
4549  */
4550  if (numParams > 0)
4551  process_query_params(econtext,
4552  dmstate->param_flinfo,
4553  dmstate->param_exprs,
4554  values);
4555 
4556  /*
4557  * Notice that we pass NULL for paramTypes, thus forcing the remote server
4558  * to infer types for all parameters. Since we explicitly cast every
4559  * parameter (see deparse.c), the "inference" is trivial and will produce
4560  * the desired result. This allows us to avoid assuming that the remote
4561  * server has the same OIDs we do for the parameters' types.
4562  */
4563  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4564  NULL, values, NULL, NULL, 0))
4565  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4566 
4567  /*
4568  * Get the result, and check for success.
4569  *
4570  * We don't use a PG_TRY block here, so be careful not to throw error
4571  * without releasing the PGresult.
4572  */
4573  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4574  if (PQresultStatus(dmstate->result) !=
4576  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4577  dmstate->query);
4578 
4579  /* Get the number of rows affected. */
4580  if (dmstate->has_returning)
4581  dmstate->num_tuples = PQntuples(dmstate->result);
4582  else
4583  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4584 }
4585 
4586 /*
4587  * Get the result of a RETURNING clause.
4588  */
4589 static TupleTableSlot *
4591 {
4593  EState *estate = node->ss.ps.state;
4594  ResultRelInfo *resultRelInfo = node->resultRelInfo;
4595  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4596  TupleTableSlot *resultSlot;
4597 
4598  Assert(resultRelInfo->ri_projectReturning);
4599 
4600  /* If we didn't get any tuples, must be end of data. */
4601  if (dmstate->next_tuple >= dmstate->num_tuples)
4602  return ExecClearTuple(slot);
4603 
4604  /* Increment the command es_processed count if necessary. */
4605  if (dmstate->set_processed)
4606  estate->es_processed += 1;
4607 
4608  /*
4609  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4610  * tuple. (has_returning is false when the local query is of the form
4611  * "UPDATE/DELETE .. RETURNING 1" for example.)
4612  */
4613  if (!dmstate->has_returning)
4614  {
4615  ExecStoreAllNullTuple(slot);
4616  resultSlot = slot;
4617  }
4618  else
4619  {
4620  /*
4621  * On error, be sure to release the PGresult on the way out. Callers
4622  * do not have PG_TRY blocks to ensure this happens.
4623  */
4624  PG_TRY();
4625  {
4626  HeapTuple newtup;
4627 
4628  newtup = make_tuple_from_result_row(dmstate->result,
4629  dmstate->next_tuple,
4630  dmstate->rel,
4631  dmstate->attinmeta,
4632  dmstate->retrieved_attrs,
4633  node,
4634  dmstate->temp_cxt);
4635  ExecStoreHeapTuple(newtup, slot, false);
4636  }
4637  PG_CATCH();
4638  {
4639  PQclear(dmstate->result);
4640  PG_RE_THROW();
4641  }
4642  PG_END_TRY();
4643 
4644  /* Get the updated/deleted tuple. */
4645  if (dmstate->rel)
4646  resultSlot = slot;
4647  else
4648  resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate);
4649  }
4650  dmstate->next_tuple++;
4651 
4652  /* Make slot available for evaluation of the local query RETURNING list. */
4653  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4654  resultSlot;
4655 
4656  return slot;
4657 }
4658 
4659 /*
4660  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4661  */
4662 static void
4664  List *fdw_scan_tlist,
4665  Index rtindex)
4666 {
4667  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4668  ListCell *lc;
4669  int i;
4670 
4671  /*
4672  * Calculate the mapping between the fdw_scan_tlist's entries and the
4673  * result tuple's attributes.
4674  *
4675  * The "map" is an array of indexes of the result tuple's attributes in
4676  * fdw_scan_tlist, i.e., one entry for every attribute of the result
4677  * tuple. We store zero for any attributes that don't have the
4678  * corresponding entries in that list, marking that a NULL is needed in
4679  * the result tuple.
4680  *
4681  * Also get the indexes of the entries for ctid and oid if any.
4682  */
4683  dmstate->attnoMap = (AttrNumber *)
4684  palloc0(resultTupType->natts * sizeof(AttrNumber));
4685 
4686  dmstate->ctidAttno = dmstate->oidAttno = 0;
4687 
4688  i = 1;
4689  dmstate->hasSystemCols = false;
4690  foreach(lc, fdw_scan_tlist)
4691  {
4692  TargetEntry *tle = (TargetEntry *) lfirst(lc);
4693  Var *var = (Var *) tle->expr;
4694 
4695  Assert(IsA(var, Var));
4696 
4697  /*
4698  * If the Var is a column of the target relation to be retrieved from
4699  * the foreign server, get the index of the entry.
4700  */
4701  if (var->varno == rtindex &&
4702  list_member_int(dmstate->retrieved_attrs, i))
4703  {
4704  int attrno = var->varattno;
4705 
4706  if (attrno < 0)
4707  {
4708  /*
4709  * We don't retrieve system columns other than ctid and oid.
4710  */
4711  if (attrno == SelfItemPointerAttributeNumber)
4712  dmstate->ctidAttno = i;
4713  else
4714  Assert(false);
4715  dmstate->hasSystemCols = true;
4716  }
4717  else
4718  {
4719  /*
4720  * We don't retrieve whole-row references to the target
4721  * relation either.
4722  */
4723  Assert(attrno > 0);
4724 
4725  dmstate->attnoMap[attrno - 1] = i;
4726  }
4727  }
4728  i++;
4729  }
4730 }
4731 
4732 /*
4733  * Extract and return an updated/deleted tuple from a scan tuple.
4734  */
4735 static TupleTableSlot *
4737  ResultRelInfo *resultRelInfo,
4738  TupleTableSlot *slot,
4739  EState *estate)
4740 {
4741  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4742  TupleTableSlot *resultSlot;
4743  Datum *values;
4744  bool *isnull;
4745  Datum *old_values;
4746  bool *old_isnull;
4747  int i;
4748 
4749  /*
4750  * Use the return tuple slot as a place to store the result tuple.
4751  */
4752  resultSlot = ExecGetReturningSlot(estate, resultRelInfo);
4753 
4754  /*
4755  * Extract all the values of the scan tuple.
4756  */
4757  slot_getallattrs(slot);
4758  old_values = slot->tts_values;
4759  old_isnull = slot->tts_isnull;
4760 
4761  /*
4762  * Prepare to build the result tuple.
4763  */
4764  ExecClearTuple(resultSlot);
4765  values = resultSlot->tts_values;
4766  isnull = resultSlot->tts_isnull;
4767 
4768  /*
4769  * Transpose data into proper fields of the result tuple.
4770  */
4771  for (i = 0; i < resultTupType->natts; i++)
4772  {
4773  int j = dmstate->attnoMap[i];
4774 
4775  if (j == 0)
4776  {
4777  values[i] = (Datum) 0;
4778  isnull[i] = true;
4779  }
4780  else
4781  {
4782  values[i] = old_values[j - 1];
4783  isnull[i] = old_isnull[j - 1];
4784  }
4785  }
4786 
4787  /*
4788  * Build the virtual tuple.
4789  */
4790  ExecStoreVirtualTuple(resultSlot);
4791 
4792  /*
4793  * If we have any system columns to return, materialize a heap tuple in
4794  * the slot from column values set above and install system columns in
4795  * that tuple.
4796  */
4797  if (dmstate->hasSystemCols)
4798  {
4799  HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4800 
4801  /* ctid */
4802  if (dmstate->ctidAttno)
4803  {
4804  ItemPointer ctid = NULL;
4805 
4806  ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4807  resultTup->t_self = *ctid;
4808  }
4809 
4810  /*
4811  * And remaining columns
4812  *
4813  * Note: since we currently don't allow the target relation to appear
4814  * on the nullable side of an outer join, any system columns wouldn't
4815  * go to NULL.
4816  *
4817  * Note: no need to care about tableoid here because it will be
4818  * initialized in ExecProcessReturning().
4819  */
4823  }
4824 
4825  /*
4826  * And return the result tuple.
4827  */
4828  return resultSlot;
4829 }
4830 
4831 /*
4832  * Prepare for processing of parameters used in remote query.
4833  */
4834 static void
4836  List *fdw_exprs,
4837  int numParams,
4838  FmgrInfo **param_flinfo,
4839  List **param_exprs,
4840  const char ***param_values)
4841 {
4842  int i;
4843  ListCell *lc;
4844 
4845  Assert(numParams > 0);
4846 
4847  /* Prepare for output conversion of parameters used in remote query. */
4848  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4849 
4850  i = 0;
4851  foreach(lc, fdw_exprs)
4852  {
4853  Node *param_expr = (Node *) lfirst(lc);
4854  Oid typefnoid;
4855  bool isvarlena;
4856 
4857  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4858  fmgr_info(typefnoid, &(*param_flinfo)[i]);
4859  i++;
4860  }
4861 
4862  /*
4863  * Prepare remote-parameter expressions for evaluation. (Note: in
4864  * practice, we expect that all these expressions will be just Params, so
4865  * we could possibly do something more efficient than using the full
4866  * expression-eval machinery for this. But probably there would be little
4867  * benefit, and it'd require postgres_fdw to know more than is desirable
4868  * about Param evaluation.)
4869  */
4870  *param_exprs = ExecInitExprList(fdw_exprs, node);
4871 
4872  /* Allocate buffer for text form of query parameters. */
4873  *param_values = (const char **) palloc0(numParams * sizeof(char *));
4874 }
4875 
4876 /*
4877  * Construct array of query parameter values in text format.
4878  */
4879 static void
4881  FmgrInfo *param_flinfo,
4882  List *param_exprs,
4883  const char **param_values)
4884 {
4885  int nestlevel;
4886  int i;
4887  ListCell *lc;
4888 
4889  nestlevel = set_transmission_modes();
4890 
4891  i = 0;
4892  foreach(lc, param_exprs)
4893  {
4894  ExprState *expr_state = (ExprState *) lfirst(lc);
4895  Datum expr_value;
4896  bool isNull;
4897 
4898  /* Evaluate the parameter expression */
4899  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4900 
4901  /*
4902  * Get string representation of each parameter value by invoking
4903  * type-specific output function, unless the value is null.
4904  */
4905  if (isNull)
4906  param_values[i] = NULL;
4907  else
4908  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4909 
4910  i++;
4911  }
4912 
4913  reset_transmission_modes(nestlevel);
4914 }
4915 
4916 /*
4917  * postgresAnalyzeForeignTable
4918  * Test whether analyzing this foreign table is supported
4919  */
4920 static bool
4922  AcquireSampleRowsFunc *func,
4923  BlockNumber *totalpages)
4924 {
4925  ForeignTable *table;
4926  UserMapping *user;
4927  PGconn *conn;
4928  StringInfoData sql;
4929  PGresult *volatile res = NULL;
4930 
4931  /* Return the row-analysis function pointer */
4933 
4934  /*
4935  * Now we have to get the number of pages. It's annoying that the ANALYZE
4936  * API requires us to return that now, because it forces some duplication
4937  * of effort between this routine and postgresAcquireSampleRowsFunc. But
4938  * it's probably not worth redefining that API at this point.
4939  */
4940 
4941  /*
4942  * Get the connection to use. We do the remote access as the table's
4943  * owner, even if the ANALYZE was started by some other user.
4944  */
4945  table = GetForeignTable(RelationGetRelid(relation));
4946  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4947  conn = GetConnection(user, false, NULL);
4948 
4949  /*
4950  * Construct command to get page count for relation.
4951  */
4952  initStringInfo(&sql);
4953  deparseAnalyzeSizeSql(&sql, relation);
4954 
4955  /* In what follows, do not risk leaking any PGresults. */
4956  PG_TRY();
4957  {
4958  res = pgfdw_exec_query(conn, sql.data, NULL);
4960  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4961 
4962  if (PQntuples(res) != 1 || PQnfields(res) != 1)
4963  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4964  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4965  }
4966  PG_FINALLY();
4967  {
4968  PQclear(res);
4969  }
4970  PG_END_TRY();
4971 
4973 
4974  return true;
4975 }
4976 
4977 /*
4978  * postgresGetAnalyzeInfoForForeignTable
4979  * Count tuples in foreign table (just get pg_class.reltuples).
4980  *
4981  * can_tablesample determines if the remote relation supports acquiring the
4982  * sample using TABLESAMPLE.
4983  */
4984 static double
4985 postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
4986 {
4987  ForeignTable *table;
4988  UserMapping *user;
4989  PGconn *conn;
4990  StringInfoData sql;
4991  PGresult *volatile res = NULL;
4992  volatile double reltuples = -1;
4993  volatile char relkind = 0;
4994 
4995  /* assume the remote relation does not support TABLESAMPLE */
4996  *can_tablesample = false;
4997 
4998  /*
4999  * Get the connection to use. We do the remote access as the table's
5000  * owner, even if the ANALYZE was started by some other user.
5001  */
5002  table = GetForeignTable(RelationGetRelid(relation));
5003  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5004  conn = GetConnection(user, false, NULL);
5005 
5006  /*
5007  * Construct command to get page count for relation.
5008  */
5009  initStringInfo(&sql);
5010  deparseAnalyzeInfoSql(&sql, relation);
5011 
5012  /* In what follows, do not risk leaking any PGresults. */
5013  PG_TRY();
5014  {
5015  res = pgfdw_exec_query(conn, sql.data, NULL);
5017  pgfdw_report_error(ERROR, res, conn, false, sql.data);
5018 
5019  if (PQntuples(res) != 1 || PQnfields(res) != 2)
5020  elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
5021  reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
5022  relkind = *(PQgetvalue(res, 0, 1));
5023  }
5024  PG_FINALLY();
5025  {
5026  if (res)
5027  PQclear(res);
5028  }
5029  PG_END_TRY();
5030 
5032 
5033  /* TABLESAMPLE is supported only for regular tables and matviews */
5034  *can_tablesample = (relkind == RELKIND_RELATION ||
5035  relkind == RELKIND_MATVIEW ||
5036  relkind == RELKIND_PARTITIONED_TABLE);
5037 
5038  return reltuples;
5039 }
5040 
5041 /*
5042  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
5043  *
5044  * Selected rows are returned in the caller-allocated array rows[],
5045  * which must have at least targrows entries.
5046  * The actual number of rows selected is returned as the function result.
5047  * We also count the total number of rows in the table and return it into
5048  * *totalrows. Note that *totaldeadrows is always set to 0.
5049  *
5050  * Note that the returned list of rows is not always in order by physical
5051  * position in the table. Therefore, correlation estimates derived later
5052  * may be meaningless, but it's OK because we don't use the estimates
5053  * currently (the planner only pays attention to correlation for indexscans).
5054  */
5055 static int
5057  HeapTuple *rows, int targrows,
5058  double *totalrows,
5059  double *totaldeadrows)
5060 {
5061  PgFdwAnalyzeState astate;
5062  ForeignTable *table;
5063  ForeignServer *server;
5064  UserMapping *user;
5065  PGconn *conn;
5066  int server_version_num;
5067  PgFdwSamplingMethod method = ANALYZE_SAMPLE_AUTO; /* auto is default */
5068  double sample_frac = -1.0;
5069  double reltuples;
5070  unsigned int cursor_number;
5071  StringInfoData sql;
5072  PGresult *volatile res = NULL;
5073  ListCell *lc;
5074 
5075  /* Initialize workspace state */
5076  astate.rel = relation;
5078 
5079  astate.rows = rows;
5080  astate.targrows = targrows;
5081  astate.numrows = 0;
5082  astate.samplerows = 0;
5083  astate.rowstoskip = -1; /* -1 means not set yet */
5084  reservoir_init_selection_state(&astate.rstate, targrows);
5085 
5086  /* Remember ANALYZE context, and create a per-tuple temp context */
5087  astate.anl_cxt = CurrentMemoryContext;
5089  "postgres_fdw temporary data",
5091 
5092  /*
5093  * Get the connection to use. We do the remote access as the table's
5094  * owner, even if the ANALYZE was started by some other user.
5095  */
5096  table = GetForeignTable(RelationGetRelid(relation));
5097  server = GetForeignServer(table->serverid);
5098  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5099  conn = GetConnection(user, false, NULL);
5100 
5101  /* We'll need server version, so fetch it now. */
5103 
5104  /*
5105  * What sampling method should we use?
5106  */
5107  foreach(lc, server->options)
5108  {
5109  DefElem *def = (DefElem *) lfirst(lc);
5110 
5111  if (strcmp(def->defname, "analyze_sampling") == 0)
5112  {
5113  char *value = defGetString(def);
5114 
5115  if (strcmp(value, "off") == 0)
5116  method = ANALYZE_SAMPLE_OFF;
5117  else if (strcmp(value, "auto") == 0)
5118  method = ANALYZE_SAMPLE_AUTO;
5119  else if (strcmp(value, "random") == 0)
5120  method = ANALYZE_SAMPLE_RANDOM;
5121  else if (strcmp(value, "system") == 0)
5122  method = ANALYZE_SAMPLE_SYSTEM;
5123  else if (strcmp(value, "bernoulli") == 0)
5124  method = ANALYZE_SAMPLE_BERNOULLI;
5125 
5126  break;
5127  }
5128  }
5129 
5130  foreach(lc, table->options)
5131  {
5132  DefElem *def = (DefElem *) lfirst(lc);
5133 
5134  if (strcmp(def->defname, "analyze_sampling") == 0)
5135  {
5136  char *value = defGetString(def);
5137 
5138  if (strcmp(value, "off") == 0)
5139  method = ANALYZE_SAMPLE_OFF;
5140  else if (strcmp(value, "auto") == 0)
5141  method = ANALYZE_SAMPLE_AUTO;
5142  else if (strcmp(value, "random") == 0)
5143  method = ANALYZE_SAMPLE_RANDOM;
5144  else if (strcmp(value, "system") == 0)
5145  method = ANALYZE_SAMPLE_SYSTEM;
5146  else if (strcmp(value, "bernoulli") == 0)
5147  method = ANALYZE_SAMPLE_BERNOULLI;
5148 
5149  break;
5150  }
5151  }
5152 
5153  /*
5154  * Error-out if explicitly required one of the TABLESAMPLE methods, but
5155  * the server does not support it.
5156  */
5157  if ((server_version_num < 95000) &&
5158  (method == ANALYZE_SAMPLE_SYSTEM ||
5159  method == ANALYZE_SAMPLE_BERNOULLI))
5160  ereport(ERROR,
5161  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5162  errmsg("remote server does not support TABLESAMPLE feature")));
5163 
5164  /*
5165  * If we've decided to do remote sampling, calculate the sampling rate. We
5166  * need to get the number of tuples from the remote server, but skip that
5167  * network round-trip if not needed.
5168  */
5169  if (method != ANALYZE_SAMPLE_OFF)
5170  {
5171  bool can_tablesample;
5172 
5173  reltuples = postgresGetAnalyzeInfoForForeignTable(relation,
5174  &can_tablesample);
5175 
5176  /*
5177  * Make sure we're not choosing TABLESAMPLE when the remote relation
5178  * does not support that. But only do this for "auto" - if the user
5179  * explicitly requested BERNOULLI/SYSTEM, it's better to fail.
5180  */
5181  if (!can_tablesample && (method == ANALYZE_SAMPLE_AUTO))
5182  method = ANALYZE_SAMPLE_RANDOM;
5183 
5184  /*
5185  * Remote's reltuples could be 0 or -1 if the table has never been
5186  * vacuumed/analyzed. In that case, disable sampling after all.
5187  */
5188  if ((reltuples <= 0) || (targrows >= reltuples))
5189  method = ANALYZE_SAMPLE_OFF;
5190  else
5191  {
5192  /*
5193  * All supported sampling methods require sampling rate, not
5194  * target rows directly, so we calculate that using the remote
5195  * reltuples value. That's imperfect, because it might be off a
5196  * good deal, but that's not something we can (or should) address
5197  * here.
5198  *
5199  * If reltuples is too low (i.e. when table grew), we'll end up
5200  * sampling more rows - but then we'll apply the local sampling,
5201  * so we get the expected sample size. This is the same outcome as
5202  * without remote sampling.
5203  *
5204  * If reltuples is too high (e.g. after bulk DELETE), we will end
5205  * up sampling too few rows.
5206  *
5207  * We can't really do much better here - we could try sampling a
5208  * bit more rows, but we don't know how off the reltuples value is
5209  * so how much is "a bit more"?
5210  *
5211  * Furthermore, the targrows value for partitions is determined
5212  * based on table size (relpages), which can be off in different
5213  * ways too. Adjusting the sampling rate here might make the issue
5214  * worse.
5215  */
5216  sample_frac = targrows / reltuples;
5217 
5218  /*
5219  * We should never get sampling rate outside the valid range
5220  * (between 0.0 and 1.0), because those cases should be covered by
5221  * the previous branch that sets ANALYZE_SAMPLE_OFF.
5222  */
5223  Assert(sample_frac >= 0.0 && sample_frac <= 1.0);
5224  }
5225  }
5226 
5227  /*
5228  * For "auto" method, pick the one we believe is best. For servers with
5229  * TABLESAMPLE support we pick BERNOULLI, for old servers we fall-back to
5230  * random() to at least reduce network transfer.
5231  */
5232  if (method == ANALYZE_SAMPLE_AUTO)
5233  {
5234  if (server_version_num < 95000)
5235  method = ANALYZE_SAMPLE_RANDOM;
5236  else
5237  method = ANALYZE_SAMPLE_BERNOULLI;
5238  }
5239 
5240  /*
5241  * Construct cursor that retrieves whole rows from remote.
5242  */
5244  initStringInfo(&sql);
5245  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
5246 
5247  deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
5248 
5249  /* In what follows, do not risk leaking any PGresults. */
5250  PG_TRY();
5251  {
5252  char fetch_sql[64];
5253  int fetch_size;
5254 
5255  res = pgfdw_exec_query(conn, sql.data, NULL);
5257  pgfdw_report_error(ERROR, res, conn, false, sql.data);
5258  PQclear(res);
5259  res = NULL;
5260 
5261  /*
5262  * Determine the fetch size. The default is arbitrary, but shouldn't
5263  * be enormous.
5264  */
5265  fetch_size = 100;
5266  foreach(lc, server->options)
5267  {
5268  DefElem *def = (DefElem *) lfirst(lc);
5269 
5270  if (strcmp(def->defname, "fetch_size") == 0)
5271  {
5272  (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5273  break;
5274  }
5275  }
5276  foreach(lc, table->options)
5277  {
5278  DefElem *def = (DefElem *) lfirst(lc);
5279 
5280  if (strcmp(def->defname, "fetch_size") == 0)
5281  {
5282  (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5283  break;
5284  }
5285  }
5286 
5287  /* Construct command to fetch rows from remote. */
5288  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
5290 
5291  /* Retrieve and process rows a batch at a time. */
5292  for (;;)
5293  {
5294  int numrows;
5295  int i;
5296 
5297  /* Allow users to cancel long query */
5299 
5300  /*
5301  * XXX possible future improvement: if rowstoskip is large, we
5302  * could issue a MOVE rather than physically fetching the rows,
5303  * then just adjust rowstoskip and samplerows appropriately.
5304  */
5305 
5306  /* Fetch some rows */
5307  res = pgfdw_exec_query(conn, fetch_sql, NULL);
5308  /* On error, report the original query, not the FETCH. */
5310  pgfdw_report_error(ERROR, res, conn, false, sql.data);
5311 
5312  /* Process whatever we got. */
5313  numrows = PQntuples(res);
5314  for (i = 0; i < numrows; i++)
5315  analyze_row_processor(res, i, &astate);
5316 
5317  PQclear(res);
5318  res = NULL;
5319 
5320  /* Must be EOF if we didn't get all the rows requested. */
5321  if (numrows < fetch_size)
5322  break;
5323  }
5324 
5325  /* Close the cursor, just to be tidy. */
5327  }
5328  PG_CATCH();
5329  {
5330  PQclear(res);
5331  PG_RE_THROW();
5332  }
5333  PG_END_TRY();
5334 
5336 
5337  /* We assume that we have no dead tuple. */
5338  *totaldeadrows = 0.0;
5339 
5340  /*
5341  * Without sampling, we've retrieved all living tuples from foreign
5342  * server, so report that as totalrows. Otherwise use the reltuples
5343  * estimate we got from the remote side.
5344  */
5345  if (method == ANALYZE_SAMPLE_OFF)
5346  *totalrows = astate.samplerows;
5347  else
5348  *totalrows = reltuples;
5349 
5350  /*
5351  * Emit some interesting relation info
5352  */
5353  ereport(elevel,
5354  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
5355  RelationGetRelationName(relation),
5356  *totalrows, astate.numrows)));
5357 
5358  return astate.numrows;
5359 }
5360 
5361 /*
5362  * Collect sample rows from the result of query.
5363  * - Use all tuples in sample until target # of samples are collected.
5364  * - Subsequently, replace already-sampled tuples randomly.
5365  */
5366 static void
5368 {
5369  int targrows = astate->targrows;
5370  int pos; /* array index to store tuple in */
5371  MemoryContext oldcontext;
5372 
5373  /* Always increment sample row counter. */
5374  astate->samplerows += 1;
5375 
5376  /*
5377  * Determine the slot where this sample row should be stored. Set pos to
5378  * negative value to indicate the row should be skipped.
5379  */
5380  if (astate->numrows < targrows)
5381  {
5382  /* First targrows rows are always included into the sample */
5383  pos = astate->numrows++;
5384  }
5385  else
5386  {
5387  /*
5388  * Now we start replacing tuples in the sample until we reach the end
5389  * of the relation. Same algorithm as in acquire_sample_rows in
5390  * analyze.c; see Jeff Vitter's paper.
5391  */
5392  if (astate->rowstoskip < 0)
5393  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
5394 
5395  if (astate->rowstoskip <= 0)
5396  {
5397  /* Choose a random reservoir element to replace. */
5398  pos = (int) (targrows * sampler_random_fract(&astate->rstate.randstate));
5399  Assert(pos >= 0 && pos < targrows);
5400  heap_freetuple(astate->rows[pos]);
5401  }
5402  else
5403  {
5404  /* Skip this tuple. */
5405  pos = -1;
5406  }
5407 
5408  astate->rowstoskip -= 1;
5409  }
5410 
5411  if (pos >= 0)
5412  {
5413  /*
5414  * Create sample tuple from current result row, and store it in the
5415  * position determined above. The tuple has to be created in anl_cxt.
5416  */
5417  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
5418 
5419  astate->rows[pos] = make_tuple_from_result_row(res, row,
5420  astate->rel,
5421  astate->attinmeta,
5422  astate->retrieved_attrs,
5423  NULL,
5424  astate->temp_cxt);
5425 
5426  MemoryContextSwitchTo(oldcontext);
5427  }
5428 }
5429 
5430 /*
5431  * Import a foreign schema
5432  */
5433 static List *
5435 {
5436  List *commands = NIL;
5437  bool import_collate = true;
5438  bool import_default = false;
5439  bool import_generated = true;
5440  bool import_not_null = true;
5441  ForeignServer *server;
5442  UserMapping *mapping;
5443  PGconn *conn;
5445  PGresult *volatile res = NULL;
5446  int numrows,
5447  i;
5448  ListCell *lc;
5449 
5450  /* Parse statement options */
5451  foreach(lc, stmt->options)
5452  {
5453  DefElem *def = (DefElem *) lfirst(lc);
5454 
5455  if (strcmp(def->defname, "import_collate") == 0)
5456  import_collate = defGetBoolean(def);
5457  else if (strcmp(def->defname, "import_default") == 0)
5458  import_default = defGetBoolean(def);
5459  else if (strcmp(def->defname, "import_generated") == 0)
5460  import_generated = defGetBoolean(def);
5461  else if (strcmp(def->defname, "import_not_null") == 0)
5462  import_not_null = defGetBoolean(def);
5463  else
5464  ereport(ERROR,
5465  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
5466  errmsg("invalid option \"%s\"", def->defname)));
5467  }
5468 
5469  /*
5470  * Get connection to the foreign server. Connection manager will
5471  * establish new connection if necessary.
5472  */
5473  server = GetForeignServer(serverOid);
5474  mapping = GetUserMapping(GetUserId(), server->serverid);
5475  conn = GetConnection(mapping, false, NULL);
5476 
5477  /* Don't attempt to import collation if remote server hasn't got it */
5478  if (PQserverVersion(conn) < 90100)
5479  import_collate = false;
5480 
5481  /* Create workspace for strings */
5482  initStringInfo(&buf);
5483 
5484  /* In what follows, do not risk leaking any PGresults. */
5485  PG_TRY();
5486  {
5487  /* Check that the schema really exists */
5488  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
5489  deparseStringLiteral(&buf, stmt->remote_schema);
5490 
5491  res = pgfdw_exec_query(conn, buf.data, NULL);
5493  pgfdw_report_error(ERROR, res, conn, false, buf.data);
5494 
5495  if (PQntuples(res) != 1)
5496  ereport(ERROR,
5497  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
5498  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
5499  stmt->remote_schema, server->servername)));
5500 
5501  PQclear(res);
5502  res = NULL;
5503  resetStringInfo(&buf);
5504 
5505  /*
5506  * Fetch all table data from this schema, possibly restricted by
5507  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
5508  * to EXCEPT/LIMIT TO here, because the core code will filter the
5509  * statements we return according to those lists anyway. But it
5510  * should save a few cycles to not process excluded tables in the
5511  * first place.)
5512  *
5513  * Import table data for partitions only when they are explicitly
5514  * specified in LIMIT TO clause. Otherwise ignore them and only
5515  * include the definitions of the root partitioned tables to allow
5516  * access to the complete remote data set locally in the schema
5517  * imported.
5518  *
5519  * Note: because we run the connection with search_path restricted to
5520  * pg_catalog, the format_type() and pg_get_expr() outputs will always
5521  * include a schema name for types/functions in other schemas, which
5522  * is what we want.
5523  */
5525  "SELECT relname, "
5526  " attname, "
5527  " format_type(atttypid, atttypmod), "
5528  " attnotnull, "
5529  " pg_get_expr(adbin, adrelid), ");
5530 
5531  /* Generated columns are supported since Postgres 12 */
5532  if (PQserverVersion(conn) >= 120000)
5533