PostgreSQL Source Code  git master
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include <limits.h>
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "catalog/pg_opfamily.h"
22 #include "commands/defrem.h"
23 #include "commands/explain.h"
24 #include "commands/vacuum.h"
25 #include "executor/execAsync.h"
26 #include "foreign/fdwapi.h"
27 #include "funcapi.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "nodes/nodeFuncs.h"
31 #include "optimizer/appendinfo.h"
32 #include "optimizer/clauses.h"
33 #include "optimizer/cost.h"
34 #include "optimizer/inherit.h"
35 #include "optimizer/optimizer.h"
36 #include "optimizer/pathnode.h"
37 #include "optimizer/paths.h"
38 #include "optimizer/planmain.h"
39 #include "optimizer/prep.h"
40 #include "optimizer/restrictinfo.h"
41 #include "optimizer/tlist.h"
42 #include "parser/parsetree.h"
43 #include "postgres_fdw.h"
44 #include "storage/latch.h"
45 #include "utils/builtins.h"
46 #include "utils/float.h"
47 #include "utils/guc.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/rel.h"
51 #include "utils/sampling.h"
52 #include "utils/selfuncs.h"
53 
55 
/*
 * Cost defaults used by postgres_fdw's planner support.  All values are in
 * the planner's abstract cost units, the same scale as cpu_tuple_cost.
 */
56 /* Default CPU cost to start up a foreign query. */
57 #define DEFAULT_FDW_STARTUP_COST 100.0
58 
59 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
60 #define DEFAULT_FDW_TUPLE_COST 0.01
61 
62 /* If no remote estimates, assume a sort costs 20% extra */
/* (i.e. a multiplier of 1.2 applied when remote estimates are unavailable) */
63 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
65 /*
66  * Indexes of FDW-private information stored in fdw_private lists.
67  *
68  * These items are indexed with the enum FdwScanPrivateIndex, so an item
69  * can be fetched with list_nth(). For example, to get the SELECT statement:
70  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
71  */
73 {
74  /* SQL statement to execute remotely (as a String node) */
76  /* Integer list of attribute numbers retrieved by the SELECT */
78  /* Integer representing the desired fetch_size */
80 
81  /*
82  * String describing join i.e. names of relations being joined and types
83  * of join, added when the scan is join
84  */
86 };
87 
88 /*
89  * Similarly, this enum describes what's kept in the fdw_private list for
90  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
91  *
92  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
93  * 2) Integer list of target attribute numbers for INSERT/UPDATE
94  * (NIL for a DELETE)
95  * 3) Length up to the end of the VALUES clause for INSERT
96  * (-1 for a DELETE/UPDATE)
97  * 4) Boolean flag showing if the remote query has a RETURNING clause
98  * 5) Integer list of attribute numbers retrieved by RETURNING, if any
99  */
101 {
102  /* SQL statement to execute remotely (as a String node) */
104  /* Integer list of target attribute numbers for INSERT/UPDATE */
106  /* Length till the end of VALUES clause (as an Integer node) */
108  /* has-returning flag (as a Boolean node) */
110  /* Integer list of attribute numbers retrieved by RETURNING */
112 };
113 
114 /*
115  * Similarly, this enum describes what's kept in the fdw_private list for
116  * a ForeignScan node that modifies a foreign table directly. We store:
117  *
118  * 1) UPDATE/DELETE statement text to be sent to the remote server
119  * 2) Boolean flag showing if the remote query has a RETURNING clause
120  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
121  * 4) Boolean flag showing if we set the command es_processed
122  */
124 {
125  /* SQL statement to execute remotely (as a String node) */
127  /* has-returning flag (as a Boolean node) */
129  /* Integer list of attribute numbers retrieved by RETURNING */
131  /* set-processed flag (as a Boolean node) */
133 };
134 
135 /*
136  * Execution state of a foreign scan using postgres_fdw.
137  */
138 typedef struct PgFdwScanState
139 {
140  Relation rel; /* relcache entry for the foreign table. NULL
141  * for a foreign join scan. */
142  TupleDesc tupdesc; /* tuple descriptor of scan */
143  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
144 
145  /* extracted fdw_private data */
146  char *query; /* text of SELECT command */
147  List *retrieved_attrs; /* list of retrieved attribute numbers */
148 
149  /* for remote query execution */
150  PGconn *conn; /* connection for the scan */
151  PgFdwConnState *conn_state; /* extra per-connection state */
152  unsigned int cursor_number; /* quasi-unique ID for my cursor */
153  bool cursor_exists; /* have we created the cursor? */
154  int numParams; /* number of parameters passed to query */
155  FmgrInfo *param_flinfo; /* output conversion functions for them */
156  List *param_exprs; /* executable expressions for param values */
157  const char **param_values; /* textual values of query parameters */
158 
159  /* for storing result tuples */
160  HeapTuple *tuples; /* array of currently-retrieved tuples */
161  int num_tuples; /* # of tuples in array */
162  int next_tuple; /* index of next one to return */
163 
164  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
165  int fetch_ct_2; /* Min(# of fetches done, 2) */
166  bool eof_reached; /* true if last fetch reached EOF */
167 
168  /* for asynchronous execution */
169  bool async_capable; /* engage asynchronous-capable logic? */
170 
171  /* working memory contexts */
172  MemoryContext batch_cxt; /* context holding current batch of tuples */
173  MemoryContext temp_cxt; /* context for per-tuple temporary data */
174 
175  int fetch_size; /* number of tuples per fetch */
177 
178 /*
179  * Execution state of a foreign insert/update/delete operation.
180  */
181 typedef struct PgFdwModifyState
182 {
183  Relation rel; /* relcache entry for the foreign table */
184  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
185 
186  /* for remote query execution */
187  PGconn *conn; /* connection for the scan */
188  PgFdwConnState *conn_state; /* extra per-connection state */
189  char *p_name; /* name of prepared statement, if created */
190 
191  /* extracted fdw_private data */
192  char *query; /* text of INSERT/UPDATE/DELETE command */
193  char *orig_query; /* original text of INSERT command */
194  List *target_attrs; /* list of target attribute numbers */
195  int values_end; /* length up to the end of VALUES */
196  int batch_size; /* value of FDW option "batch_size" */
197  bool has_returning; /* is there a RETURNING clause? */
198  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
199 
200  /* info about parameters for prepared statement */
201  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
202  int p_nums; /* number of parameters to transmit */
203  FmgrInfo *p_flinfo; /* output conversion functions for them */
204 
205  /* batch operation stuff */
206  int num_slots; /* number of slots to insert */
207 
208  /* working memory context */
209  MemoryContext temp_cxt; /* context for per-tuple temporary data */
210 
211  /* for update row movement if subplan result rel */
212  struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
213  * created */
215 
216 /*
217  * Execution state of a foreign scan that modifies a foreign table directly.
218  */
220 {
221  Relation rel; /* relcache entry for the foreign table */
222  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
223 
224  /* extracted fdw_private data */
225  char *query; /* text of UPDATE/DELETE command */
226  bool has_returning; /* is there a RETURNING clause? */
227  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
228  bool set_processed; /* do we set the command es_processed? */
229 
230  /* for remote query execution */
231  PGconn *conn; /* connection for the update */
232  PgFdwConnState *conn_state; /* extra per-connection state */
233  int numParams; /* number of parameters passed to query */
234  FmgrInfo *param_flinfo; /* output conversion functions for them */
235  List *param_exprs; /* executable expressions for param values */
236  const char **param_values; /* textual values of query parameters */
237 
238  /* for storing result tuples */
239  PGresult *result; /* result for query */
240  int num_tuples; /* # of result tuples */
241  int next_tuple; /* index of next one to return */
242  Relation resultRel; /* relcache entry for the target relation */
243  AttrNumber *attnoMap; /* array of attnums of input user columns */
244  AttrNumber ctidAttno; /* attnum of input ctid column */
245  AttrNumber oidAttno; /* attnum of input oid column */
246  bool hasSystemCols; /* are there system columns of resultRel? */
247 
248  /* working memory context */
249  MemoryContext temp_cxt; /* context for per-tuple temporary data */
251 
252 /*
253  * Workspace for analyzing a foreign table.
254  */
255 typedef struct PgFdwAnalyzeState
256 {
257  Relation rel; /* relcache entry for the foreign table */
258  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
259  List *retrieved_attrs; /* attr numbers retrieved by query */
260 
261  /* collected sample rows */
262  HeapTuple *rows; /* array of size targrows */
263  int targrows; /* target # of sample rows */
264  int numrows; /* # of sample rows collected */
265 
266  /* for random sampling */
267  double samplerows; /* # of rows fetched */
268  double rowstoskip; /* # of rows to skip before next sample */
269  ReservoirStateData rstate; /* state for reservoir sampling */
270 
271  /* working memory contexts */
272  MemoryContext anl_cxt; /* context for per-analyze lifespan data */
273  MemoryContext temp_cxt; /* context for per-tuple temporary data */
275 
276 /*
277  * This enum describes what's kept in the fdw_private list for a ForeignPath.
278  * We store:
279  *
280  * 1) Boolean flag showing if the remote query has the final sort
281  * 2) Boolean flag showing if the remote query has the LIMIT clause
282  */
284 {
285  /* has-final-sort flag (as a Boolean node) */
287  /* has-limit flag (as a Boolean node) */
289 };
290 
291 /* Struct for extra information passed to estimate_path_cost_size() */
292 typedef struct
293 {
296  bool has_limit;
297  double limit_tuples;
298  int64 count_est;
299  int64 offset_est;
301 
302 /*
303  * Identify the attribute where data conversion fails.
304  */
305 typedef struct ConversionLocation
306 {
307  AttrNumber cur_attno; /* attribute number being processed, or 0 */
308  Relation rel; /* foreign table being processed, or NULL */
309  ForeignScanState *fsstate; /* plan node being processed, or NULL */
311 
312 /* Callback argument for ec_member_matches_foreign */
313 typedef struct
314 {
315  Expr *current; /* current expr, or NULL if not yet found */
316  List *already_used; /* expressions already dealt with */
318 
319 /*
320  * SQL functions
321  */
323 
324 /*
325  * FDW callback routines
326  */
327 static void postgresGetForeignRelSize(PlannerInfo *root,
328  RelOptInfo *baserel,
329  Oid foreigntableid);
330 static void postgresGetForeignPaths(PlannerInfo *root,
331  RelOptInfo *baserel,
332  Oid foreigntableid);
334  RelOptInfo *foreignrel,
335  Oid foreigntableid,
336  ForeignPath *best_path,
337  List *tlist,
338  List *scan_clauses,
339  Plan *outer_plan);
340 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
343 static void postgresEndForeignScan(ForeignScanState *node);
345  Index rtindex,
346  RangeTblEntry *target_rte,
347  Relation target_relation);
349  ModifyTable *plan,
350  Index resultRelation,
351  int subplan_index);
352 static void postgresBeginForeignModify(ModifyTableState *mtstate,
353  ResultRelInfo *resultRelInfo,
354  List *fdw_private,
355  int subplan_index,
356  int eflags);
358  ResultRelInfo *resultRelInfo,
359  TupleTableSlot *slot,
360  TupleTableSlot *planSlot);
362  ResultRelInfo *resultRelInfo,
363  TupleTableSlot **slots,
364  TupleTableSlot **planSlots,
365  int *numSlots);
366 static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
368  ResultRelInfo *resultRelInfo,
369  TupleTableSlot *slot,
370  TupleTableSlot *planSlot);
372  ResultRelInfo *resultRelInfo,
373  TupleTableSlot *slot,
374  TupleTableSlot *planSlot);
375 static void postgresEndForeignModify(EState *estate,
376  ResultRelInfo *resultRelInfo);
377 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
378  ResultRelInfo *resultRelInfo);
379 static void postgresEndForeignInsert(EState *estate,
380  ResultRelInfo *resultRelInfo);
382 static bool postgresPlanDirectModify(PlannerInfo *root,
383  ModifyTable *plan,
384  Index resultRelation,
385  int subplan_index);
386 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
388 static void postgresEndDirectModify(ForeignScanState *node);
390  ExplainState *es);
392  ResultRelInfo *rinfo,
393  List *fdw_private,
394  int subplan_index,
395  ExplainState *es);
397  ExplainState *es);
398 static void postgresExecForeignTruncate(List *rels,
399  DropBehavior behavior,
400  bool restart_seqs);
401 static bool postgresAnalyzeForeignTable(Relation relation,
402  AcquireSampleRowsFunc *func,
403  BlockNumber *totalpages);
405  Oid serverOid);
406 static void postgresGetForeignJoinPaths(PlannerInfo *root,
407  RelOptInfo *joinrel,
408  RelOptInfo *outerrel,
409  RelOptInfo *innerrel,
410  JoinType jointype,
411  JoinPathExtraData *extra);
413  TupleTableSlot *slot);
414 static void postgresGetForeignUpperPaths(PlannerInfo *root,
415  UpperRelationKind stage,
416  RelOptInfo *input_rel,
417  RelOptInfo *output_rel,
418  void *extra);
420 static void postgresForeignAsyncRequest(AsyncRequest *areq);
422 static void postgresForeignAsyncNotify(AsyncRequest *areq);
423 
424 /*
425  * Helper functions
426  */
427 static void estimate_path_cost_size(PlannerInfo *root,
428  RelOptInfo *foreignrel,
429  List *param_join_conds,
430  List *pathkeys,
431  PgFdwPathExtraData *fpextra,
432  double *p_rows, int *p_width,
433  Cost *p_startup_cost, Cost *p_total_cost);
434 static void get_remote_estimate(const char *sql,
435  PGconn *conn,
436  double *rows,
437  int *width,
438  Cost *startup_cost,
439  Cost *total_cost);
441  List *pathkeys,
442  double retrieved_rows,
443  double width,
444  double limit_tuples,
445  Cost *p_startup_cost,
446  Cost *p_run_cost);
447 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
449  void *arg);
450 static void create_cursor(ForeignScanState *node);
451 static void fetch_more_data(ForeignScanState *node);
452 static void close_cursor(PGconn *conn, unsigned int cursor_number,
453  PgFdwConnState *conn_state);
455  RangeTblEntry *rte,
456  ResultRelInfo *resultRelInfo,
457  CmdType operation,
458  Plan *subplan,
459  char *query,
460  List *target_attrs,
461  int values_end,
462  bool has_returning,
463  List *retrieved_attrs);
465  ResultRelInfo *resultRelInfo,
466  CmdType operation,
467  TupleTableSlot **slots,
468  TupleTableSlot **planSlots,
469  int *numSlots);
470 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
471 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
472  ItemPointer tupleid,
473  TupleTableSlot **slots,
474  int numSlots);
475 static void store_returning_result(PgFdwModifyState *fmstate,
476  TupleTableSlot *slot, PGresult *res);
477 static void finish_foreign_modify(PgFdwModifyState *fmstate);
478 static void deallocate_query(PgFdwModifyState *fmstate);
479 static List *build_remote_returning(Index rtindex, Relation rel,
480  List *returningList);
481 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
482 static void execute_dml_stmt(ForeignScanState *node);
484 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
485  List *fdw_scan_tlist,
486  Index rtindex);
488  ResultRelInfo *resultRelInfo,
489  TupleTableSlot *slot,
490  EState *estate);
491 static void prepare_query_params(PlanState *node,
492  List *fdw_exprs,
493  int numParams,
494  FmgrInfo **param_flinfo,
495  List **param_exprs,
496  const char ***param_values);
497 static void process_query_params(ExprContext *econtext,
498  FmgrInfo *param_flinfo,
499  List *param_exprs,
500  const char **param_values);
501 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
502  HeapTuple *rows, int targrows,
503  double *totalrows,
504  double *totaldeadrows);
505 static void analyze_row_processor(PGresult *res, int row,
506  PgFdwAnalyzeState *astate);
507 static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
508 static void fetch_more_data_begin(AsyncRequest *areq);
509 static void complete_pending_request(AsyncRequest *areq);
511  int row,
512  Relation rel,
513  AttInMetadata *attinmeta,
514  List *retrieved_attrs,
515  ForeignScanState *fsstate,
516  MemoryContext temp_context);
517 static void conversion_error_callback(void *arg);
518 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
519  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
520  JoinPathExtraData *extra);
521 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
522  Node *havingQual);
524  RelOptInfo *rel);
527  Path *epq_path);
528 static void add_foreign_grouping_paths(PlannerInfo *root,
529  RelOptInfo *input_rel,
530  RelOptInfo *grouped_rel,
531  GroupPathExtraData *extra);
532 static void add_foreign_ordered_paths(PlannerInfo *root,
533  RelOptInfo *input_rel,
534  RelOptInfo *ordered_rel);
535 static void add_foreign_final_paths(PlannerInfo *root,
536  RelOptInfo *input_rel,
537  RelOptInfo *final_rel,
538  FinalPathExtraData *extra);
539 static void apply_server_options(PgFdwRelationInfo *fpinfo);
540 static void apply_table_options(PgFdwRelationInfo *fpinfo);
541 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
542  const PgFdwRelationInfo *fpinfo_o,
543  const PgFdwRelationInfo *fpinfo_i);
544 static int get_batch_size_option(Relation rel);
545 
546 
547 /*
548  * Foreign-data wrapper handler function: return a struct with pointers
549  * to my callback routines.
550  */
551 Datum
553 {
554  FdwRoutine *routine = makeNode(FdwRoutine);
555 
556  /* Functions for scanning foreign tables */
564 
565  /* Functions for updating foreign tables */
582 
583  /* Function for EvalPlanQual rechecks */
585  /* Support functions for EXPLAIN */
589 
590  /* Support function for TRUNCATE */
592 
593  /* Support functions for ANALYZE */
595 
596  /* Support functions for IMPORT FOREIGN SCHEMA */
598 
599  /* Support functions for join push-down */
601 
602  /* Support functions for upper relation push-down */
604 
605  /* Support functions for asynchronous execution */
610 
611  PG_RETURN_POINTER(routine);
612 }
613 
614 /*
615  * postgresGetForeignRelSize
616  * Estimate # of rows and width of the result of the scan
617  *
618  * We should consider the effect of all baserestrictinfo clauses here, but
619  * not any join clauses.
 *
 * Effects visible in the body:
 * - allocates a zeroed PgFdwRelationInfo and stores it in
 *   baserel->fdw_private;
 * - fills in option values (defaults, then server options, then table
 *   options), the user mapping (only when use_remote_estimate is set),
 *   the remote/local split of baserestrictinfo, attrs_used, the local-qual
 *   cost, the initial cost/size estimates and the relation naming/index
 *   fields;
 * - with remote estimates, sets baserel->rows and
 *   baserel->reltarget->width;
 * - otherwise, may seed baserel->pages/tuples and then calls
 *   set_baserel_size_estimates().
 *
 * NOTE(review): in this copy the line carrying the function name (between
 * "static void" and the first listed parameter) is missing; upstream
 * presumably reads "postgresGetForeignRelSize(PlannerInfo *root," - confirm.
620  */
621 static void
623  RelOptInfo *baserel,
624  Oid foreigntableid)
625 {
626  PgFdwRelationInfo *fpinfo;
627  ListCell *lc;
628 
629  /*
630  * We use PgFdwRelationInfo to pass various information to subsequent
631  * functions.
632  */
633  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
634  baserel->fdw_private = (void *) fpinfo;
635 
636  /* Base foreign tables need to be pushed down always. */
637  fpinfo->pushdown_safe = true;
638 
639  /* Look up foreign-table catalog info. */
640  fpinfo->table = GetForeignTable(foreigntableid);
641  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
642 
643  /*
644  * Extract user-settable option values. Note that per-table settings of
645  * use_remote_estimate, fetch_size and async_capable override per-server
646  * settings of them, respectively.
647  */
648  fpinfo->use_remote_estimate = false;
    /*
     * NOTE(review): source lines 649-650 are absent from this copy.  Upstream
     * presumably initializes the per-rel FDW startup/tuple costs from
     * DEFAULT_FDW_STARTUP_COST and DEFAULT_FDW_TUPLE_COST here, before the
     * server/table options can override them - confirm.
     */
651  fpinfo->shippable_extensions = NIL;
652  fpinfo->fetch_size = 100;
653  fpinfo->async_capable = false;
654 
655  apply_server_options(fpinfo);
656  apply_table_options(fpinfo);
657 
658  /*
659  * If the table or the server is configured to use remote estimates,
660  * identify which user to do remote access as during planning. This
661  * should match what ExecCheckPermissions() does. If we fail due to lack
662  * of permissions, the query would have failed at runtime anyway.
663  */
664  if (fpinfo->use_remote_estimate)
665  {
666  Oid userid;
667 
        /* Use the rel's recorded user if valid, else the current user. */
668  userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId();
669  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
670  }
671  else
672  fpinfo->user = NULL;
673 
674  /*
675  * Identify which baserestrictinfo clauses can be sent to the remote
676  * server and which can't.
677  */
678  classifyConditions(root, baserel, baserel->baserestrictinfo,
679  &fpinfo->remote_conds, &fpinfo->local_conds);
680 
681  /*
682  * Identify which attributes will need to be retrieved from the remote
683  * server. These include all attrs needed for joins or final output, plus
684  * all attrs used in the local_conds. (Note: if we end up using a
685  * parameterized scan, it's possible that some of the join clauses will be
686  * sent to the remote and thus we wouldn't really need to retrieve the
687  * columns used in them. Doesn't seem worth detecting that case though.)
688  */
    /* Start empty; pull_varattnos() accumulates into attrs_used below. */
689  fpinfo->attrs_used = NULL;
690  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
691  &fpinfo->attrs_used);
692  foreach(lc, fpinfo->local_conds)
693  {
694  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
695 
696  pull_varattnos((Node *) rinfo->clause, baserel->relid,
697  &fpinfo->attrs_used);
698  }
699 
700  /*
701  * Compute the selectivity and cost of the local_conds, so we don't have
702  * to do it over again for each path. The best we can do for these
703  * conditions is to estimate selectivity on the basis of local statistics.
704  */
    /*
     * NOTE(review): source line 705, the head of the call whose arguments
     * follow, is missing from this copy.  Upstream presumably reads
     * "fpinfo->local_conds_sel = clauselist_selectivity(root," - confirm.
     */
706  fpinfo->local_conds,
707  baserel->relid,
708  JOIN_INNER,
709  NULL);
710 
711  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
712 
713  /*
714  * Set # of retrieved rows and cached relation costs to some negative
715  * value, so that we can detect when they are set to some sensible values,
716  * during one (usually the first) of the calls to estimate_path_cost_size.
717  */
718  fpinfo->retrieved_rows = -1;
719  fpinfo->rel_startup_cost = -1;
720  fpinfo->rel_total_cost = -1;
721 
722  /*
723  * If the table or the server is configured to use remote estimates,
724  * connect to the foreign server and execute EXPLAIN to estimate the
725  * number of rows selected by the restriction clauses, as well as the
726  * average row width. Otherwise, estimate using whatever statistics we
727  * have locally, in a way similar to ordinary tables.
728  */
729  if (fpinfo->use_remote_estimate)
730  {
731  /*
732  * Get cost/size estimates with help of remote server. Save the
733  * values in fpinfo so we don't need to do it again to generate the
734  * basic foreign path.
735  */
        /*
         * NIL param_join_conds, NIL pathkeys, NULL fpextra: the plain,
         * unparameterized, unsorted scan.
         */
736  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
737  &fpinfo->rows, &fpinfo->width,
738  &fpinfo->startup_cost, &fpinfo->total_cost);
739 
740  /* Report estimated baserel size to planner. */
741  baserel->rows = fpinfo->rows;
742  baserel->reltarget->width = fpinfo->width;
743  }
744  else
745  {
746  /*
747  * If the foreign table has never been ANALYZEd, it will have
748  * reltuples < 0, meaning "unknown". We can't do much if we're not
749  * allowed to consult the remote server, but we can use a hack similar
750  * to plancat.c's treatment of empty relations: use a minimum size
751  * estimate of 10 pages, and divide by the column-datatype-based width
752  * estimate to get the corresponding number of tuples.
753  */
754  if (baserel->tuples < 0)
755  {
756  baserel->pages = 10;
            /*
             * NOTE(review): source line 759, the rest of the divisor below,
             * is missing from this copy.  Upstream presumably adds a per-tuple
             * header overhead such as MAXALIGN(SizeofHeapTupleHeader)) -
             * confirm.
             */
757  baserel->tuples =
758  (10 * BLCKSZ) / (baserel->reltarget->width +
760  }
761 
762  /* Estimate baserel size as best we can with local statistics. */
763  set_baserel_size_estimates(root, baserel);
764 
765  /* Fill in basically-bogus cost estimates for use later. */
766  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
767  &fpinfo->rows, &fpinfo->width,
768  &fpinfo->startup_cost, &fpinfo->total_cost);
769  }
770 
771  /*
772  * fpinfo->relation_name gets the numeric rangetable index of the foreign
773  * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
774  * human-readable string at that time.)
775  */
776  fpinfo->relation_name = psprintf("%u", baserel->relid);
777 
778  /* No outer and inner relations. */
779  fpinfo->make_outerrel_subquery = false;
780  fpinfo->make_innerrel_subquery = false;
781  fpinfo->lower_subquery_rels = NULL;
782  /* Set the relation index. */
783  fpinfo->relation_index = baserel->relid;
784 }
785 
786 /*
787  * get_useful_ecs_for_relation
788  * Determine which EquivalenceClasses might be involved in useful
789  * orderings of this relation.
790  *
791  * This function is in some respects a mirror image of the core function
792  * pathkeys_useful_for_merging: for a regular table, we know what indexes
793  * we have and want to test whether any of them are useful. For a foreign
794  * table, we don't know what indexes are present on the remote side but
795  * want to speculate about which ones we'd like to use if they existed.
796  *
797  * This function returns a list of potentially-useful equivalence classes,
798  * but it does not guarantee that an EquivalenceMember exists which contains
799  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
800  * ft1.x + t1.x = 0, this function will say that the equivalence class
801  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
802  * t1 is local (or on a different server), it will turn out that no useful
803  * ORDER BY clause can be generated. It's not our job to figure that out
804  * here; we're only interested in identifying relevant ECs.
 *
 * Returns a new List of EquivalenceClass pointers, or NIL. Contents:
 * - when rel->has_eclass_joins is set, every EC in root->eq_classes that
 *   eclass_useful_for_merging() accepts;
 * - for each mergejoinable clause in rel->joininfo, the one side of the
 *   clause (right_ec checked first, then left_ec) whose ec_relids overlap
 *   the rel.  These are added only if not already present
 *   (list_append_unique_ptr).
 *
 * NOTE(review): the signature line is missing from this copy; upstream
 * presumably reads
 * "get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)".
 * The body uses both root and rel - confirm.
805  */
806 static List *
808 {
809  List *useful_eclass_list = NIL;
810  ListCell *lc;
811  Relids relids;
812 
813  /*
814  * First, consider whether any active EC is potentially useful for a merge
815  * join against this relation.
816  */
817  if (rel->has_eclass_joins)
818  {
819  foreach(lc, root->eq_classes)
820  {
821  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
822 
823  if (eclass_useful_for_merging(root, cur_ec, rel))
824  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
825  }
826  }
827 
828  /*
829  * Next, consider whether there are any non-EC derivable join clauses that
830  * are merge-joinable. If the joininfo list is empty, we can exit
831  * quickly.
832  */
833  if (rel->joininfo == NIL)
834  return useful_eclass_list;
835 
836  /* If this is a child rel, we must use the topmost parent rel to search. */
837  if (IS_OTHER_REL(rel))
838  {
        /*
         * NOTE(review): source line 839 is missing from this copy.  Upstream
         * presumably asserts that rel->top_parent_relids is non-empty here -
         * confirm.
         */
840  relids = rel->top_parent_relids;
841  }
842  else
843  relids = rel->relids;
844 
845  /* Check each join clause in turn. */
846  foreach(lc, rel->joininfo)
847  {
848  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
849 
850  /* Consider only mergejoinable clauses */
851  if (restrictinfo->mergeopfamilies == NIL)
852  continue;
853 
854  /* Make sure we've got canonical ECs. */
855  update_mergeclause_eclasses(root, restrictinfo);
856 
857  /*
858  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
859  * that left_ec and right_ec will be initialized, per comments in
860  * distribute_qual_to_rels.
861  *
862  * We want to identify which side of this merge-joinable clause
863  * contains columns from the relation produced by this RelOptInfo. We
864  * test for overlap, not containment, because there could be extra
865  * relations on either side. For example, suppose we've got something
866  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
867  * A.y = D.y. The input rel might be the joinrel between A and B, and
868  * we'll consider the join clause A.y = D.y. relids contains a
869  * relation not involved in the join clause (B) and the equivalence
870  * class for the left-hand side of the clause contains a relation not
871  * involved in the input rel (C). Despite the fact that we have only
872  * overlap and not containment in either direction, A.y is potentially
873  * useful as a sort column.
874  *
875  * Note that it's even possible that relids overlaps neither side of
876  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
877  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
878  * but overlaps neither side of B. In that case, we just skip this
879  * join clause, since it doesn't suggest a useful sort order for this
880  * relation.
881  */
        /* At most one side per clause is added; the right side wins a tie. */
882  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
883  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
884  restrictinfo->right_ec);
885  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
886  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
887  restrictinfo->left_ec);
888  }
889 
890  return useful_eclass_list;
891 }
892 
893 /*
894  * get_useful_pathkeys_for_relation
895  * Determine which orderings of a relation might be useful.
896  *
897  * Getting data in sorted order can be useful either because the requested
898  * order matches the final output ordering for the overall query we're
899  * planning, or because it enables an efficient merge join. Here, we try
900  * to figure out which pathkeys to consider.
 *
 * Returns a List whose elements are themselves pathkey Lists:
 * - if every entry of root->query_pathkeys passes is_foreign_pathkey(), a
 *   copy of query_pathkeys comes first and fpinfo->qp_is_pushdown_safe is
 *   set true; otherwise that flag is left false;
 * - only when fpinfo->use_remote_estimate is set, one single-element
 *   pathkey list is appended for each further EC from
 *   get_useful_ecs_for_relation().  Such an EC is skipped when it is the
 *   lone query_pathkeys EC, its first opfamily is not shippable, or it has
 *   no member usable for this rel (find_em_for_rel).
 *
 * NOTE(review): the signature line is missing from this copy; upstream
 * presumably reads
 * "get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)"
 * - confirm.
901  */
902 static List *
904 {
905  List *useful_pathkeys_list = NIL;
906  List *useful_eclass_list;
907  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
908  EquivalenceClass *query_ec = NULL;
909  ListCell *lc;
910 
911  /*
912  * Pushing the query_pathkeys to the remote server is always worth
913  * considering, because it might let us avoid a local sort.
914  */
    /* Set to true below only if all of query_pathkeys can be pushed down. */
915  fpinfo->qp_is_pushdown_safe = false;
916  if (root->query_pathkeys)
917  {
918  bool query_pathkeys_ok = true;
919 
920  foreach(lc, root->query_pathkeys)
921  {
922  PathKey *pathkey = (PathKey *) lfirst(lc);
923 
924  /*
925  * The planner and executor don't have any clever strategy for
926  * taking data sorted by a prefix of the query's pathkeys and
927  * getting it to be sorted by all of those pathkeys. We'll just
928  * end up resorting the entire data set. So, unless we can push
929  * down all of the query pathkeys, forget it.
930  */
931  if (!is_foreign_pathkey(root, rel, pathkey))
932  {
933  query_pathkeys_ok = false;
934  break;
935  }
936  }
937 
938  if (query_pathkeys_ok)
939  {
940  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
941  fpinfo->qp_is_pushdown_safe = true;
942  }
943  }
944 
945  /*
946  * Even if we're not using remote estimates, having the remote side do the
947  * sort generally won't be any worse than doing it locally, and it might
948  * be much better if the remote side can generate data in the right order
949  * without needing a sort at all. However, what we're going to do next is
950  * try to generate pathkeys that seem promising for possible merge joins,
951  * and that's more speculative. A wrong choice might hurt quite a bit, so
952  * bail out if we can't use remote estimates.
953  */
954  if (!fpinfo->use_remote_estimate)
955  return useful_pathkeys_list;
956 
957  /* Get the list of interesting EquivalenceClasses. */
958  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
959 
960  /* Extract unique EC for query, if any, so we don't consider it again. */
961  if (list_length(root->query_pathkeys) == 1)
962  {
963  PathKey *query_pathkey = linitial(root->query_pathkeys);
964 
965  query_ec = query_pathkey->pk_eclass;
966  }
967 
968  /*
969  * As a heuristic, the only pathkeys we consider here are those of length
970  * one. It's surely possible to consider more, but since each one we
971  * choose to consider will generate a round-trip to the remote side, we
972  * need to be a bit cautious here. It would sure be nice to have a local
973  * cache of information about remote index definitions...
974  */
975  foreach(lc, useful_eclass_list)
976  {
977  EquivalenceClass *cur_ec = lfirst(lc);
978  PathKey *pathkey;
979 
980  /* If redundant with what we did above, skip it. */
981  if (cur_ec == query_ec)
982  continue;
983 
984  /* Can't push down the sort if the EC's opfamily is not shippable. */
        /*
         * NOTE(review): source line 985, the head of this condition, is
         * missing from this copy.  Upstream presumably tests
         * "!is_shippable(linitial_oid(cur_ec->ec_opfamilies)," - confirm.
         */
986  OperatorFamilyRelationId, fpinfo))
987  continue;
988 
989  /* If no pushable expression for this rel, skip it. */
990  if (find_em_for_rel(root, cur_ec, rel) == NULL)
991  continue;
992 
993  /* Looks like we can generate a pathkey, so let's do it. */
        /*
         * Uses the EC's first operator family.  NOTE(review): source line
         * 996, an argument between the opfamily and the final "false", is
         * missing from this copy.  Presumably it is a btree strategy number
         * such as BTLessStrategyNumber - confirm.
         */
994  pathkey = make_canonical_pathkey(root, cur_ec,
995  linitial_oid(cur_ec->ec_opfamilies),
997  false);
998  useful_pathkeys_list = lappend(useful_pathkeys_list,
999  list_make1(pathkey));
1000  }
1001 
1002  return useful_pathkeys_list;
1003 }
1004 
1005 /*
1006  * postgresGetForeignPaths
1007  * Create possible scan paths for a scan on the foreign table
1008  */
1009 static void
1011  RelOptInfo *baserel,
1012  Oid foreigntableid)
1013 {
1014  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1015  ForeignPath *path;
1016  List *ppi_list;
1017  ListCell *lc;
1018 
1019  /*
1020  * Create simplest ForeignScan path node and add it to baserel. This path
1021  * corresponds to SeqScan path of regular tables (though depending on what
1022  * baserestrict conditions we were able to send to remote, there might
1023  * actually be an indexscan happening there). We already did all the work
1024  * to estimate cost and size of this path.
1025  *
1026  * Although this path uses no join clauses, it could still have required
1027  * parameterization due to LATERAL refs in its tlist.
1028  */
1029  path = create_foreignscan_path(root, baserel,
1030  NULL, /* default pathtarget */
1031  fpinfo->rows,
1032  fpinfo->startup_cost,
1033  fpinfo->total_cost,
1034  NIL, /* no pathkeys */
1035  baserel->lateral_relids,
1036  NULL, /* no extra plan */
1037  NIL); /* no fdw_private list */
1038  add_path(baserel, (Path *) path);
1039 
1040  /* Add paths with pathkeys */
1041  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
1042 
1043  /*
1044  * If we're not using remote estimates, stop here. We have no way to
1045  * estimate whether any join clauses would be worth sending across, so
1046  * don't bother building parameterized paths.
1047  */
1048  if (!fpinfo->use_remote_estimate)
1049  return;
1050 
1051  /*
1052  * Thumb through all join clauses for the rel to identify which outer
1053  * relations could supply one or more safe-to-send-to-remote join clauses.
1054  * We'll build a parameterized path for each such outer relation.
1055  *
1056  * It's convenient to manage this by representing each candidate outer
1057  * relation by the ParamPathInfo node for it. We can then use the
1058  * ppi_clauses list in the ParamPathInfo node directly as a list of the
1059  * interesting join clauses for that rel. This takes care of the
1060  * possibility that there are multiple safe join clauses for such a rel,
1061  * and also ensures that we account for unsafe join clauses that we'll
1062  * still have to enforce locally (since the parameterized-path machinery
1063  * insists that we handle all movable clauses).
1064  */
1065  ppi_list = NIL;
1066  foreach(lc, baserel->joininfo)
1067  {
1068  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1069  Relids required_outer;
1070  ParamPathInfo *param_info;
1071 
1072  /* Check if clause can be moved to this rel */
1073  if (!join_clause_is_movable_to(rinfo, baserel))
1074  continue;
1075 
1076  /* See if it is safe to send to remote */
1077  if (!is_foreign_expr(root, baserel, rinfo->clause))
1078  continue;
1079 
1080  /* Calculate required outer rels for the resulting path */
1081  required_outer = bms_union(rinfo->clause_relids,
1082  baserel->lateral_relids);
1083  /* We do not want the foreign rel itself listed in required_outer */
1084  required_outer = bms_del_member(required_outer, baserel->relid);
1085 
1086  /*
1087  * required_outer probably can't be empty here, but if it were, we
1088  * couldn't make a parameterized path.
1089  */
1090  if (bms_is_empty(required_outer))
1091  continue;
1092 
1093  /* Get the ParamPathInfo */
1094  param_info = get_baserel_parampathinfo(root, baserel,
1095  required_outer);
1096  Assert(param_info != NULL);
1097 
1098  /*
1099  * Add it to list unless we already have it. Testing pointer equality
1100  * is OK since get_baserel_parampathinfo won't make duplicates.
1101  */
1102  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1103  }
1104 
1105  /*
1106  * The above scan examined only "generic" join clauses, not those that
1107  * were absorbed into EquivalenceClauses. See if we can make anything out
1108  * of EquivalenceClauses.
1109  */
1110  if (baserel->has_eclass_joins)
1111  {
1112  /*
1113  * We repeatedly scan the eclass list looking for column references
1114  * (or expressions) belonging to the foreign rel. Each time we find
1115  * one, we generate a list of equivalence joinclauses for it, and then
1116  * see if any are safe to send to the remote. Repeat till there are
1117  * no more candidate EC members.
1118  */
1120 
1121  arg.already_used = NIL;
1122  for (;;)
1123  {
1124  List *clauses;
1125 
1126  /* Make clauses, skipping any that join to lateral_referencers */
1127  arg.current = NULL;
1129  baserel,
1131  (void *) &arg,
1132  baserel->lateral_referencers);
1133 
1134  /* Done if there are no more expressions in the foreign rel */
1135  if (arg.current == NULL)
1136  {
1137  Assert(clauses == NIL);
1138  break;
1139  }
1140 
1141  /* Scan the extracted join clauses */
1142  foreach(lc, clauses)
1143  {
1144  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1145  Relids required_outer;
1146  ParamPathInfo *param_info;
1147 
1148  /* Check if clause can be moved to this rel */
1149  if (!join_clause_is_movable_to(rinfo, baserel))
1150  continue;
1151 
1152  /* See if it is safe to send to remote */
1153  if (!is_foreign_expr(root, baserel, rinfo->clause))
1154  continue;
1155 
1156  /* Calculate required outer rels for the resulting path */
1157  required_outer = bms_union(rinfo->clause_relids,
1158  baserel->lateral_relids);
1159  required_outer = bms_del_member(required_outer, baserel->relid);
1160  if (bms_is_empty(required_outer))
1161  continue;
1162 
1163  /* Get the ParamPathInfo */
1164  param_info = get_baserel_parampathinfo(root, baserel,
1165  required_outer);
1166  Assert(param_info != NULL);
1167 
1168  /* Add it to list unless we already have it */
1169  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1170  }
1171 
1172  /* Try again, now ignoring the expression we found this time */
1173  arg.already_used = lappend(arg.already_used, arg.current);
1174  }
1175  }
1176 
1177  /*
1178  * Now build a path for each useful outer relation.
1179  */
1180  foreach(lc, ppi_list)
1181  {
1182  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1183  double rows;
1184  int width;
1185  Cost startup_cost;
1186  Cost total_cost;
1187 
1188  /* Get a cost estimate from the remote */
1189  estimate_path_cost_size(root, baserel,
1190  param_info->ppi_clauses, NIL, NULL,
1191  &rows, &width,
1192  &startup_cost, &total_cost);
1193 
1194  /*
1195  * ppi_rows currently won't get looked at by anything, but still we
1196  * may as well ensure that it matches our idea of the rowcount.
1197  */
1198  param_info->ppi_rows = rows;
1199 
1200  /* Make the path */
1201  path = create_foreignscan_path(root, baserel,
1202  NULL, /* default pathtarget */
1203  rows,
1204  startup_cost,
1205  total_cost,
1206  NIL, /* no pathkeys */
1207  param_info->ppi_req_outer,
1208  NULL,
1209  NIL); /* no fdw_private list */
1210  add_path(baserel, (Path *) path);
1211  }
1212 }
1213 
1214 /*
1215  * postgresGetForeignPlan
1216  * Create ForeignScan plan node which implements selected best path
1217  */
1218 static ForeignScan *
1220  RelOptInfo *foreignrel,
1221  Oid foreigntableid,
1222  ForeignPath *best_path,
1223  List *tlist,
1224  List *scan_clauses,
1225  Plan *outer_plan)
1226 {
1227  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1228  Index scan_relid;
1229  List *fdw_private;
1230  List *remote_exprs = NIL;
1231  List *local_exprs = NIL;
1232  List *params_list = NIL;
1233  List *fdw_scan_tlist = NIL;
1234  List *fdw_recheck_quals = NIL;
1235  List *retrieved_attrs;
1236  StringInfoData sql;
1237  bool has_final_sort = false;
1238  bool has_limit = false;
1239  ListCell *lc;
1240 
1241  /*
1242  * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1243  */
1244  if (best_path->fdw_private)
1245  {
1246  has_final_sort = boolVal(list_nth(best_path->fdw_private,
1248  has_limit = boolVal(list_nth(best_path->fdw_private,
1250  }
1251 
1252  if (IS_SIMPLE_REL(foreignrel))
1253  {
1254  /*
1255  * For base relations, set scan_relid as the relid of the relation.
1256  */
1257  scan_relid = foreignrel->relid;
1258 
1259  /*
1260  * In a base-relation scan, we must apply the given scan_clauses.
1261  *
1262  * Separate the scan_clauses into those that can be executed remotely
1263  * and those that can't. baserestrictinfo clauses that were
1264  * previously determined to be safe or unsafe by classifyConditions
1265  * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1266  * else in the scan_clauses list will be a join clause, which we have
1267  * to check for remote-safety.
1268  *
1269  * Note: the join clauses we see here should be the exact same ones
1270  * previously examined by postgresGetForeignPaths. Possibly it'd be
1271  * worth passing forward the classification work done then, rather
1272  * than repeating it here.
1273  *
1274  * This code must match "extract_actual_clauses(scan_clauses, false)"
1275  * except for the additional decision about remote versus local
1276  * execution.
1277  */
1278  foreach(lc, scan_clauses)
1279  {
1280  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1281 
1282  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1283  if (rinfo->pseudoconstant)
1284  continue;
1285 
1286  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1287  remote_exprs = lappend(remote_exprs, rinfo->clause);
1288  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1289  local_exprs = lappend(local_exprs, rinfo->clause);
1290  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1291  remote_exprs = lappend(remote_exprs, rinfo->clause);
1292  else
1293  local_exprs = lappend(local_exprs, rinfo->clause);
1294  }
1295 
1296  /*
1297  * For a base-relation scan, we have to support EPQ recheck, which
1298  * should recheck all the remote quals.
1299  */
1300  fdw_recheck_quals = remote_exprs;
1301  }
1302  else
1303  {
1304  /*
1305  * Join relation or upper relation - set scan_relid to 0.
1306  */
1307  scan_relid = 0;
1308 
1309  /*
1310  * For a join rel, baserestrictinfo is NIL and we are not considering
1311  * parameterization right now, so there should be no scan_clauses for
1312  * a joinrel or an upper rel either.
1313  */
1314  Assert(!scan_clauses);
1315 
1316  /*
1317  * Instead we get the conditions to apply from the fdw_private
1318  * structure.
1319  */
1320  remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1321  local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1322 
1323  /*
1324  * We leave fdw_recheck_quals empty in this case, since we never need
1325  * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1326  * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1327  * If we're planning an upperrel (ie, remote grouping or aggregation)
1328  * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1329  * allowed, and indeed we *can't* put the remote clauses into
1330  * fdw_recheck_quals because the unaggregated Vars won't be available
1331  * locally.
1332  */
1333 
1334  /* Build the list of columns to be fetched from the foreign server. */
1335  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1336 
1337  /*
1338  * Ensure that the outer plan produces a tuple whose descriptor
1339  * matches our scan tuple slot. Also, remove the local conditions
1340  * from outer plan's quals, lest they be evaluated twice, once by the
1341  * local plan and once by the scan.
1342  */
1343  if (outer_plan)
1344  {
1345  /*
1346  * Right now, we only consider grouping and aggregation beyond
1347  * joins. Queries involving aggregates or grouping do not require
1348  * EPQ mechanism, hence should not have an outer plan here.
1349  */
1350  Assert(!IS_UPPER_REL(foreignrel));
1351 
1352  /*
1353  * First, update the plan's qual list if possible. In some cases
1354  * the quals might be enforced below the topmost plan level, in
1355  * which case we'll fail to remove them; it's not worth working
1356  * harder than this.
1357  */
1358  foreach(lc, local_exprs)
1359  {
1360  Node *qual = lfirst(lc);
1361 
1362  outer_plan->qual = list_delete(outer_plan->qual, qual);
1363 
1364  /*
1365  * For an inner join the local conditions of foreign scan plan
1366  * can be part of the joinquals as well. (They might also be
1367  * in the mergequals or hashquals, but we can't touch those
1368  * without breaking the plan.)
1369  */
1370  if (IsA(outer_plan, NestLoop) ||
1371  IsA(outer_plan, MergeJoin) ||
1372  IsA(outer_plan, HashJoin))
1373  {
1374  Join *join_plan = (Join *) outer_plan;
1375 
1376  if (join_plan->jointype == JOIN_INNER)
1377  join_plan->joinqual = list_delete(join_plan->joinqual,
1378  qual);
1379  }
1380  }
1381 
1382  /*
1383  * Now fix the subplan's tlist --- this might result in inserting
1384  * a Result node atop the plan tree.
1385  */
1386  outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1387  best_path->path.parallel_safe);
1388  }
1389  }
1390 
1391  /*
1392  * Build the query string to be sent for execution, and identify
1393  * expressions to be sent as parameters.
1394  */
1395  initStringInfo(&sql);
1396  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1397  remote_exprs, best_path->path.pathkeys,
1398  has_final_sort, has_limit, false,
1399  &retrieved_attrs, &params_list);
1400 
1401  /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1402  fpinfo->final_remote_exprs = remote_exprs;
1403 
1404  /*
1405  * Build the fdw_private list that will be available to the executor.
1406  * Items in the list must match order in enum FdwScanPrivateIndex.
1407  */
1408  fdw_private = list_make3(makeString(sql.data),
1409  retrieved_attrs,
1410  makeInteger(fpinfo->fetch_size));
1411  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1412  fdw_private = lappend(fdw_private,
1413  makeString(fpinfo->relation_name));
1414 
1415  /*
1416  * Create the ForeignScan node for the given relation.
1417  *
1418  * Note that the remote parameter expressions are stored in the fdw_exprs
1419  * field of the finished plan node; we can't keep them in private state
1420  * because then they wouldn't be subject to later planner processing.
1421  */
1422  return make_foreignscan(tlist,
1423  local_exprs,
1424  scan_relid,
1425  params_list,
1426  fdw_private,
1427  fdw_scan_tlist,
1428  fdw_recheck_quals,
1429  outer_plan);
1430 }
1431 
1432 /*
1433  * Construct a tuple descriptor for the scan tuples handled by a foreign join.
1434  */
1435 static TupleDesc
1437 {
1438  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1439  EState *estate = node->ss.ps.state;
1440  TupleDesc tupdesc;
1441 
1442  /*
1443  * The core code has already set up a scan tuple slot based on
1444  * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough,
1445  * but there's one case where it isn't. If we have any whole-row row
1446  * identifier Vars, they may have vartype RECORD, and we need to replace
1447  * that with the associated table's actual composite type. This ensures
1448  * that when we read those ROW() expression values from the remote server,
1449  * we can convert them to a composite type the local server knows.
1450  */
1452  for (int i = 0; i < tupdesc->natts; i++)
1453  {
1454  Form_pg_attribute att = TupleDescAttr(tupdesc, i);
1455  Var *var;
1456  RangeTblEntry *rte;
1457  Oid reltype;
1458 
1459  /* Nothing to do if it's not a generic RECORD attribute */
1460  if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1461  continue;
1462 
1463  /*
1464  * If we can't identify the referenced table, do nothing. This'll
1465  * likely lead to failure later, but perhaps we can muddle through.
1466  */
1467  var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
1468  i)->expr;
1469  if (!IsA(var, Var) || var->varattno != 0)
1470  continue;
1471  rte = list_nth(estate->es_range_table, var->varno - 1);
1472  if (rte->rtekind != RTE_RELATION)
1473  continue;
1474  reltype = get_rel_type_id(rte->relid);
1475  if (!OidIsValid(reltype))
1476  continue;
1477  att->atttypid = reltype;
1478  /* shouldn't need to change anything else */
1479  }
1480  return tupdesc;
1481 }
1482 
1483 /*
1484  * postgresBeginForeignScan
1485  * Initiate an executor scan of a foreign PostgreSQL table.
1486  */
1487 static void
1489 {
1490  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1491  EState *estate = node->ss.ps.state;
1492  PgFdwScanState *fsstate;
1493  RangeTblEntry *rte;
1494  Oid userid;
1495  ForeignTable *table;
1496  UserMapping *user;
1497  int rtindex;
1498  int numParams;
1499 
1500  /*
1501  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1502  */
1503  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1504  return;
1505 
1506  /*
1507  * We'll save private state in node->fdw_state.
1508  */
1509  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1510  node->fdw_state = (void *) fsstate;
1511 
1512  /*
1513  * Identify which user to do the remote access as. This should match what
1514  * ExecCheckRTEPerms() does.
1515  */
1516  userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
1517  if (fsplan->scan.scanrelid > 0)
1518  rtindex = fsplan->scan.scanrelid;
1519  else
1520  rtindex = bms_next_member(fsplan->fs_base_relids, -1);
1521  rte = exec_rt_fetch(rtindex, estate);
1522 
1523  /* Get info about foreign table. */
1524  table = GetForeignTable(rte->relid);
1525  user = GetUserMapping(userid, table->serverid);
1526 
1527  /*
1528  * Get connection to the foreign server. Connection manager will
1529  * establish new connection if necessary.
1530  */
1531  fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
1532 
1533  /* Assign a unique ID for my cursor */
1534  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1535  fsstate->cursor_exists = false;
1536 
1537  /* Get private info created by planner functions. */
1538  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1540  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1542  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1544 
1545  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1546  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1547  "postgres_fdw tuple data",
1549  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1550  "postgres_fdw temporary data",
1552 
1553  /*
1554  * Get info we'll need for converting data fetched from the foreign server
1555  * into local representation and error reporting during that process.
1556  */
1557  if (fsplan->scan.scanrelid > 0)
1558  {
1559  fsstate->rel = node->ss.ss_currentRelation;
1560  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1561  }
1562  else
1563  {
1564  fsstate->rel = NULL;
1565  fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
1566  }
1567 
1568  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1569 
1570  /*
1571  * Prepare for processing of parameters used in remote query, if any.
1572  */
1573  numParams = list_length(fsplan->fdw_exprs);
1574  fsstate->numParams = numParams;
1575  if (numParams > 0)
1577  fsplan->fdw_exprs,
1578  numParams,
1579  &fsstate->param_flinfo,
1580  &fsstate->param_exprs,
1581  &fsstate->param_values);
1582 
1583  /* Set the async-capable flag */
1584  fsstate->async_capable = node->ss.ps.async_capable;
1585 }
1586 
1587 /*
1588  * postgresIterateForeignScan
1589  * Retrieve next row from the result set, or clear tuple slot to indicate
1590  * EOF.
1591  */
1592 static TupleTableSlot *
1594 {
1595  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1596  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1597 
1598  /*
1599  * In sync mode, if this is the first call after Begin or ReScan, we need
1600  * to create the cursor on the remote side. In async mode, we would have
1601  * already created the cursor before we get here, even if this is the
1602  * first call after Begin or ReScan.
1603  */
1604  if (!fsstate->cursor_exists)
1605  create_cursor(node);
1606 
1607  /*
1608  * Get some more tuples, if we've run out.
1609  */
1610  if (fsstate->next_tuple >= fsstate->num_tuples)
1611  {
1612  /* In async mode, just clear tuple slot. */
1613  if (fsstate->async_capable)
1614  return ExecClearTuple(slot);
1615  /* No point in another fetch if we already detected EOF, though. */
1616  if (!fsstate->eof_reached)
1617  fetch_more_data(node);
1618  /* If we didn't get any tuples, must be end of data. */
1619  if (fsstate->next_tuple >= fsstate->num_tuples)
1620  return ExecClearTuple(slot);
1621  }
1622 
1623  /*
1624  * Return the next tuple.
1625  */
1626  ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1627  slot,
1628  false);
1629 
1630  return slot;
1631 }
1632 
1633 /*
1634  * postgresReScanForeignScan
1635  * Restart the scan.
1636  */
1637 static void
1639 {
1640  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1641  char sql[64];
1642  PGresult *res;
1643 
1644  /* If we haven't created the cursor yet, nothing to do. */
1645  if (!fsstate->cursor_exists)
1646  return;
1647 
1648  /*
1649  * If the node is async-capable, and an asynchronous fetch for it has
1650  * begun, the asynchronous fetch might not have yet completed. Check if
1651  * the node is async-capable, and an asynchronous fetch for it is still in
1652  * progress; if so, complete the asynchronous fetch before restarting the
1653  * scan.
1654  */
1655  if (fsstate->async_capable &&
1656  fsstate->conn_state->pendingAreq &&
1657  fsstate->conn_state->pendingAreq->requestee == (PlanState *) node)
1658  fetch_more_data(node);
1659 
1660  /*
1661  * If any internal parameters affecting this node have changed, we'd
1662  * better destroy and recreate the cursor. Otherwise, rewinding it should
1663  * be good enough. If we've only fetched zero or one batch, we needn't
1664  * even rewind the cursor, just rescan what we have.
1665  */
1666  if (node->ss.ps.chgParam != NULL)
1667  {
1668  fsstate->cursor_exists = false;
1669  snprintf(sql, sizeof(sql), "CLOSE c%u",
1670  fsstate->cursor_number);
1671  }
1672  else if (fsstate->fetch_ct_2 > 1)
1673  {
1674  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1675  fsstate->cursor_number);
1676  }
1677  else
1678  {
1679  /* Easy: just rescan what we already have in memory, if anything */
1680  fsstate->next_tuple = 0;
1681  return;
1682  }
1683 
1684  /*
1685  * We don't use a PG_TRY block here, so be careful not to throw error
1686  * without releasing the PGresult.
1687  */
1688  res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
1690  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1691  PQclear(res);
1692 
1693  /* Now force a fresh FETCH. */
1694  fsstate->tuples = NULL;
1695  fsstate->num_tuples = 0;
1696  fsstate->next_tuple = 0;
1697  fsstate->fetch_ct_2 = 0;
1698  fsstate->eof_reached = false;
1699 }
1700 
1701 /*
1702  * postgresEndForeignScan
1703  * Finish scanning foreign table and dispose objects used for this scan
1704  */
1705 static void
1707 {
1708  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1709 
1710  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1711  if (fsstate == NULL)
1712  return;
1713 
1714  /* Close the cursor if open, to prevent accumulation of cursors */
1715  if (fsstate->cursor_exists)
1716  close_cursor(fsstate->conn, fsstate->cursor_number,
1717  fsstate->conn_state);
1718 
1719  /* Release remote connection */
1720  ReleaseConnection(fsstate->conn);
1721  fsstate->conn = NULL;
1722 
1723  /* MemoryContexts will be deleted automatically. */
1724 }
1725 
1726 /*
1727  * postgresAddForeignUpdateTargets
1728  * Add resjunk column(s) needed for update/delete on a foreign table
1729  */
1730 static void
1732  Index rtindex,
1733  RangeTblEntry *target_rte,
1734  Relation target_relation)
1735 {
1736  Var *var;
1737 
1738  /*
1739  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1740  */
1741 
1742  /* Make a Var representing the desired value */
1743  var = makeVar(rtindex,
1745  TIDOID,
1746  -1,
1747  InvalidOid,
1748  0);
1749 
1750  /* Register it as a row-identity column needed by this target rel */
1751  add_row_identity_var(root, var, rtindex, "ctid");
1752 }
1753 
1754 /*
1755  * postgresPlanForeignModify
1756  * Plan an insert/update/delete operation on a foreign table
1757  */
1758 static List *
1760  ModifyTable *plan,
1761  Index resultRelation,
1762  int subplan_index)
1763 {
1764  CmdType operation = plan->operation;
1765  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1766  Relation rel;
1767  StringInfoData sql;
1768  List *targetAttrs = NIL;
1769  List *withCheckOptionList = NIL;
1770  List *returningList = NIL;
1771  List *retrieved_attrs = NIL;
1772  bool doNothing = false;
1773  int values_end_len = -1;
1774 
1775  initStringInfo(&sql);
1776 
1777  /*
1778  * Core code already has some lock on each rel being planned, so we can
1779  * use NoLock here.
1780  */
1781  rel = table_open(rte->relid, NoLock);
1782 
1783  /*
1784  * In an INSERT, we transmit all columns that are defined in the foreign
1785  * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1786  * foreign table, we transmit all columns like INSERT; else we transmit
1787  * only columns that were explicitly targets of the UPDATE, so as to avoid
1788  * unnecessary data transmission. (We can't do that for INSERT since we
1789  * would miss sending default values for columns not listed in the source
1790  * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1791  * those triggers might change values for non-target columns, in which
1792  * case we would miss sending changed values for those columns.)
1793  */
1794  if (operation == CMD_INSERT ||
1795  (operation == CMD_UPDATE &&
1796  rel->trigdesc &&
1798  {
1799  TupleDesc tupdesc = RelationGetDescr(rel);
1800  int attnum;
1801 
1802  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1803  {
1804  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1805 
1806  if (!attr->attisdropped)
1807  targetAttrs = lappend_int(targetAttrs, attnum);
1808  }
1809  }
1810  else if (operation == CMD_UPDATE)
1811  {
1812  int col;
1813  RelOptInfo *rel = find_base_rel(root, resultRelation);
1814  Bitmapset *allUpdatedCols = get_rel_all_updated_cols(root, rel);
1815 
1816  col = -1;
1817  while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1818  {
1819  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1821 
1822  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1823  elog(ERROR, "system-column update is not supported");
1824  targetAttrs = lappend_int(targetAttrs, attno);
1825  }
1826  }
1827 
1828  /*
1829  * Extract the relevant WITH CHECK OPTION list if any.
1830  */
1831  if (plan->withCheckOptionLists)
1832  withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1833  subplan_index);
1834 
1835  /*
1836  * Extract the relevant RETURNING list if any.
1837  */
1838  if (plan->returningLists)
1839  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1840 
1841  /*
1842  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1843  * should have already been rejected in the optimizer, as presently there
1844  * is no way to recognize an arbiter index on a foreign table. Only DO
1845  * NOTHING is supported without an inference specification.
1846  */
1847  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1848  doNothing = true;
1849  else if (plan->onConflictAction != ONCONFLICT_NONE)
1850  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1851  (int) plan->onConflictAction);
1852 
1853  /*
1854  * Construct the SQL command string.
1855  */
1856  switch (operation)
1857  {
1858  case CMD_INSERT:
1859  deparseInsertSql(&sql, rte, resultRelation, rel,
1860  targetAttrs, doNothing,
1861  withCheckOptionList, returningList,
1862  &retrieved_attrs, &values_end_len);
1863  break;
1864  case CMD_UPDATE:
1865  deparseUpdateSql(&sql, rte, resultRelation, rel,
1866  targetAttrs,
1867  withCheckOptionList, returningList,
1868  &retrieved_attrs);
1869  break;
1870  case CMD_DELETE:
1871  deparseDeleteSql(&sql, rte, resultRelation, rel,
1872  returningList,
1873  &retrieved_attrs);
1874  break;
1875  default:
1876  elog(ERROR, "unexpected operation: %d", (int) operation);
1877  break;
1878  }
1879 
1880  table_close(rel, NoLock);
1881 
1882  /*
1883  * Build the fdw_private list that will be available to the executor.
1884  * Items in the list must match enum FdwModifyPrivateIndex, above.
1885  */
1886  return list_make5(makeString(sql.data),
1887  targetAttrs,
1888  makeInteger(values_end_len),
1889  makeBoolean((retrieved_attrs != NIL)),
1890  retrieved_attrs);
1891 }
1892 
1893 /*
1894  * postgresBeginForeignModify
1895  * Begin an insert/update/delete operation on a foreign table
1896  */
1897 static void
1899  ResultRelInfo *resultRelInfo,
1900  List *fdw_private,
1901  int subplan_index,
1902  int eflags)
1903 {
1904  PgFdwModifyState *fmstate;
1905  char *query;
1906  List *target_attrs;
1907  bool has_returning;
1908  int values_end_len;
1909  List *retrieved_attrs;
1910  RangeTblEntry *rte;
1911 
1912  /*
1913  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1914  * stays NULL.
1915  */
1916  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1917  return;
1918 
1919  /* Deconstruct fdw_private data. */
1920  query = strVal(list_nth(fdw_private,
1922  target_attrs = (List *) list_nth(fdw_private,
1924  values_end_len = intVal(list_nth(fdw_private,
1926  has_returning = boolVal(list_nth(fdw_private,
1928  retrieved_attrs = (List *) list_nth(fdw_private,
1930 
1931  /* Find RTE. */
1932  rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1933  mtstate->ps.state);
1934 
1935  /* Construct an execution state. */
1936  fmstate = create_foreign_modify(mtstate->ps.state,
1937  rte,
1938  resultRelInfo,
1939  mtstate->operation,
1940  outerPlanState(mtstate)->plan,
1941  query,
1942  target_attrs,
1943  values_end_len,
1944  has_returning,
1945  retrieved_attrs);
1946 
1947  resultRelInfo->ri_FdwState = fmstate;
1948 }
1949 
1950 /*
1951  * postgresExecForeignInsert
1952  * Insert one row into a foreign table
1953  */
1954 static TupleTableSlot *
1956  ResultRelInfo *resultRelInfo,
1957  TupleTableSlot *slot,
1958  TupleTableSlot *planSlot)
1959 {
1960  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1961  TupleTableSlot **rslot;
1962  int numSlots = 1;
1963 
1964  /*
1965  * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1966  * postgresBeginForeignInsert())
1967  */
1968  if (fmstate->aux_fmstate)
1969  resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1970  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1971  &slot, &planSlot, &numSlots);
1972  /* Revert that change */
1973  if (fmstate->aux_fmstate)
1974  resultRelInfo->ri_FdwState = fmstate;
1975 
1976  return rslot ? *rslot : NULL;
1977 }
1978 
1979 /*
1980  * postgresExecForeignBatchInsert
1981  * Insert multiple rows into a foreign table
1982  */
1983 static TupleTableSlot **
1985  ResultRelInfo *resultRelInfo,
1986  TupleTableSlot **slots,
1987  TupleTableSlot **planSlots,
1988  int *numSlots)
1989 {
1990  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1991  TupleTableSlot **rslot;
1992 
1993  /*
1994  * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1995  * postgresBeginForeignInsert())
1996  */
1997  if (fmstate->aux_fmstate)
1998  resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1999  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
2000  slots, planSlots, numSlots);
2001  /* Revert that change */
2002  if (fmstate->aux_fmstate)
2003  resultRelInfo->ri_FdwState = fmstate;
2004 
2005  return rslot;
2006 }
2007 
2008 /*
2009  * postgresGetForeignModifyBatchSize
2010  * Determine the maximum number of tuples that can be inserted in bulk
2011  *
2012  * Returns the batch size specified for server or table. When batching is not
2013  * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING
2014  * clause), returns 1.
2015  */
2016 static int
2018 {
2019  int batch_size;
2020  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2021 
2022  /* should be called only once */
2023  Assert(resultRelInfo->ri_BatchSize == 0);
2024 
2025  /*
2026  * Should never get called when the insert is being performed on a table
2027  * that is also among the target relations of an UPDATE operation,
2028  * because postgresBeginForeignInsert() currently rejects such insert
2029  * attempts.
2030  */
2031  Assert(fmstate == NULL || fmstate->aux_fmstate == NULL);
2032 
2033  /*
2034  * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
2035  * the option directly in server/table options. Otherwise just use the
2036  * value we determined earlier.
2037  */
2038  if (fmstate)
2039  batch_size = fmstate->batch_size;
2040  else
2041  batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
2042 
2043  /*
2044  * Disable batching when we have to use RETURNING, there are any
2045  * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any
2046  * WITH CHECK OPTION constraints from parent views.
2047  *
2048  * When there are any BEFORE ROW INSERT triggers on the table, we can't
2049  * support it, because such triggers might query the table we're inserting
2050  * into and act differently if the tuples that have already been processed
2051  * and prepared for insertion are not there.
2052  */
2053  if (resultRelInfo->ri_projectReturning != NULL ||
2054  resultRelInfo->ri_WithCheckOptions != NIL ||
2055  (resultRelInfo->ri_TrigDesc &&
2056  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2057  resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
2058  return 1;
2059 
2060  /*
2061  * If the foreign table has no columns, disable batching as the INSERT
2062  * syntax doesn't allow batching multiple empty rows into a zero-column
2063  * table in a single statement. This is needed for COPY FROM, in which
2064  * case fmstate must be non-NULL.
2065  */
2066  if (fmstate && list_length(fmstate->target_attrs) == 0)
2067  return 1;
2068 
2069  /*
2070  * Otherwise use the batch size specified for server/table. The number of
2071  * parameters in a batch is limited to 65535 (uint16), so make sure we
2072  * don't exceed this limit by using the maximum batch_size possible.
2073  */
2074  if (fmstate && fmstate->p_nums > 0)
2075  batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums);
2076 
2077  return batch_size;
2078 }
2079 
2080 /*
2081  * postgresExecForeignUpdate
2082  * Update one row in a foreign table
2083  */
2084 static TupleTableSlot *
2086  ResultRelInfo *resultRelInfo,
2087  TupleTableSlot *slot,
2088  TupleTableSlot *planSlot)
2089 {
2090  TupleTableSlot **rslot;
2091  int numSlots = 1;
2092 
2093  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
2094  &slot, &planSlot, &numSlots);
2095 
2096  return rslot ? rslot[0] : NULL;
2097 }
2098 
2099 /*
2100  * postgresExecForeignDelete
2101  * Delete one row from a foreign table
2102  */
2103 static TupleTableSlot *
2105  ResultRelInfo *resultRelInfo,
2106  TupleTableSlot *slot,
2107  TupleTableSlot *planSlot)
2108 {
2109  TupleTableSlot **rslot;
2110  int numSlots = 1;
2111 
2112  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
2113  &slot, &planSlot, &numSlots);
2114 
2115  return rslot ? rslot[0] : NULL;
2116 }
2117 
2118 /*
2119  * postgresEndForeignModify
2120  * Finish an insert/update/delete operation on a foreign table
2121  */
2122 static void
2124  ResultRelInfo *resultRelInfo)
2125 {
2126  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2127 
2128  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2129  if (fmstate == NULL)
2130  return;
2131 
2132  /* Destroy the execution state */
2133  finish_foreign_modify(fmstate);
2134 }
2135 
2136 /*
2137  * postgresBeginForeignInsert
2138  * Begin an insert operation on a foreign table
2139  */
2140 static void
2142  ResultRelInfo *resultRelInfo)
2143 {
2144  PgFdwModifyState *fmstate;
2145  ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
2146  EState *estate = mtstate->ps.state;
2147  Index resultRelation;
2148  Relation rel = resultRelInfo->ri_RelationDesc;
2149  RangeTblEntry *rte;
2150  TupleDesc tupdesc = RelationGetDescr(rel);
2151  int attnum;
2152  int values_end_len;
2153  StringInfoData sql;
2154  List *targetAttrs = NIL;
2155  List *retrieved_attrs = NIL;
2156  bool doNothing = false;
2157 
2158  /*
2159  * If the foreign table we are about to insert routed rows into is also an
2160  * UPDATE subplan result rel that will be updated later, proceeding with
2161  * the INSERT will result in the later UPDATE incorrectly modifying those
2162  * routed rows, so prevent the INSERT --- it would be nice if we could
2163  * handle this case; but for now, throw an error for safety.
2164  */
2165  if (plan && plan->operation == CMD_UPDATE &&
2166  (resultRelInfo->ri_usesFdwDirectModify ||
2167  resultRelInfo->ri_FdwState))
2168  ereport(ERROR,
2169  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2170  errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2171  RelationGetRelationName(rel))));
2172 
2173  initStringInfo(&sql);
2174 
2175  /* We transmit all columns that are defined in the foreign table. */
2176  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2177  {
2178  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2179 
2180  if (!attr->attisdropped)
2181  targetAttrs = lappend_int(targetAttrs, attnum);
2182  }
2183 
2184  /* Check if we add the ON CONFLICT clause to the remote query. */
2185  if (plan)
2186  {
2187  OnConflictAction onConflictAction = plan->onConflictAction;
2188 
2189  /* We only support DO NOTHING without an inference specification. */
2190  if (onConflictAction == ONCONFLICT_NOTHING)
2191  doNothing = true;
2192  else if (onConflictAction != ONCONFLICT_NONE)
2193  elog(ERROR, "unexpected ON CONFLICT specification: %d",
2194  (int) onConflictAction);
2195  }
2196 
2197  /*
2198  * If the foreign table is a partition that doesn't have a corresponding
2199  * RTE entry, we need to create a new RTE describing the foreign table for
2200  * use by deparseInsertSql and create_foreign_modify() below, after first
2201  * copying the parent's RTE and modifying some fields to describe the
2202  * foreign partition to work on. However, if this is invoked by UPDATE,
2203  * the existing RTE may already correspond to this partition if it is one
2204  * of the UPDATE subplan target rels; in that case, we can just use the
2205  * existing RTE as-is.
2206  */
2207  if (resultRelInfo->ri_RangeTableIndex == 0)
2208  {
2209  ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2210 
2211  rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
2212  rte = copyObject(rte);
2213  rte->relid = RelationGetRelid(rel);
2214  rte->relkind = RELKIND_FOREIGN_TABLE;
2215 
2216  /*
2217  * For UPDATE, we must use the RT index of the first subplan target
2218  * rel's RTE, because the core code would have built expressions for
2219  * the partition, such as RETURNING, using that RT index as varno of
2220  * Vars contained in those expressions.
2221  */
2222  if (plan && plan->operation == CMD_UPDATE &&
2223  rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2224  resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2225  else
2226  resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2227  }
2228  else
2229  {
2230  resultRelation = resultRelInfo->ri_RangeTableIndex;
2231  rte = exec_rt_fetch(resultRelation, estate);
2232  }
2233 
2234  /* Construct the SQL command string. */
2235  deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2236  resultRelInfo->ri_WithCheckOptions,
2237  resultRelInfo->ri_returningList,
2238  &retrieved_attrs, &values_end_len);
2239 
2240  /* Construct an execution state. */
2241  fmstate = create_foreign_modify(mtstate->ps.state,
2242  rte,
2243  resultRelInfo,
2244  CMD_INSERT,
2245  NULL,
2246  sql.data,
2247  targetAttrs,
2248  values_end_len,
2249  retrieved_attrs != NIL,
2250  retrieved_attrs);
2251 
2252  /*
2253  * If the given resultRelInfo already has PgFdwModifyState set, it means
2254  * the foreign table is an UPDATE subplan result rel; in which case, store
2255  * the resulting state into the aux_fmstate of the PgFdwModifyState.
2256  */
2257  if (resultRelInfo->ri_FdwState)
2258  {
2259  Assert(plan && plan->operation == CMD_UPDATE);
2260  Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2261  ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2262  }
2263  else
2264  resultRelInfo->ri_FdwState = fmstate;
2265 }
2266 
2267 /*
2268  * postgresEndForeignInsert
2269  * Finish an insert operation on a foreign table
2270  */
2271 static void
2273  ResultRelInfo *resultRelInfo)
2274 {
2275  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2276 
2277  Assert(fmstate != NULL);
2278 
2279  /*
2280  * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2281  * postgresBeginForeignInsert())
2282  */
2283  if (fmstate->aux_fmstate)
2284  fmstate = fmstate->aux_fmstate;
2285 
2286  /* Destroy the execution state */
2287  finish_foreign_modify(fmstate);
2288 }
2289 
2290 /*
2291  * postgresIsForeignRelUpdatable
2292  * Determine whether a foreign table supports INSERT, UPDATE and/or
2293  * DELETE.
2294  */
2295 static int
2297 {
2298  bool updatable;
2299  ForeignTable *table;
2300  ForeignServer *server;
2301  ListCell *lc;
2302 
2303  /*
2304  * By default, all postgres_fdw foreign tables are assumed updatable. This
2305  * can be overridden by a per-server setting, which in turn can be
2306  * overridden by a per-table setting.
2307  */
2308  updatable = true;
2309 
2310  table = GetForeignTable(RelationGetRelid(rel));
2311  server = GetForeignServer(table->serverid);
2312 
2313  foreach(lc, server->options)
2314  {
2315  DefElem *def = (DefElem *) lfirst(lc);
2316 
2317  if (strcmp(def->defname, "updatable") == 0)
2318  updatable = defGetBoolean(def);
2319  }
2320  foreach(lc, table->options)
2321  {
2322  DefElem *def = (DefElem *) lfirst(lc);
2323 
2324  if (strcmp(def->defname, "updatable") == 0)
2325  updatable = defGetBoolean(def);
2326  }
2327 
2328  /*
2329  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2330  */
2331  return updatable ?
2332  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2333 }
2334 
2335 /*
2336  * postgresRecheckForeignScan
2337  * Execute a local join execution plan for a foreign join
2338  */
2339 static bool
2341 {
2342  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2344  TupleTableSlot *result;
2345 
2346  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2347  if (scanrelid > 0)
2348  return true;
2349 
2350  Assert(outerPlan != NULL);
2351 
2352  /* Execute a local join execution plan */
2353  result = ExecProcNode(outerPlan);
2354  if (TupIsNull(result))
2355  return false;
2356 
2357  /* Store result in the given slot */
2358  ExecCopySlot(slot, result);
2359 
2360  return true;
2361 }
2362 
2363 /*
2364  * find_modifytable_subplan
2365  * Helper routine for postgresPlanDirectModify to find the
2366  * ModifyTable subplan node that scans the specified RTI.
2367  *
2368  * Returns NULL if the subplan couldn't be identified. That's not a fatal
2369  * error condition, we just abandon trying to do the update directly.
2370  */
2371 static ForeignScan *
2373  ModifyTable *plan,
2374  Index rtindex,
2375  int subplan_index)
2376 {
2377  Plan *subplan = outerPlan(plan);
2378 
2379  /*
2380  * The cases we support are (1) the desired ForeignScan is the immediate
2381  * child of ModifyTable, or (2) it is the subplan_index'th child of an
2382  * Append node that is the immediate child of ModifyTable. There is no
2383  * point in looking further down, as that would mean that local joins are
2384  * involved, so we can't do the update directly.
2385  *
2386  * There could be a Result atop the Append too, acting to compute the
2387  * UPDATE targetlist values. We ignore that here; the tlist will be
2388  * checked by our caller.
2389  *
2390  * In principle we could examine all the children of the Append, but it's
2391  * currently unlikely that the core planner would generate such a plan
2392  * with the children out-of-order. Moreover, such a search risks costing
2393  * O(N^2) time when there are a lot of children.
2394  */
2395  if (IsA(subplan, Append))
2396  {
2397  Append *appendplan = (Append *) subplan;
2398 
2399  if (subplan_index < list_length(appendplan->appendplans))
2400  subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2401  }
2402  else if (IsA(subplan, Result) &&
2403  outerPlan(subplan) != NULL &&
2404  IsA(outerPlan(subplan), Append))
2405  {
2406  Append *appendplan = (Append *) outerPlan(subplan);
2407 
2408  if (subplan_index < list_length(appendplan->appendplans))
2409  subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2410  }
2411 
2412  /* Now, have we got a ForeignScan on the desired rel? */
2413  if (IsA(subplan, ForeignScan))
2414  {
2415  ForeignScan *fscan = (ForeignScan *) subplan;
2416 
2417  if (bms_is_member(rtindex, fscan->fs_base_relids))
2418  return fscan;
2419  }
2420 
2421  return NULL;
2422 }
2423 
2424 /*
2425  * postgresPlanDirectModify
2426  * Consider a direct foreign table modification
2427  *
2428  * Decide whether it is safe to modify a foreign table directly, and if so,
2429  * rewrite subplan accordingly.
2430  */
2431 static bool
2433  ModifyTable *plan,
2434  Index resultRelation,
2435  int subplan_index)
2436 {
2437  CmdType operation = plan->operation;
2438  RelOptInfo *foreignrel;
2439  RangeTblEntry *rte;
2440  PgFdwRelationInfo *fpinfo;
2441  Relation rel;
2442  StringInfoData sql;
2443  ForeignScan *fscan;
2444  List *processed_tlist = NIL;
2445  List *targetAttrs = NIL;
2446  List *remote_exprs;
2447  List *params_list = NIL;
2448  List *returningList = NIL;
2449  List *retrieved_attrs = NIL;
2450 
2451  /*
2452  * Decide whether it is safe to modify a foreign table directly.
2453  */
2454 
2455  /*
2456  * The table modification must be an UPDATE or DELETE.
2457  */
2458  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2459  return false;
2460 
2461  /*
2462  * Try to locate the ForeignScan subplan that's scanning resultRelation.
2463  */
2464  fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index);
2465  if (!fscan)
2466  return false;
2467 
2468  /*
2469  * It's unsafe to modify a foreign table directly if there are any quals
2470  * that should be evaluated locally.
2471  */
2472  if (fscan->scan.plan.qual != NIL)
2473  return false;
2474 
2475  /* Safe to fetch data about the target foreign rel */
2476  if (fscan->scan.scanrelid == 0)
2477  {
2478  foreignrel = find_join_rel(root, fscan->fs_relids);
2479  /* We should have a rel for this foreign join. */
2480  Assert(foreignrel);
2481  }
2482  else
2483  foreignrel = root->simple_rel_array[resultRelation];
2484  rte = root->simple_rte_array[resultRelation];
2485  fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2486 
2487  /*
2488  * It's unsafe to update a foreign table directly, if any expressions to
2489  * assign to the target columns are unsafe to evaluate remotely.
2490  */
2491  if (operation == CMD_UPDATE)
2492  {
2493  ListCell *lc,
2494  *lc2;
2495 
2496  /*
2497  * The expressions of concern are the first N columns of the processed
2498  * targetlist, where N is the length of the rel's update_colnos.
2499  */
2500  get_translated_update_targetlist(root, resultRelation,
2501  &processed_tlist, &targetAttrs);
2502  forboth(lc, processed_tlist, lc2, targetAttrs)
2503  {
2504  TargetEntry *tle = lfirst_node(TargetEntry, lc);
2505  AttrNumber attno = lfirst_int(lc2);
2506 
2507  /* update's new-value expressions shouldn't be resjunk */
2508  Assert(!tle->resjunk);
2509 
2510  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2511  elog(ERROR, "system-column update is not supported");
2512 
2513  if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2514  return false;
2515  }
2516  }
2517 
2518  /*
2519  * Ok, rewrite subplan so as to modify the foreign table directly.
2520  */
2521  initStringInfo(&sql);
2522 
2523  /*
2524  * Core code already has some lock on each rel being planned, so we can
2525  * use NoLock here.
2526  */
2527  rel = table_open(rte->relid, NoLock);
2528 
2529  /*
2530  * Recall the qual clauses that must be evaluated remotely. (These are
2531  * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2532  * doesn't care.)
2533  */
2534  remote_exprs = fpinfo->final_remote_exprs;
2535 
2536  /*
2537  * Extract the relevant RETURNING list if any.
2538  */
2539  if (plan->returningLists)
2540  {
2541  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2542 
2543  /*
2544  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2545  * we fetch from the foreign server any Vars specified in RETURNING
2546  * that refer not only to the target relation but to non-target
2547  * relations. So we'll deparse them into the RETURNING clause of the
2548  * remote query; use a targetlist consisting of them instead, which
2549  * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2550  * node below.
2551  */
2552  if (fscan->scan.scanrelid == 0)
2553  returningList = build_remote_returning(resultRelation, rel,
2554  returningList);
2555  }
2556 
2557  /*
2558  * Construct the SQL command string.
2559  */
2560  switch (operation)
2561  {
2562  case CMD_UPDATE:
2563  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2564  foreignrel,
2565  processed_tlist,
2566  targetAttrs,
2567  remote_exprs, &params_list,
2568  returningList, &retrieved_attrs);
2569  break;
2570  case CMD_DELETE:
2571  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2572  foreignrel,
2573  remote_exprs, &params_list,
2574  returningList, &retrieved_attrs);
2575  break;
2576  default:
2577  elog(ERROR, "unexpected operation: %d", (int) operation);
2578  break;
2579  }
2580 
2581  /*
2582  * Update the operation and target relation info.
2583  */
2584  fscan->operation = operation;
2585  fscan->resultRelation = resultRelation;
2586 
2587  /*
2588  * Update the fdw_exprs list that will be available to the executor.
2589  */
2590  fscan->fdw_exprs = params_list;
2591 
2592  /*
2593  * Update the fdw_private list that will be available to the executor.
2594  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2595  */
2596  fscan->fdw_private = list_make4(makeString(sql.data),
2597  makeBoolean((retrieved_attrs != NIL)),
2598  retrieved_attrs,
2599  makeBoolean(plan->canSetTag));
2600 
2601  /*
2602  * Update the foreign-join-related fields.
2603  */
2604  if (fscan->scan.scanrelid == 0)
2605  {
2606  /* No need for the outer subplan. */
2607  fscan->scan.plan.lefttree = NULL;
2608 
2609  /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2610  if (returningList)
2611  rebuild_fdw_scan_tlist(fscan, returningList);
2612  }
2613 
2614  /*
2615  * Finally, unset the async-capable flag if it is set, as we currently
2616  * don't support asynchronous execution of direct modifications.
2617  */
2618  if (fscan->scan.plan.async_capable)
2619  fscan->scan.plan.async_capable = false;
2620 
2621  table_close(rel, NoLock);
2622  return true;
2623 }
2624 
2625 /*
2626  * postgresBeginDirectModify
2627  * Prepare a direct foreign table modification
2628  */
2629 static void
2631 {
2632  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2633  EState *estate = node->ss.ps.state;
2634  PgFdwDirectModifyState *dmstate;
2635  Index rtindex;
2636  Oid userid;
2637  ForeignTable *table;
2638  UserMapping *user;
2639  int numParams;
2640 
2641  /*
2642  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2643  */
2644  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2645  return;
2646 
2647  /*
2648  * We'll save private state in node->fdw_state.
2649  */
2650  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2651  node->fdw_state = (void *) dmstate;
2652 
2653  /*
2654  * Identify which user to do the remote access as. This should match what
2655  * ExecCheckPermissions() does.
2656  */
2657  userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
2658 
2659  /* Get info about foreign table. */
2660  rtindex = node->resultRelInfo->ri_RangeTableIndex;
2661  if (fsplan->scan.scanrelid == 0)
2662  dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2663  else
2664  dmstate->rel = node->ss.ss_currentRelation;
2665  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2666  user = GetUserMapping(userid, table->serverid);
2667 
2668  /*
2669  * Get connection to the foreign server. Connection manager will
2670  * establish new connection if necessary.
2671  */
2672  dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
2673 
2674  /* Update the foreign-join-related fields. */
2675  if (fsplan->scan.scanrelid == 0)
2676  {
2677  /* Save info about foreign table. */
2678  dmstate->resultRel = dmstate->rel;
2679 
2680  /*
2681  * Set dmstate->rel to NULL to teach get_returning_data() and
2682  * make_tuple_from_result_row() that columns fetched from the remote
2683  * server are described by fdw_scan_tlist of the foreign-scan plan
2684  * node, not the tuple descriptor for the target relation.
2685  */
2686  dmstate->rel = NULL;
2687  }
2688 
2689  /* Initialize state variable */
2690  dmstate->num_tuples = -1; /* -1 means not set yet */
2691 
2692  /* Get private info created by planner functions. */
2693  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2695  dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private,
2697  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2699  dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private,
2701 
2702  /* Create context for per-tuple temp workspace. */
2703  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2704  "postgres_fdw temporary data",
2706 
2707  /* Prepare for input conversion of RETURNING results. */
2708  if (dmstate->has_returning)
2709  {
2710  TupleDesc tupdesc;
2711 
2712  if (fsplan->scan.scanrelid == 0)
2713  tupdesc = get_tupdesc_for_join_scan_tuples(node);
2714  else
2715  tupdesc = RelationGetDescr(dmstate->rel);
2716 
2717  dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2718 
2719  /*
2720  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2721  * initialize a filter to extract an updated/deleted tuple from a scan
2722  * tuple.
2723  */
2724  if (fsplan->scan.scanrelid == 0)
2725  init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2726  }
2727 
2728  /*
2729  * Prepare for processing of parameters used in remote query, if any.
2730  */
2731  numParams = list_length(fsplan->fdw_exprs);
2732  dmstate->numParams = numParams;
2733  if (numParams > 0)
2735  fsplan->fdw_exprs,
2736  numParams,
2737  &dmstate->param_flinfo,
2738  &dmstate->param_exprs,
2739  &dmstate->param_values);
2740 }
2741 
2742 /*
2743  * postgresIterateDirectModify
2744  * Execute a direct foreign table modification
2745  */
2746 static TupleTableSlot *
2748 {
2750  EState *estate = node->ss.ps.state;
2751  ResultRelInfo *resultRelInfo = node->resultRelInfo;
2752 
2753  /*
2754  * If this is the first call after Begin, execute the statement.
2755  */
2756  if (dmstate->num_tuples == -1)
2757  execute_dml_stmt(node);
2758 
2759  /*
2760  * If the local query doesn't specify RETURNING, just clear tuple slot.
2761  */
2762  if (!resultRelInfo->ri_projectReturning)
2763  {
2764  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2765  Instrumentation *instr = node->ss.ps.instrument;
2766 
2767  Assert(!dmstate->has_returning);
2768 
2769  /* Increment the command es_processed count if necessary. */
2770  if (dmstate->set_processed)
2771  estate->es_processed += dmstate->num_tuples;
2772 
2773  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2774  if (instr)
2775  instr->tuplecount += dmstate->num_tuples;
2776 
2777  return ExecClearTuple(slot);
2778  }
2779 
2780  /*
2781  * Get the next RETURNING tuple.
2782  */
2783  return get_returning_data(node);
2784 }
2785 
2786 /*
2787  * postgresEndDirectModify
2788  * Finish a direct foreign table modification
2789  */
2790 static void
2792 {
2794 
2795  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2796  if (dmstate == NULL)
2797  return;
2798 
2799  /* Release PGresult */
2800  PQclear(dmstate->result);
2801 
2802  /* Release remote connection */
2803  ReleaseConnection(dmstate->conn);
2804  dmstate->conn = NULL;
2805 
2806  /* MemoryContext will be deleted automatically. */
2807 }
2808 
2809 /*
2810  * postgresExplainForeignScan
2811  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2812  */
2813 static void
2815 {
2816  ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan);
2817  List *fdw_private = plan->fdw_private;
2818 
2819  /*
2820  * Identify foreign scans that are really joins or upper relations. The
2821  * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2822  * digit string(s), which are RT indexes, with the correct relation names.
2823  * We do that here, not when the plan is created, because we can't know
2824  * what aliases ruleutils.c will assign at plan creation time.
2825  */
2826  if (list_length(fdw_private) > FdwScanPrivateRelations)
2827  {
2828  StringInfo relations;
2829  char *rawrelations;
2830  char *ptr;
2831  int minrti,
2832  rtoffset;
2833 
2834  rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2835 
2836  /*
2837  * A difficulty with using a string representation of RT indexes is
2838  * that setrefs.c won't update the string when flattening the
2839  * rangetable. To find out what rtoffset was applied, identify the
2840  * minimum RT index appearing in the string and compare it to the
2841  * minimum member of plan->fs_base_relids. (We expect all the relids
2842  * in the join will have been offset by the same amount; the Asserts
2843  * below should catch it if that ever changes.)
2844  */
2845  minrti = INT_MAX;
2846  ptr = rawrelations;
2847  while (*ptr)
2848  {
2849  if (isdigit((unsigned char) *ptr))
2850  {
2851  int rti = strtol(ptr, &ptr, 10);
2852 
2853  if (rti < minrti)
2854  minrti = rti;
2855  }
2856  else
2857  ptr++;
2858  }
2859  rtoffset = bms_next_member(plan->fs_base_relids, -1) - minrti;
2860 
2861  /* Now we can translate the string */
2862  relations = makeStringInfo();
2863  ptr = rawrelations;
2864  while (*ptr)
2865  {
2866  if (isdigit((unsigned char) *ptr))
2867  {
2868  int rti = strtol(ptr, &ptr, 10);
2869  RangeTblEntry *rte;
2870  char *relname;
2871  char *refname;
2872 
2873  rti += rtoffset;
2874  Assert(bms_is_member(rti, plan->fs_base_relids));
2875  rte = rt_fetch(rti, es->rtable);
2876  Assert(rte->rtekind == RTE_RELATION);
2877  /* This logic should agree with explain.c's ExplainTargetRel */
2878  relname = get_rel_name(rte->relid);
2879  if (es->verbose)
2880  {
2881  char *namespace;
2882 
2883  namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid));
2884  appendStringInfo(relations, "%s.%s",
2885  quote_identifier(namespace),
2887  }
2888  else
2889  appendStringInfoString(relations,
2891  refname = (char *) list_nth(es->rtable_names, rti - 1);
2892  if (refname == NULL)
2893  refname = rte->eref->aliasname;
2894  if (strcmp(refname, relname) != 0)
2895  appendStringInfo(relations, " %s",
2896  quote_identifier(refname));
2897  }
2898  else
2899  appendStringInfoChar(relations, *ptr++);
2900  }
2901  ExplainPropertyText("Relations", relations->data, es);
2902  }
2903 
2904  /*
2905  * Add remote query, when VERBOSE option is specified.
2906  */
2907  if (es->verbose)
2908  {
2909  char *sql;
2910 
2911  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2912  ExplainPropertyText("Remote SQL", sql, es);
2913  }
2914 }
2915 
2916 /*
2917  * postgresExplainForeignModify
2918  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2919  */
2920 static void
2922  ResultRelInfo *rinfo,
2923  List *fdw_private,
2924  int subplan_index,
2925  ExplainState *es)
2926 {
2927  if (es->verbose)
2928  {
2929  char *sql = strVal(list_nth(fdw_private,
2931 
2932  ExplainPropertyText("Remote SQL", sql, es);
2933 
2934  /*
2935  * For INSERT we should always have batch size >= 1, but UPDATE and
2936  * DELETE don't support batching so don't show the property.
2937  */
2938  if (rinfo->ri_BatchSize > 0)
2939  ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
2940  }
2941 }
2942 
2943 /*
2944  * postgresExplainDirectModify
2945  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2946  * foreign table directly
2947  */
2948 static void
2950 {
2951  List *fdw_private;
2952  char *sql;
2953 
2954  if (es->verbose)
2955  {
2956  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2957  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2958  ExplainPropertyText("Remote SQL", sql, es);
2959  }
2960 }
2961 
2962 /*
2963  * postgresExecForeignTruncate
2964  * Truncate one or more foreign tables
2965  */
2966 static void
2968  DropBehavior behavior,
2969  bool restart_seqs)
2970 {
2971  Oid serverid = InvalidOid;
2972  UserMapping *user = NULL;
2973  PGconn *conn = NULL;
2974  StringInfoData sql;
2975  ListCell *lc;
2976  bool server_truncatable = true;
2977 
2978  /*
2979  * By default, all postgres_fdw foreign tables are assumed truncatable.
2980  * This can be overridden by a per-server setting, which in turn can be
2981  * overridden by a per-table setting.
2982  */
2983  foreach(lc, rels)
2984  {
2985  ForeignServer *server = NULL;
2986  Relation rel = lfirst(lc);
2988  ListCell *cell;
2989  bool truncatable;
2990 
2991  /*
2992  * First time through, determine whether the foreign server allows
2993  * truncates. Since all specified foreign tables are assumed to belong
2994  * to the same foreign server, this result can be used for other
2995  * foreign tables.
2996  */
2997  if (!OidIsValid(serverid))
2998  {
2999  serverid = table->serverid;
3000  server = GetForeignServer(serverid);
3001 
3002  foreach(cell, server->options)
3003  {
3004  DefElem *defel = (DefElem *) lfirst(cell);
3005 
3006  if (strcmp(defel->defname, "truncatable") == 0)
3007  {
3008  server_truncatable = defGetBoolean(defel);
3009  break;
3010  }
3011  }
3012  }
3013 
3014  /*
3015  * Confirm that all specified foreign tables belong to the same
3016  * foreign server.
3017  */
3018  Assert(table->serverid == serverid);
3019 
3020  /* Determine whether this foreign table allows truncations */
3021  truncatable = server_truncatable;
3022  foreach(cell, table->options)
3023  {
3024  DefElem *defel = (DefElem *) lfirst(cell);
3025 
3026  if (strcmp(defel->defname, "truncatable") == 0)
3027  {
3028  truncatable = defGetBoolean(defel);
3029  break;
3030  }
3031  }
3032 
3033  if (!truncatable)
3034  ereport(ERROR,
3035  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3036  errmsg("foreign table \"%s\" does not allow truncates",
3037  RelationGetRelationName(rel))));
3038  }
3039  Assert(OidIsValid(serverid));
3040 
3041  /*
3042  * Get connection to the foreign server. Connection manager will
3043  * establish new connection if necessary.
3044  */
3045  user = GetUserMapping(GetUserId(), serverid);
3046  conn = GetConnection(user, false, NULL);
3047 
3048  /* Construct the TRUNCATE command string */
3049  initStringInfo(&sql);
3050  deparseTruncateSql(&sql, rels, behavior, restart_seqs);
3051 
3052  /* Issue the TRUNCATE command to remote server */
3053  do_sql_command(conn, sql.data);
3054 
3055  pfree(sql.data);
3056 }
3057 
3058 /*
3059  * estimate_path_cost_size
3060  * Get cost and size estimates for a foreign scan on given foreign relation
3061  * either a base relation or a join between foreign relations or an upper
3062  * relation containing foreign relations.
3063  *
3064  * param_join_conds are the parameterization clauses with outer relations.
3065  * pathkeys specify the expected sort order if any for given path being costed.
3066  * fpextra specifies additional post-scan/join-processing steps such as the
3067  * final sort and the LIMIT restriction.
3068  *
3069  * The function returns the cost and size estimates in p_rows, p_width,
3070  * p_startup_cost and p_total_cost variables.
 *
 * When fpinfo->use_remote_estimate is set, this builds an EXPLAIN statement
 * and runs it on the foreign server through get_remote_estimate(), so it
 * performs a network round trip and can raise ERROR if the remote query
 * fails.  Otherwise, estimates are derived from local statistics.
 *
 * Side effect: when pathkeys, param_join_conds and fpextra are all empty,
 * the retrieved row count and the costs computed before adding
 * fdw_startup_cost/fdw_tuple_cost are stored in fpinfo->retrieved_rows,
 * fpinfo->rel_startup_cost and fpinfo->rel_total_cost.  Later calls in the
 * local-estimate mode reuse those cached values instead of recomputing the
 * underlying scan/join/grouping costs.
3071  */
3072 static void
3074  RelOptInfo *foreignrel,
3075  List *param_join_conds,
3076  List *pathkeys,
3077  PgFdwPathExtraData *fpextra,
3078  double *p_rows, int *p_width,
3079  Cost *p_startup_cost, Cost *p_total_cost)
3080 {
3081  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
3082  double rows;
3083  double retrieved_rows;
3084  int width;
3085  Cost startup_cost;
3086  Cost total_cost;
3087 
3088  /* Make sure the core code has set up the relation's reltarget */
3089  Assert(foreignrel->reltarget);
3090 
3091  /*
3092  * If the table or the server is configured to use remote estimates,
3093  * connect to the foreign server and execute EXPLAIN to estimate the
3094  * number of rows selected by the restriction+join clauses. Otherwise,
3095  * estimate rows using whatever statistics we have locally, in a way
3096  * similar to ordinary tables.
3097  */
3098  if (fpinfo->use_remote_estimate)
3099  {
3100  List *remote_param_join_conds;
3101  List *local_param_join_conds;
3102  StringInfoData sql;
3103  PGconn *conn;
3104  Selectivity local_sel;
3105  QualCost local_cost;
3106  List *fdw_scan_tlist = NIL;
3107  List *remote_conds;
3108 
3109  /* Required only to be passed to deparseSelectStmtForRel */
3110  List *retrieved_attrs;
3111 
3112  /*
3113  * param_join_conds might contain both clauses that are safe to send
3114  * across, and clauses that aren't.
3115  */
3116  classifyConditions(root, foreignrel, param_join_conds,
3117  &remote_param_join_conds, &local_param_join_conds);
3118 
3119  /* Build the list of columns to be fetched from the foreign server. */
3120  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
3121  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
3122  else
3123  fdw_scan_tlist = NIL;
3124 
3125  /*
3126  * The complete list of remote conditions includes everything from
3127  * baserestrictinfo plus any extra join_conds relevant to this
3128  * particular path.
3129  */
3130  remote_conds = list_concat(remote_param_join_conds,
3131  fpinfo->remote_conds);
3132 
3133  /*
3134  * Construct EXPLAIN query including the desired SELECT, FROM, and
3135  * WHERE clauses. Params and other-relation Vars are replaced by dummy
3136  * values, so don't request params_list.
3137  */
3138  initStringInfo(&sql);
3139  appendStringInfoString(&sql, "EXPLAIN ");
3140  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
3141  remote_conds, pathkeys,
3142  fpextra ? fpextra->has_final_sort : false,
3143  fpextra ? fpextra->has_limit : false,
3144  false, &retrieved_attrs, NULL);
3145 
3146  /* Get the remote estimate */
3147  conn = GetConnection(fpinfo->user, false, NULL);
3148  get_remote_estimate(sql.data, conn, &rows, &width,
3149  &startup_cost, &total_cost);
 /*
  * NOTE(review): source line 3150 is missing from this rendering.  Upstream
  * presumably releases the connection obtained above at this point
  * (ReleaseConnection); confirm against the real file.
  */
3151 
 /*
  * At this point "rows" is the remote server's estimate of rows it returns,
  * before local-only quals are applied.  Remember it as the fetched count.
  */
3152  retrieved_rows = rows;
3153 
3154  /* Factor in the selectivity of the locally-checked quals */
3155  local_sel = clauselist_selectivity(root,
3156  local_param_join_conds,
3157  foreignrel->relid,
3158  JOIN_INNER,
3159  NULL);
3160  local_sel *= fpinfo->local_conds_sel;
3161 
3162  rows = clamp_row_est(rows * local_sel);
3163 
3164  /* Add in the eval cost of the locally-checked quals */
3165  startup_cost += fpinfo->local_conds_cost.startup;
3166  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3167  cost_qual_eval(&local_cost, local_param_join_conds, root);
3168  startup_cost += local_cost.startup;
3169  total_cost += local_cost.per_tuple * retrieved_rows;
3170 
3171  /*
3172  * Add in tlist eval cost for each output row. In case of an
3173  * aggregate, some of the tlist expressions such as grouping
3174  * expressions will be evaluated remotely, so adjust the costs.
3175  */
3176  startup_cost += foreignrel->reltarget->cost.startup;
3177  total_cost += foreignrel->reltarget->cost.startup;
3178  total_cost += foreignrel->reltarget->cost.per_tuple * rows;
3179  if (IS_UPPER_REL(foreignrel))
3180  {
3181  QualCost tlist_cost;
3182 
3183  cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
3184  startup_cost -= tlist_cost.startup;
3185  total_cost -= tlist_cost.startup;
3186  total_cost -= tlist_cost.per_tuple * rows;
3187  }
3188  }
3189  else
3190  {
3191  Cost run_cost = 0;
3192 
3193  /*
3194  * We don't support join conditions in this mode (hence, no
3195  * parameterized paths can be made).
3196  */
3197  Assert(param_join_conds == NIL);
3198 
3199  /*
3200  * We will come here again and again with different set of pathkeys or
3201  * additional post-scan/join-processing steps that caller wants to
3202  * cost. We don't need to calculate the cost/size estimates for the
3203  * underlying scan, join, or grouping each time. Instead, use those
3204  * estimates if we have cached them already.
3205  */
3206  if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
3207  {
3208  Assert(fpinfo->retrieved_rows >= 0);
3209 
3210  rows = fpinfo->rows;
3211  retrieved_rows = fpinfo->retrieved_rows;
3212  width = fpinfo->width;
3213  startup_cost = fpinfo->rel_startup_cost;
3214  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
3215 
3216  /*
3217  * If we estimate the costs of a foreign scan or a foreign join
3218  * with additional post-scan/join-processing steps, the scan or
3219  * join costs obtained from the cache wouldn't yet contain the
3220  * eval costs for the final scan/join target, which would've been
3221  * updated by apply_scanjoin_target_to_paths(); add the eval costs
3222  * now.
3223  */
3224  if (fpextra && !IS_UPPER_REL(foreignrel))
3225  {
3226  /* Shouldn't get here unless we have LIMIT */
3227  Assert(fpextra->has_limit);
3228  Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
3229  foreignrel->reloptkind == RELOPT_JOINREL);
3230  startup_cost += foreignrel->reltarget->cost.startup;
3231  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3232  }
3233  }
3234  else if (IS_JOIN_REL(foreignrel))
3235  {
3236  PgFdwRelationInfo *fpinfo_i;
3237  PgFdwRelationInfo *fpinfo_o;
3238  QualCost join_cost;
3239  QualCost remote_conds_cost;
3240  double nrows;
3241 
3242  /* Use rows/width estimates made by the core code. */
3243  rows = foreignrel->rows;
3244  width = foreignrel->reltarget->width;
3245 
3246  /* For join we expect inner and outer relations set */
3247  Assert(fpinfo->innerrel && fpinfo->outerrel);
3248 
3249  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
3250  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
3251 
3252  /* Estimate of number of rows in cross product */
3253  nrows = fpinfo_i->rows * fpinfo_o->rows;
3254 
3255  /*
3256  * Back into an estimate of the number of retrieved rows. Just in
3257  * case this is nuts, clamp to at most nrows.
3258  */
3259  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3260  retrieved_rows = Min(retrieved_rows, nrows);
3261 
3262  /*
3263  * The cost of foreign join is estimated as cost of generating
3264  * rows for the joining relations + cost for applying quals on the
3265  * rows.
3266  */
3267 
3268  /*
3269  * Calculate the cost of clauses pushed down to the foreign server
3270  */
3271  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
3272  /* Calculate the cost of applying join clauses */
3273  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
3274 
3275  /*
3276  * Startup cost includes startup cost of joining relations and the
3277  * startup cost for join and other clauses. We do not include the
3278  * startup cost specific to join strategy (e.g. setting up hash
3279  * tables) since we do not know what strategy the foreign server
3280  * is going to use.
3281  */
3282  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
3283  startup_cost += join_cost.startup;
3284  startup_cost += remote_conds_cost.startup;
3285  startup_cost += fpinfo->local_conds_cost.startup;
3286 
3287  /*
3288  * Run time cost includes:
3289  *
3290  * 1. Run time cost (total_cost - startup_cost) of relations being
3291  * joined
3292  *
3293  * 2. Run time cost of applying join clauses on the cross product
3294  * of the joining relations.
3295  *
3296  * 3. Run time cost of applying pushed down other clauses on the
3297  * result of join
3298  *
3299  * 4. Run time cost of applying nonpushable other clauses locally
3300  * on the result fetched from the foreign server.
3301  */
3302  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
3303  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
3304  run_cost += nrows * join_cost.per_tuple;
 /* From here on nrows is the join's output count (after join clauses). */
3305  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
3306  run_cost += nrows * remote_conds_cost.per_tuple;
3307  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3308 
3309  /* Add in tlist eval cost for each output row */
3310  startup_cost += foreignrel->reltarget->cost.startup;
3311  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3312  }
3313  else if (IS_UPPER_REL(foreignrel))
3314  {
3315  RelOptInfo *outerrel = fpinfo->outerrel;
3316  PgFdwRelationInfo *ofpinfo;
3317  AggClauseCosts aggcosts;
3318  double input_rows;
3319  int numGroupCols;
3320  double numGroups = 1;
3321 
3322  /* The upper relation should have its outer relation set */
3323  Assert(outerrel);
3324  /* and that outer relation should have its reltarget set */
3325  Assert(outerrel->reltarget);
3326 
3327  /*
3328  * This cost model is mixture of costing done for sorted and
3329  * hashed aggregates in cost_agg(). We are not sure which
3330  * strategy will be considered at remote side, thus for
3331  * simplicity, we put all startup related costs in startup_cost
3332  * and all finalization and run cost are added in total_cost.
3333  */
3334 
3335  ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
3336 
3337  /* Get rows from input rel */
3338  input_rows = ofpinfo->rows;
3339 
3340  /* Collect statistics about aggregates for estimating costs. */
3341  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
3342  if (root->parse->hasAggs)
3343  {
3344  get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts);
3345  }
3346 
3347  /* Get number of grouping columns and possible number of groups */
3348  numGroupCols = list_length(root->processed_groupClause);
3349  numGroups = estimate_num_groups(root,
3351  fpinfo->grouped_tlist),
3352  input_rows, NULL, NULL);
3353 
3354  /*
3355  * Get the retrieved_rows and rows estimates. If there are HAVING
3356  * quals, account for their selectivity.
3357  */
3358  if (root->hasHavingQual)
3359  {
3360  /* Factor in the selectivity of the remotely-checked quals */
3361  retrieved_rows =
3362  clamp_row_est(numGroups *
3364  fpinfo->remote_conds,
3365  0,
3366  JOIN_INNER,
3367  NULL));
3368  /* Factor in the selectivity of the locally-checked quals */
3369  rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
3370  }
3371  else
3372  {
3373  rows = retrieved_rows = numGroups;
3374  }
3375 
3376  /* Use width estimate made by the core code. */
3377  width = foreignrel->reltarget->width;
3378 
3379  /*-----
3380  * Startup cost includes:
3381  * 1. Startup cost for underneath input relation, adjusted for
3382  * tlist replacement by apply_scanjoin_target_to_paths()
3383  * 2. Cost of performing aggregation, per cost_agg()
3384  *-----
3385  */
3386  startup_cost = ofpinfo->rel_startup_cost;
3387  startup_cost += outerrel->reltarget->cost.startup;
3388  startup_cost += aggcosts.transCost.startup;
3389  startup_cost += aggcosts.transCost.per_tuple * input_rows;
3390  startup_cost += aggcosts.finalCost.startup;
3391  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3392 
3393  /*-----
3394  * Run time cost includes:
3395  * 1. Run time cost of underneath input relation, adjusted for
3396  * tlist replacement by apply_scanjoin_target_to_paths()
3397  * 2. Run time cost of performing aggregation, per cost_agg()
3398  *-----
3399  */
3400  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3401  run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3402  run_cost += aggcosts.finalCost.per_tuple * numGroups;
3403  run_cost += cpu_tuple_cost * numGroups;
3404 
3405  /* Account for the eval cost of HAVING quals, if any */
3406  if (root->hasHavingQual)
3407  {
3408  QualCost remote_cost;
3409 
3410  /* Add in the eval cost of the remotely-checked quals */
3411  cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3412  startup_cost += remote_cost.startup;
3413  run_cost += remote_cost.per_tuple * numGroups;
3414  /* Add in the eval cost of the locally-checked quals */
3415  startup_cost += fpinfo->local_conds_cost.startup;
3416  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3417  }
3418 
3419  /* Add in tlist eval cost for each output row */
3420  startup_cost += foreignrel->reltarget->cost.startup;
3421  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3422  }
3423  else
3424  {
3425  Cost cpu_per_tuple;
3426 
3427  /* Use rows/width estimates made by set_baserel_size_estimates. */
3428  rows = foreignrel->rows;
3429  width = foreignrel->reltarget->width;
3430 
3431  /*
3432  * Back into an estimate of the number of retrieved rows. Just in
3433  * case this is nuts, clamp to at most foreignrel->tuples.
3434  */
3435  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3436  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3437 
3438  /*
3439  * Cost as though this were a seqscan, which is pessimistic. We
3440  * effectively imagine the local_conds are being evaluated
3441  * remotely, too.
3442  */
3443  startup_cost = 0;
3444  run_cost = 0;
3445  run_cost += seq_page_cost * foreignrel->pages;
3446 
3447  startup_cost += foreignrel->baserestrictcost.startup;
3448  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3449  run_cost += cpu_per_tuple * foreignrel->tuples;
3450 
3451  /* Add in tlist eval cost for each output row */
3452  startup_cost += foreignrel->reltarget->cost.startup;
3453  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3454  }
3455 
3456  /*
3457  * Without remote estimates, we have no real way to estimate the cost
3458  * of generating sorted output. It could be free if the query plan
3459  * the remote side would have chosen generates properly-sorted output
3460  * anyway, but in most cases it will cost something. Estimate a value
3461  * high enough that we won't pick the sorted path when the ordering
3462  * isn't locally useful, but low enough that we'll err on the side of
3463  * pushing down the ORDER BY clause when it's useful to do so.
3464  */
3465  if (pathkeys != NIL)
3466  {
3467  if (IS_UPPER_REL(foreignrel))
3468  {
3469  Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3470  fpinfo->stage == UPPERREL_GROUP_AGG);
 /*
  * NOTE(review): fpextra is dereferenced below without a NULL check.
  * This relies on callers always passing fpextra when costing a
  * grouping rel with pathkeys; confirm at the call sites.
  */
3471  adjust_foreign_grouping_path_cost(root, pathkeys,
3472  retrieved_rows, width,
3473  fpextra->limit_tuples,
3474  &startup_cost, &run_cost);
3475  }
3476  else
3477  {
3478  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3479  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3480  }
3481  }
3482 
3483  total_cost = startup_cost + run_cost;
3484 
3485  /* Adjust the cost estimates if we have LIMIT */
3486  if (fpextra && fpextra->has_limit)
3487  {
3488  adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3489  fpextra->offset_est, fpextra->count_est);
3490  retrieved_rows = rows;
3491  }
3492  }
3493 
3494  /*
3495  * If this includes the final sort step, the given target, which will be
3496  * applied to the resulting path, might have different expressions from
3497  * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3498  * eval costs.
3499  */
3500  if (fpextra && fpextra->has_final_sort &&
3501  fpextra->target != foreignrel->reltarget)
3502  {
3503  QualCost oldcost = foreignrel->reltarget->cost;
3504  QualCost newcost = fpextra->target->cost;
3505 
3506  startup_cost += newcost.startup - oldcost.startup;
3507  total_cost += newcost.startup - oldcost.startup;
3508  total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3509  }
3510 
3511  /*
3512  * Cache the retrieved rows and cost estimates for scans, joins, or
3513  * groupings without any parameterization, pathkeys, or additional
3514  * post-scan/join-processing steps, before adding the costs for
3515  * transferring data from the foreign server. These estimates are useful
3516  * for costing remote joins involving this relation or costing other
3517  * remote operations on this relation such as remote sorts and remote
3518  * LIMIT restrictions, when the costs can not be obtained from the foreign
3519  * server. This function will be called at least once for every foreign
3520  * relation without any parameterization, pathkeys, or additional
3521  * post-scan/join-processing steps.
3522  */
3523  if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3524  {
3525  fpinfo->retrieved_rows = retrieved_rows;
3526  fpinfo->rel_startup_cost = startup_cost;
3527  fpinfo->rel_total_cost = total_cost;
3528  }
3529 
3530  /*
3531  * Add some additional cost factors to account for connection overhead
3532  * (fdw_startup_cost), transferring data across the network
3533  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3534  * (cpu_tuple_cost per retrieved row).
3535  */
3536  startup_cost += fpinfo->fdw_startup_cost;
3537  total_cost += fpinfo->fdw_startup_cost;
3538  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3539  total_cost += cpu_tuple_cost * retrieved_rows;
3540 
3541  /*
3542  * If we have LIMIT, we should prefer performing the restriction remotely
3543  * rather than locally, as the former avoids extra row fetches from the
3544  * remote that the latter might cause. But since the core code doesn't
3545  * account for such fetches when estimating the costs of the local
3546  * restriction (see create_limit_path()), there would be no difference
3547  * between the costs of the local restriction and the costs of the remote
3548  * restriction estimated above if we don't use remote estimates (except
3549  * for the case where the foreignrel is a grouping relation, the given
3550  * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3551  * accounted for in costing the remote restriction). Tweak the costs of
3552  * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3553  * one.
3554  */
 /*
  * The discount below is at most 5% of the run cost, scaled by the
  * fraction of fpinfo->rows that the LIMIT skips.
  */
3555  if (!fpinfo->use_remote_estimate &&
3556  fpextra && fpextra->has_limit &&
3557  fpextra->limit_tuples > 0 &&
3558  fpextra->limit_tuples < fpinfo->rows)
3559  {
3560  Assert(fpinfo->rows > 0);
3561  total_cost -= (total_cost - startup_cost) * 0.05 *
3562  (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3563  }
3564 
3565  /* Return results. */
3566  *p_rows = rows;
3567  *p_width = width;
3568  *p_startup_cost = startup_cost;
3569  *p_total_cost = total_cost;
3570 }
3571 
3572 /*
3573  * Estimate costs of executing a SQL statement remotely.
3574  * The given "sql" must be an EXPLAIN command.
3575  */
3576 static void
3577 get_remote_estimate(const char *sql, PGconn *conn,
3578  double *rows, int *width,
3579  Cost *startup_cost, Cost *total_cost)
3580 {
3581  PGresult *volatile res = NULL;
3582 
3583  /* PGresult must be released before leaving this function. */
3584  PG_TRY();
3585  {
3586  char *line;
3587  char *p;
3588  int n;
3589 
3590  /*
3591  * Execute EXPLAIN remotely.
3592  */
3593  res = pgfdw_exec_query(conn, sql, NULL);
3595  pgfdw_report_error(ERROR, res, conn, false, sql);
3596 
3597  /*
3598  * Extract cost numbers for topmost plan node. Note we search for a
3599  * left paren from the end of the line to avoid being confused by
3600  * other uses of parentheses.
3601  */
3602  line = PQgetvalue(res, 0, 0);
3603  p = strrchr(line, '(');
3604  if (p == NULL)
3605  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3606  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3607  startup_cost, total_cost, rows, width);
3608  if (n != 4)
3609  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3610  }
3611  PG_FINALLY();
3612  {
3613  PQclear(res);
3614  }
3615  PG_END_TRY();
3616 }
3617 
3618 /*
3619  * Adjust the cost estimates of a foreign grouping path to include the cost of
3620  * generating properly-sorted output.
3621  */
3622 static void
3624  List *pathkeys,
3625  double retrieved_rows,
3626  double width,
3627  double limit_tuples,
3628  Cost *p_startup_cost,
3629  Cost *p_run_cost)
3630 {
3631  /*
3632  * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3633  * side is unlikely to generate properly-sorted output, so it would need
3634  * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3635  * if the GROUP BY clause is sort-able but isn't a superset of the given
3636  * pathkeys, adjust the costs with that function. Otherwise, adjust the
3637  * costs by applying the same heuristic as for the scan or join case.
3638  */
3640  !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3641  {
3642  Path sort_path; /* dummy for result of cost_sort */
3643 
3644  cost_sort(&sort_path,
3645  root,
3646  pathkeys,
3647  *p_startup_cost + *p_run_cost,
3648  retrieved_rows,
3649  width,
3650  0.0,
3651  work_mem,
3652  limit_tuples);
3653 
3654  *p_startup_cost = sort_path.startup_cost;
3655  *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3656  }
3657  else
3658  {
3659  /*
3660  * The default extra cost seems too large for foreign-grouping cases;
3661  * add 1/4th of that default.
3662  */
3663  double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3664  - 1.0) * 0.25;
3665 
3666  *p_startup_cost *= sort_multiplier;
3667  *p_run_cost *= sort_multiplier;
3668  }
3669 }
3670 
3671 /*
3672  * Detect whether we want to process an EquivalenceClass member.
3673  *
3674  * This is a callback for use by generate_implied_equalities_for_column.
3675  */
3676 static bool
3679  void *arg)
3680 {
3682  Expr *expr = em->em_expr;
3683 
3684  /*
3685  * If we've identified what we're processing in the current scan, we only
3686  * want to match that expression.
3687  */
3688  if (state->current != NULL)
3689  return equal(expr, state->current);
3690 
3691  /*
3692  * Otherwise, ignore anything we've already processed.
3693  */
3694  if (list_member(state->already_used, expr))
3695  return false;
3696 
3697  /* This is the new target to process. */
3698  state->current = expr;
3699  return true;
3700 }
3701 
3702 /*
3703  * Create cursor for node's query with current parameter values.
3704  */
3705 static void
3707 {
3708  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3709  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3710  int numParams = fsstate->numParams;
3711  const char **values = fsstate->param_values;
3712  PGconn *conn = fsstate->conn;
3714  PGresult *res;
3715 
3716  /* First, process a pending asynchronous request, if any. */
3717  if (fsstate->conn_state->pendingAreq)
3719 
3720  /*
3721  * Construct array of query parameter values in text format. We do the
3722  * conversions in the short-lived per-tuple context, so as not to cause a
3723  * memory leak over repeated scans.
3724  */
3725  if (numParams > 0)
3726  {
3727  MemoryContext oldcontext;
3728 
3729  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3730 
3731  process_query_params(econtext,
3732  fsstate->param_flinfo,
3733  fsstate->param_exprs,
3734  values);
3735 
3736  MemoryContextSwitchTo(oldcontext);
3737  }
3738 
3739  /* Construct the DECLARE CURSOR command */
3740  initStringInfo(&buf);
3741  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3742  fsstate->cursor_number, fsstate->query);
3743 
3744  /*
3745  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3746  * to infer types for all parameters. Since we explicitly cast every
3747  * parameter (see deparse.c), the "inference" is trivial and will produce
3748  * the desired result. This allows us to avoid assuming that the remote
3749  * server has the same OIDs we do for the parameters' types.
3750  */
3751  if (!PQsendQueryParams(conn, buf.data, numParams,
3752  NULL, values, NULL, NULL, 0))
3753  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3754 
3755  /*
3756  * Get the result, and check for success.
3757  *
3758  * We don't use a PG_TRY block here, so be careful not to throw error
3759  * without releasing the PGresult.
3760  */
3761  res = pgfdw_get_result(conn, buf.data);
3763  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3764  PQclear(res);
3765 
3766  /* Mark the cursor as created, and show no tuples have been retrieved */
3767  fsstate->cursor_exists = true;
3768  fsstate->tuples = NULL;
3769  fsstate->num_tuples = 0;
3770  fsstate->next_tuple = 0;
3771  fsstate->fetch_ct_2 = 0;
3772  fsstate->eof_reached = false;
3773 
3774  /* Clean up */
3775  pfree(buf.data);
3776 }
3777 
3778 /*
3779  * Fetch some more rows from the node's cursor.
 *
 * The previous batch is discarded: fsstate->tuples is cleared and
 * fsstate->batch_cxt is reset.  The new batch of HeapTuples is then
 * allocated in batch_cxt.  On return the following fields are updated:
 *	- fsstate->tuples, num_tuples and next_tuple;
 *	- eof_reached, set when fewer than fetch_size rows came back;
 *	- fetch_ct_2, incremented but never beyond 2.
 *
 * In async-capable mode, fetch_more_data_begin must already have sent the
 * query, so fsstate->conn_state->pendingAreq is set; only the result is
 * collected here.  Otherwise a synchronous "FETCH <fetch_size> FROM c<n>"
 * is issued.  Remote errors are reported with ERROR, citing the original
 * query.
3780  */
3781 static void
3783 {
3784  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3785  PGresult *volatile res = NULL;
3786  MemoryContext oldcontext;
3787 
3788  /*
3789  * We'll store the tuples in the batch_cxt. First, flush the previous
3790  * batch.
3791  */
3792  fsstate->tuples = NULL;
3793  MemoryContextReset(fsstate->batch_cxt);
3794  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3795 
3796  /* PGresult must be released before leaving this function. */
3797  PG_TRY();
3798  {
3799  PGconn *conn = fsstate->conn;
3800  int numrows;
3801  int i;
3802 
3803  if (fsstate->async_capable)
3804  {
3805  Assert(fsstate->conn_state->pendingAreq);
3806 
3807  /*
3808  * The query was already sent by an earlier call to
3809  * fetch_more_data_begin. So now we just fetch the result.
3810  */
3811  res = pgfdw_get_result(conn, fsstate->query);
3812  /* On error, report the original query, not the FETCH. */
 /*
  * NOTE(review): the status test guarding the next call (source line
  * 3813) is missing from this rendering; presumably it checks
  * PQresultStatus(res) against PGRES_TUPLES_OK. Confirm.
  */
3814  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3815 
3816  /* Reset per-connection state */
3817  fsstate->conn_state->pendingAreq = NULL;
3818  }
3819  else
3820  {
3821  char sql[64];
3822 
3823  /* This is a regular synchronous fetch. */
3824  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3825  fsstate->fetch_size, fsstate->cursor_number);
3826 
3827  res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
3828  /* On error, report the original query, not the FETCH. */
 /*
  * NOTE(review): as above, the guarding status test (source line 3829)
  * is missing from this rendering. Confirm against the real file.
  */
3830  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3831  }
3832 
3833  /* Convert the data into HeapTuples */
3834  numrows = PQntuples(res);
 /* Allocated in batch_cxt, which is the current memory context here. */
3835  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3836  fsstate->num_tuples = numrows;
3837  fsstate->next_tuple = 0;
3838 
3839  for (i = 0; i < numrows; i++)
3840  {
3841  Assert(IsA(node->ss.ps.plan, ForeignScan));
3842 
 /*
  * NOTE(review): the callee line (source line 3844) is missing from this
  * rendering; presumably make_tuple_from_result_row(res, i, ...).
  * temp_cxt is presumably its scratch context. Confirm.
  */
3843  fsstate->tuples[i] =
3845  fsstate->rel,
3846  fsstate->attinmeta,
3847  fsstate->retrieved_attrs,
3848  node,
3849  fsstate->temp_cxt);
3850  }
3851 
3852  /* Update fetch_ct_2 */
3853  if (fsstate->fetch_ct_2 < 2)
3854  fsstate->fetch_ct_2++;
3855 
3856  /* Must be EOF if we didn't get as many tuples as we asked for. */
3857  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3858  }
3859  PG_FINALLY();
3860  {
3861  PQclear(res);
3862  }
3863  PG_END_TRY();
3864 
3865  MemoryContextSwitchTo(oldcontext);
3866 }
3867 
3868 /*
3869  * Force assorted GUC parameters to settings that ensure that we'll output
3870  * data values in a form that is unambiguous to the remote server.
3871  *
3872  * This is rather expensive and annoying to do once per row, but there's
3873  * little choice if we want to be sure values are transmitted accurately;
3874  * we can't leave the settings in place between rows for fear of affecting
3875  * user-visible computations.
3876  *
3877  * We use the equivalent of a function SET option to allow the settings to
3878  * persist only until the caller calls reset_transmission_modes(). If an
3879  * error is thrown in between, guc.c will take care of undoing the settings.
3880  *
3881  * The return value is the nestlevel that must be passed to
3882  * reset_transmission_modes() to undo things.
3883  */
3884 int
3886 {
3887  int nestlevel = NewGUCNestLevel();
3888 
3889  /*
3890  * The values set here should match what pg_dump does. See also
3891  * configure_remote_session in connection.c.
3892  */
3893  if (DateStyle != USE_ISO_DATES)
3894  (void) set_config_option("datestyle", "ISO",
3896  GUC_ACTION_SAVE, true, 0, false);
3898  (void) set_config_option("intervalstyle", "postgres",
3900  GUC_ACTION_SAVE, true, 0, false);
3901  if (extra_float_digits < 3)
3902  (void) set_config_option("extra_float_digits", "3",
3904  GUC_ACTION_SAVE, true, 0, false);
3905 
3906  /*
3907  * In addition force restrictive search_path, in case there are any
3908  * regproc or similar constants to be printed.
3909  */
3910  (void) set_config_option("search_path", "pg_catalog",
3912  GUC_ACTION_SAVE, true, 0, false);
3913 
3914  return nestlevel;
3915 }
3916 
/*
 * Undo the effects of set_transmission_modes().
 *
 * "nestlevel" must be the value returned by the matching
 * set_transmission_modes() call.  The GUC nest level it opened is closed
 * with isCommit = true.
 */
void
reset_transmission_modes(int nestlevel)
{
	const bool	is_commit = true;

	AtEOXact_GUC(is_commit, nestlevel);
}
3925 
3926 /*
3927  * Utility routine to close a cursor.
3928  */
3929 static void
3931  PgFdwConnState *conn_state)
3932 {
3933  char sql[64];
3934  PGresult *res;
3935 
3936  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3937 
3938  /*
3939  * We don't use a PG_TRY block here, so be careful not to throw error
3940  * without releasing the PGresult.
3941  */
3942  res = pgfdw_exec_query(conn, sql, conn_state);
3944  pgfdw_report_error(ERROR, res, conn, true, sql);
3945  PQclear(res);
3946 }
3947 
3948 /*
3949  * create_foreign_modify
3950  * Construct an execution state of a foreign insert/update/delete
3951  * operation
3952  */
3953 static PgFdwModifyState *
3955  RangeTblEntry *rte,
3956  ResultRelInfo *resultRelInfo,
3957  CmdType operation,
3958  Plan *subplan,
3959  char *query,
3960  List *target_attrs,
3961  int values_end,
3962  bool has_returning,
3963  List *retrieved_attrs)
3964 {
3965  PgFdwModifyState *fmstate;
3966  Relation rel = resultRelInfo->ri_RelationDesc;
3967  TupleDesc tupdesc = RelationGetDescr(rel);
3968  Oid userid;
3969  ForeignTable *table;
3970  UserMapping *user;
3971  AttrNumber n_params;
3972  Oid typefnoid;
3973  bool isvarlena;
3974  ListCell *lc;
3975 
3976  /* Begin constructing PgFdwModifyState. */
3977  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3978  fmstate->rel = rel;
3979 
3980  /* Identify which user to do the remote access as. */
3981  userid = ExecGetResultRelCheckAsUser(resultRelInfo, estate);
3982 
3983  /* Get info about foreign table. */
3984  table = GetForeignTable(RelationGetRelid(rel));
3985  user = GetUserMapping(userid, table->serverid);
3986 
3987  /* Open connection; report that we'll create a prepared statement. */
3988  fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
3989  fmstate->p_name = NULL; /* prepared statement not made yet */
3990 
3991  /* Set up remote query information. */
3992  fmstate->query = query;
3993  if (operation == CMD_INSERT)
3994  {
3995  fmstate->query = pstrdup(fmstate->query);
3996  fmstate->orig_query = pstrdup(fmstate->query);
3997  }
3998  fmstate->target_attrs = target_attrs;
3999  fmstate->values_end = values_end;
4000  fmstate->has_returning = has_returning;
4001  fmstate->retrieved_attrs = retrieved_attrs;
4002 
4003  /* Create context for per-tuple temp workspace. */
4004  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
4005  "postgres_fdw temporary data",
4007 
4008  /* Prepare for input conversion of RETURNING results. */
4009  if (fmstate->has_returning)
4010  fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
4011 
4012  /* Prepare for output conversion of parameters used in prepared stmt. */
4013  n_params = list_length(fmstate->target_attrs) + 1;
4014  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
4015  fmstate->p_nums = 0;
4016 
4017  if (operation == CMD_UPDATE || operation == CMD_DELETE)
4018  {
4019  Assert(subplan != NULL);
4020 
4021  /* Find the ctid resjunk column in the subplan's result */
4023  "ctid");
4024  if (!AttributeNumberIsValid(fmstate->ctidAttno))
4025  elog(ERROR, "could not find junk ctid column");
4026 
4027  /* First transmittable parameter will be ctid */
4028  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
4029  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4030  fmstate->p_nums++;
4031  }
4032 
4033  if (operation == CMD_INSERT || operation == CMD_UPDATE)
4034  {
4035  /* Set up for remaining transmittable parameters */
4036  foreach(lc, fmstate->target_attrs)
4037  {
4038  int attnum = lfirst_int(lc);
4039  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4040 
4041  Assert(!attr->attisdropped);
4042 
4043  /* Ignore generated columns; they are set to DEFAULT */
4044  if (attr->attgenerated)
4045  continue;
4046  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
4047  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4048  fmstate->p_nums++;
4049  }
4050  }
4051 
4052  Assert(fmstate->p_nums <= n_params);
4053 
4054  /* Set batch_size from foreign server/table options. */
4055  if (operation == CMD_INSERT)
4056  fmstate->batch_size = get_batch_size_option(rel);
4057 
4058  fmstate->num_slots = 1;
4059 
4060  /* Initialize auxiliary state */
4061  fmstate->aux_fmstate = NULL;
4062 
4063  return fmstate;
4064 }
4065 
4066 /*
4067  * execute_foreign_modify
4068  * Perform foreign-table modification as required, and fetch RETURNING
4069  * result if any. (This is the shared guts of postgresExecForeignInsert,
4070  * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
4071  * postgresExecForeignDelete.)
4072  */
4073 static TupleTableSlot **
4075  ResultRelInfo *resultRelInfo,
4076  CmdType operation,
4077  TupleTableSlot **slots,
4078  TupleTableSlot **planSlots,
4079  int *numSlots)
4080 {
4081  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
4082  ItemPointer ctid = NULL;
4083  const char **p_values;
4084  PGresult *res;
4085  int n_rows;
4086  StringInfoData sql;
4087 
4088  /* The operation should be INSERT, UPDATE, or DELETE */
4089  Assert(operation == CMD_INSERT ||
4090  operation == CMD_UPDATE ||
4091  operation == CMD_DELETE);
4092 
4093  /* First, process a pending asynchronous request, if any. */
4094  if (fmstate->conn_state->pendingAreq)
4096 
4097  /*
4098  * If the existing query was deparsed and prepared for a different number
4099  * of rows, rebuild it for the proper number.
4100  */
4101  if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
4102  {
4103  /* Destroy the prepared statement created previously */
4104  if (fmstate->p_name)
4105  deallocate_query(fmstate);
4106 
4107  /* Build INSERT string with numSlots records in its VALUES clause. */
4108  initStringInfo(&sql);
4109  rebuildInsertSql(&sql, fmstate->rel,
4110  fmstate->orig_query, fmstate->target_attrs,
4111  fmstate->values_end, fmstate->p_nums,
4112  *numSlots - 1);
4113  pfree(fmstate->query);
4114  fmstate->query = sql.data;
4115  fmstate->num_slots = *numSlots;
4116  }
4117 
4118  /* Set up the prepared statement on the remote server, if we didn't yet */
4119  if (!fmstate->p_name)
4120  prepare_foreign_modify(fmstate);
4121 
4122  /*
4123  * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
4124  */
4125  if (operation == CMD_UPDATE || operation == CMD_DELETE)
4126  {
4127  Datum datum;
4128  bool isNull;
4129 
4130  datum = ExecGetJunkAttribute(planSlots[0],
4131  fmstate->ctidAttno,
4132  &isNull);
4133  /* shouldn't ever get a null result... */
4134  if (isNull)
4135  elog(ERROR, "ctid is NULL");
4136  ctid = (ItemPointer) DatumGetPointer(datum);
4137  }
4138 
4139  /* Convert parameters needed by prepared statement to text form */
4140  p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
4141 
4142  /*
4143  * Execute the prepared statement.
4144  */
4145  if (!PQsendQueryPrepared(fmstate->conn,
4146  fmstate->p_name,
4147  fmstate->p_nums * (*numSlots),
4148  p_values,
4149  NULL,
4150  NULL,
4151  0))
4152  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4153 
4154  /*
4155  * Get the result, and check for success.
4156  *
4157  * We don't use a PG_TRY block here, so be careful not to throw error
4158  * without releasing the PGresult.
4159  */
4160  res = pgfdw_get_result(fmstate->conn, fmstate->query);
4161  if (PQresultStatus(res) !=
4163  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4164 
4165  /* Check number of rows affected, and fetch RETURNING tuple if any */
4166  if (fmstate->has_returning)
4167  {
4168  Assert(*numSlots == 1);
4169  n_rows = PQntuples(res);
4170  if (n_rows > 0)
4171  store_returning_result(fmstate, slots[0], res);
4172  }
4173  else
4174  n_rows = atoi(PQcmdTuples(res));
4175 
4176  /* And clean up */
4177  PQclear(res);
4178 
4179  MemoryContextReset(fmstate->temp_cxt);
4180 
4181  *numSlots = n_rows;
4182 
4183  /*
4184  * Return NULL if nothing was inserted/updated/deleted on the remote end
4185  */
4186  return (n_rows > 0) ? slots : NULL;
4187 }
4188 
4189 /*
4190  * prepare_foreign_modify
4191  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
4192  */
4193 static void
4195 {
4196  char prep_name[NAMEDATALEN];
4197  char *p_name;
4198  PGresult *res;
4199 
4200  /*
4201  * The caller would already have processed a pending asynchronous request
4202  * if any, so no need to do it here.
4203  */
4204 
4205  /* Construct name we'll use for the prepared statement. */
4206  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
4207  GetPrepStmtNumber(fmstate->conn));
4208  p_name = pstrdup(prep_name);
4209 
4210  /*
4211  * We intentionally do not specify parameter types here, but leave the
4212  * remote server to derive them by default. This avoids possible problems
4213  * with the remote server using different type OIDs than we do. All of
4214  * the prepared statements we use in this module are simple enough that
4215  * the remote server will make the right choices.
4216  */
4217  if (!PQsendPrepare(fmstate->conn,
4218  p_name,
4219  fmstate->query,
4220  0,
4221  NULL))
4222  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4223 
4224  /*
4225  * Get the result, and check for success.
4226  *
4227  * We don't use a PG_TRY block here, so be careful not to throw error
4228  * without releasing the PGresult.
4229  */
4230  res = pgfdw_get_result(fmstate->conn, fmstate->query);
4232  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4233  PQclear(res);
4234 
4235  /* This action shows that the prepare has been done. */
4236  fmstate->p_name = p_name;
4237 }
4238 
4239 /*
4240  * convert_prep_stmt_params
4241  * Create array of text strings representing parameter values
4242  *
4243  * tupleid is ctid to send, or NULL if none
4244  * slot is slot to get remaining parameters from, or NULL if none
4245  *
4246  * Data is constructed in temp_cxt; caller should reset that after use.
4247  */
4248 static const char **
4250  ItemPointer tupleid,
4251  TupleTableSlot **slots,
4252  int numSlots)
4253 {
4254  const char **p_values;
4255  int i;
4256  int j;
4257  int pindex = 0;
4258  MemoryContext oldcontext;
4259 
4260  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
4261 
4262  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
4263 
4264  /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
4265  Assert(!(tupleid != NULL && numSlots > 1));
4266 
4267  /* 1st parameter should be ctid, if it's in use */
4268  if (tupleid != NULL)
4269  {
4270  Assert(numSlots == 1);
4271  /* don't need set_transmission_modes for TID output */
4272  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
4273  PointerGetDatum(tupleid));
4274  pindex++;
4275  }
4276 
4277  /* get following parameters from slots */
4278  if (slots != NULL && fmstate->target_attrs != NIL)
4279  {
4280  TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
4281  int nestlevel;
4282  ListCell *lc;
4283 
4284  nestlevel = set_transmission_modes();
4285 
4286  for (i = 0; i < numSlots; i++)
4287  {
4288  j = (tupleid != NULL) ? 1 : 0;
4289  foreach(lc, fmstate->target_attrs)
4290  {
4291  int attnum = lfirst_int(lc);
4292  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4293  Datum value;
4294  bool isnull;
4295 
4296  /* Ignore generated columns; they are set to DEFAULT */
4297  if (attr->attgenerated)
4298  continue;
4299  value = slot_getattr(slots[i], attnum, &isnull);
4300  if (isnull)
4301  p_values[pindex] = NULL;
4302  else
4303  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
4304  value);
4305  pindex++;
4306  j++;
4307  }
4308  }
4309 
4310  reset_transmission_modes(nestlevel);
4311  }
4312 
4313  Assert(pindex == fmstate->p_nums * numSlots);
4314 
4315  MemoryContextSwitchTo(oldcontext);
4316 
4317  return p_values;
4318 }
4319 
4320 /*
4321  * store_returning_result
4322  * Store the result of a RETURNING clause
4323  *
4324  * On error, be sure to release the PGresult on the way out. Callers do not
4325  * have PG_TRY blocks to ensure this happens.
4326  */
4327 static void
4329  TupleTableSlot *slot, PGresult *res)
4330 {
4331  PG_TRY();
4332  {
4333  HeapTuple newtup;
4334 
4335  newtup = make_tuple_from_result_row(res, 0,
4336  fmstate->rel,
4337  fmstate->attinmeta,
4338  fmstate->retrieved_attrs,
4339  NULL,
4340  fmstate->temp_cxt);
4341 
4342  /*
4343  * The returning slot will not necessarily be suitable to store
4344  * heaptuples directly, so allow for conversion.
4345  */
4346  ExecForceStoreHeapTuple(newtup, slot, true);
4347  }
4348  PG_CATCH();
4349  {
4350  PQclear(res);
4351  PG_RE_THROW();
4352  }
4353  PG_END_TRY();
4354 }
4355 
4356 /*
4357  * finish_foreign_modify
4358  * Release resources for a foreign insert/update/delete operation
4359  */
4360 static void
4362 {
4363  Assert(fmstate != NULL);
4364 
4365  /* If we created a prepared statement, destroy it */
4366  deallocate_query(fmstate);
4367 
4368  /* Release remote connection */
4369  ReleaseConnection(fmstate->conn);
4370  fmstate->conn = NULL;
4371 }
4372 
4373 /*
4374  * deallocate_query
4375  * Deallocate a prepared statement for a foreign insert/update/delete
4376  * operation
4377  */
4378 static void
4380 {
4381  char sql[64];
4382  PGresult *res;
4383 
4384  /* do nothing if the query is not allocated */
4385  if (!fmstate->p_name)
4386  return;
4387 
4388  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
4389 
4390  /*
4391  * We don't use a PG_TRY block here, so be careful not to throw error
4392  * without releasing the PGresult.
4393  */
4394  res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
4396  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
4397  PQclear(res);
4398  pfree(fmstate->p_name);
4399  fmstate->p_name = NULL;
4400 }
4401 
4402 /*
4403  * build_remote_returning
4404  * Build a RETURNING targetlist of a remote query for performing an
4405  * UPDATE/DELETE .. RETURNING on a join directly
4406  */
4407 static List *
4408 build_remote_returning(Index rtindex, Relation rel, List *returningList)
4409 {
4410  bool have_wholerow = false;
4411  List *tlist = NIL;
4412  List *vars;
4413  ListCell *lc;
4414 
4415  Assert(returningList);
4416 
4417  vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
4418 
4419  /*
4420  * If there's a whole-row reference to the target relation, then we'll
4421  * need all the columns of the relation.
4422  */
4423  foreach(lc, vars)
4424  {
4425  Var *var = (Var *) lfirst(lc);
4426 
4427  if (IsA(var, Var) &&
4428  var->varno == rtindex &&
4429  var->varattno == InvalidAttrNumber)
4430  {
4431  have_wholerow = true;
4432  break;
4433  }
4434  }
4435 
4436  if (have_wholerow)
4437  {
4438  TupleDesc tupdesc = RelationGetDescr(rel);
4439  int i;
4440 
4441  for (i = 1; i <= tupdesc->natts; i++)
4442  {
4443  Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
4444  Var *var;
4445 
4446  /* Ignore dropped attributes. */
4447  if (attr->attisdropped)
4448  continue;
4449 
4450  var = makeVar(rtindex,
4451  i,
4452  attr->atttypid,
4453  attr->atttypmod,
4454  attr->attcollation,
4455  0);
4456 
4457  tlist = lappend(tlist,
4458  makeTargetEntry((Expr *) var,
4459  list_length(tlist) + 1,
4460  NULL,
4461  false));
4462  }
4463  }
4464 
4465  /* Now add any remaining columns to tlist. */
4466  foreach(lc, vars)
4467  {
4468  Var *var = (Var *) lfirst(lc);
4469 
4470  /*
4471  * No need for whole-row references to the target relation. We don't
4472  * need system columns other than ctid and oid either, since those are
4473  * set locally.
4474  */
4475  if (IsA(var, Var) &&
4476  var->varno == rtindex &&
4477  var->varattno <= InvalidAttrNumber &&
4479  continue; /* don't need it */
4480 
4481  if (tlist_member((Expr *) var, tlist))
4482  continue; /* already got it */
4483 
4484  tlist = lappend(tlist,
4485  makeTargetEntry((Expr *) var,
4486  list_length(tlist) + 1,
4487  NULL,
4488  false));
4489  }
4490 
4491  list_free(vars);
4492 
4493  return tlist;
4494 }
4495 
4496 /*
4497  * rebuild_fdw_scan_tlist
4498  * Build new fdw_scan_tlist of given foreign-scan plan node from given
4499  * tlist
4500  *
4501  * There might be columns that the fdw_scan_tlist of the given foreign-scan
4502  * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4503  * have contained resjunk columns such as 'ctid' of the target relation and
4504  * 'wholerow' of non-target relations, but the tlist might not contain them,
4505  * for example. So, adjust the tlist so it contains all the columns specified
4506  * in the fdw_scan_tlist; else setrefs.c will get confused.
4507  */
4508 static void
4510 {
4511  List *new_tlist = tlist;
4512  List *old_tlist = fscan->fdw_scan_tlist;
4513  ListCell *lc;
4514 
4515  foreach(lc, old_tlist)
4516  {
4517  TargetEntry *tle = (TargetEntry *) lfirst(lc);
4518 
4519  if (tlist_member(tle->expr, new_tlist))
4520  continue; /* already got it */
4521 
4522  new_tlist = lappend(new_tlist,
4523  makeTargetEntry(tle->expr,
4524  list_length(new_tlist) + 1,
4525  NULL,
4526  false));
4527  }
4528  fscan->fdw_scan_tlist = new_tlist;
4529 }
4530 
4531 /*
4532  * Execute a direct UPDATE/DELETE statement.
4533  */
4534 static void
4536 {
4538  ExprContext *econtext = node->ss.ps.ps_ExprContext;
4539  int numParams = dmstate->numParams;
4540  const char **values = dmstate->param_values;
4541 
4542  /* First, process a pending asynchronous request, if any. */
4543  if (dmstate->conn_state->pendingAreq)
4545 
4546  /*
4547  * Construct array of query parameter values in text format.
4548  */
4549  if (numParams > 0)
4550  process_query_params(econtext,
4551  dmstate->param_flinfo,
4552  dmstate->param_exprs,
4553  values);
4554 
4555  /*
4556  * Notice that we pass NULL for paramTypes, thus forcing the remote server
4557  * to infer types for all parameters. Since we explicitly cast every
4558  * parameter (see deparse.c), the "inference" is trivial and will produce
4559  * the desired result. This allows us to avoid assuming that the remote
4560  * server has the same OIDs we do for the parameters' types.
4561  */
4562  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4563  NULL, values, NULL, NULL, 0))
4564  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4565 
4566  /*
4567  * Get the result, and check for success.
4568  *
4569  * We don't use a PG_TRY block here, so be careful not to throw error
4570  * without releasing the PGresult.
4571  */
4572  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4573  if (PQresultStatus(dmstate->result) !=
4575  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4576  dmstate->query);
4577 
4578  /* Get the number of rows affected. */
4579  if (dmstate->has_returning)
4580  dmstate->num_tuples = PQntuples(dmstate->result);
4581  else
4582  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4583 }
4584 
4585 /*
4586  * Get the result of a RETURNING clause.
4587  */
4588 static TupleTableSlot *
4590 {
4592  EState *estate = node->ss.ps.state;
4593  ResultRelInfo *resultRelInfo = node->resultRelInfo;
4594  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4595  TupleTableSlot *resultSlot;
4596 
4597  Assert(resultRelInfo->ri_projectReturning);
4598 
4599  /* If we didn't get any tuples, must be end of data. */
4600  if (dmstate->next_tuple >= dmstate->num_tuples)
4601  return ExecClearTuple(slot);
4602 
4603  /* Increment the command es_processed count if necessary. */
4604  if (dmstate->set_processed)
4605  estate->es_processed += 1;
4606 
4607  /*
4608  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4609  * tuple. (has_returning is false when the local query is of the form
4610  * "UPDATE/DELETE .. RETURNING 1" for example.)
4611  */
4612  if (!dmstate->has_returning)
4613  {
4614  ExecStoreAllNullTuple(slot);
4615  resultSlot = slot;
4616  }
4617  else
4618  {
4619  /*
4620  * On error, be sure to release the PGresult on the way out. Callers
4621  * do not have PG_TRY blocks to ensure this happens.
4622  */
4623  PG_TRY();
4624  {
4625  HeapTuple newtup;
4626 
4627  newtup = make_tuple_from_result_row(dmstate->result,
4628  dmstate->next_tuple,
4629  dmstate->rel,
4630  dmstate->attinmeta,
4631  dmstate->retrieved_attrs,
4632  node,
4633  dmstate->temp_cxt);
4634  ExecStoreHeapTuple(newtup, slot, false);
4635  }
4636  PG_CATCH();
4637  {
4638  PQclear(dmstate->result);
4639  PG_RE_THROW();
4640  }
4641  PG_END_TRY();
4642 
4643  /* Get the updated/deleted tuple. */
4644  if (dmstate->rel)
4645  resultSlot = slot;
4646  else
4647  resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate);
4648  }
4649  dmstate->next_tuple++;
4650 
4651  /* Make slot available for evaluation of the local query RETURNING list. */
4652  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4653  resultSlot;
4654 
4655  return slot;
4656 }
4657 
4658 /*
4659  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4660  */
4661 static void
4663  List *fdw_scan_tlist,
4664  Index rtindex)
4665 {
4666  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4667  ListCell *lc;
4668  int i;
4669 
4670  /*
4671  * Calculate the mapping between the fdw_scan_tlist's entries and the
4672  * result tuple's attributes.
4673  *
4674  * The "map" is an array of indexes of the result tuple's attributes in
4675  * fdw_scan_tlist, i.e., one entry for every attribute of the result
4676  * tuple. We store zero for any attributes that don't have the
4677  * corresponding entries in that list, marking that a NULL is needed in
4678  * the result tuple.
4679  *
4680  * Also get the indexes of the entries for ctid and oid if any.
4681  */
4682  dmstate->attnoMap = (AttrNumber *)
4683  palloc0(resultTupType->natts * sizeof(AttrNumber));
4684 
4685  dmstate->ctidAttno = dmstate->oidAttno = 0;
4686 
4687  i = 1;
4688  dmstate->hasSystemCols = false;
4689  foreach(lc, fdw_scan_tlist)
4690  {
4691  TargetEntry *tle = (TargetEntry *) lfirst(lc);
4692  Var *var = (Var *) tle->expr;
4693 
4694  Assert(IsA(var, Var));
4695 
4696  /*
4697  * If the Var is a column of the target relation to be retrieved from
4698  * the foreign server, get the index of the entry.
4699  */
4700  if (var->varno == rtindex &&
4701  list_member_int(dmstate->retrieved_attrs, i))
4702  {
4703  int attrno = var->varattno;
4704 
4705  if (attrno < 0)
4706  {
4707  /*
4708  * We don't retrieve system columns other than ctid and oid.
4709  */
4710  if (attrno == SelfItemPointerAttributeNumber)
4711  dmstate->ctidAttno = i;
4712  else
4713  Assert(false);
4714  dmstate->hasSystemCols = true;
4715  }
4716  else
4717  {
4718  /*
4719  * We don't retrieve whole-row references to the target
4720  * relation either.
4721  */
4722  Assert(attrno > 0);
4723 
4724  dmstate->attnoMap[attrno - 1] = i;
4725  }
4726  }
4727  i++;
4728  }
4729 }
4730 
4731 /*
4732  * Extract and return an updated/deleted tuple from a scan tuple.
4733  */
4734 static TupleTableSlot *
4736  ResultRelInfo *resultRelInfo,
4737  TupleTableSlot *slot,
4738  EState *estate)
4739 {
4740  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4741  TupleTableSlot *resultSlot;
4742  Datum *values;
4743  bool *isnull;
4744  Datum *old_values;
4745  bool *old_isnull;
4746  int i;
4747 
4748  /*
4749  * Use the return tuple slot as a place to store the result tuple.
4750  */
4751  resultSlot = ExecGetReturningSlot(estate, resultRelInfo);
4752 
4753  /*
4754  * Extract all the values of the scan tuple.
4755  */
4756  slot_getallattrs(slot);
4757  old_values = slot->tts_values;
4758  old_isnull = slot->tts_isnull;
4759 
4760  /*
4761  * Prepare to build the result tuple.
4762  */
4763  ExecClearTuple(resultSlot);
4764  values = resultSlot->tts_values;
4765  isnull = resultSlot->tts_isnull;
4766 
4767  /*
4768  * Transpose data into proper fields of the result tuple.
4769  */
4770  for (i = 0; i < resultTupType->natts; i++)
4771  {
4772  int j = dmstate->attnoMap[i];
4773 
4774  if (j == 0)
4775  {
4776  values[i] = (Datum) 0;
4777  isnull[i] = true;
4778  }
4779  else
4780  {
4781  values[i] = old_values[j - 1];
4782  isnull[i] = old_isnull[j - 1];
4783  }
4784  }
4785 
4786  /*
4787  * Build the virtual tuple.
4788  */
4789  ExecStoreVirtualTuple(resultSlot);
4790 
4791  /*
4792  * If we have any system columns to return, materialize a heap tuple in
4793  * the slot from column values set above and install system columns in
4794  * that tuple.
4795  */
4796  if (dmstate->hasSystemCols)
4797  {
4798  HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4799 
4800  /* ctid */
4801  if (dmstate->ctidAttno)
4802  {
4803  ItemPointer ctid = NULL;
4804 
4805  ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4806  resultTup->t_self = *ctid;
4807  }
4808 
4809  /*
4810  * And remaining columns
4811  *
4812  * Note: since we currently don't allow the target relation to appear
4813  * on the nullable side of an outer join, any system columns wouldn't
4814  * go to NULL.
4815  *
4816  * Note: no need to care about tableoid here because it will be
4817  * initialized in ExecProcessReturning().
4818  */
4822  }
4823 
4824  /*
4825  * And return the result tuple.
4826  */
4827  return resultSlot;
4828 }
4829 
4830 /*
4831  * Prepare for processing of parameters used in remote query.
4832  */
4833 static void
4835  List *fdw_exprs,
4836  int numParams,
4837  FmgrInfo **param_flinfo,
4838  List **param_exprs,
4839  const char ***param_values)
4840 {
4841  int i;
4842  ListCell *lc;
4843 
4844  Assert(numParams > 0);
4845 
4846  /* Prepare for output conversion of parameters used in remote query. */
4847  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4848 
4849  i = 0;
4850  foreach(lc, fdw_exprs)
4851  {
4852  Node *param_expr = (Node *) lfirst(lc);
4853  Oid typefnoid;
4854  bool isvarlena;
4855 
4856  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4857  fmgr_info(typefnoid, &(*param_flinfo)[i]);
4858  i++;
4859  }
4860 
4861  /*
4862  * Prepare remote-parameter expressions for evaluation. (Note: in
4863  * practice, we expect that all these expressions will be just Params, so
4864  * we could possibly do something more efficient than using the full
4865  * expression-eval machinery for this. But probably there would be little
4866  * benefit, and it'd require postgres_fdw to know more than is desirable
4867  * about Param evaluation.)
4868  */
4869  *param_exprs = ExecInitExprList(fdw_exprs, node);
4870 
4871  /* Allocate buffer for text form of query parameters. */
4872  *param_values = (const char **) palloc0(numParams * sizeof(char *));
4873 }
4874 
4875 /*
4876  * Construct array of query parameter values in text format.
4877  */
4878 static void
4880  FmgrInfo *param_flinfo,
4881  List *param_exprs,
4882  const char **param_values)
4883 {
4884  int nestlevel;
4885  int i;
4886  ListCell *lc;
4887 
4888  nestlevel = set_transmission_modes();
4889 
4890  i = 0;
4891  foreach(lc, param_exprs)
4892  {
4893  ExprState *expr_state = (ExprState *) lfirst(lc);
4894  Datum expr_value;
4895  bool isNull;
4896 
4897  /* Evaluate the parameter expression */
4898  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4899 
4900  /*
4901  * Get string representation of each parameter value by invoking
4902  * type-specific output function, unless the value is null.
4903  */
4904  if (isNull)
4905  param_values[i] = NULL;
4906  else
4907  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4908 
4909  i++;
4910  }
4911 
4912  reset_transmission_modes(nestlevel);
4913 }
4914 
4915 /*
4916  * postgresAnalyzeForeignTable
4917  * Test whether analyzing this foreign table is supported
4918  */
4919 static bool
4921  AcquireSampleRowsFunc *func,
4922  BlockNumber *totalpages)
4923 {
4924  ForeignTable *table;
4925  UserMapping *user;
4926  PGconn *conn;
4927  StringInfoData sql;
4928  PGresult *volatile res = NULL;
4929 
4930  /* Return the row-analysis function pointer */
4932 
4933  /*
4934  * Now we have to get the number of pages. It's annoying that the ANALYZE
4935  * API requires us to return that now, because it forces some duplication
4936  * of effort between this routine and postgresAcquireSampleRowsFunc. But
4937  * it's probably not worth redefining that API at this point.
4938  */
4939 
4940  /*
4941  * Get the connection to use. We do the remote access as the table's
4942  * owner, even if the ANALYZE was started by some other user.
4943  */
4944  table = GetForeignTable(RelationGetRelid(relation));
4945  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4946  conn = GetConnection(user, false, NULL);
4947 
4948  /*
4949  * Construct command to get page count for relation.
4950  */
4951  initStringInfo(&sql);
4952  deparseAnalyzeSizeSql(&sql, relation);
4953 
4954  /* In what follows, do not risk leaking any PGresults. */
4955  PG_TRY();
4956  {
4957  res = pgfdw_exec_query(conn, sql.data, NULL);
4959  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4960 
4961  if (PQntuples(res) != 1 || PQnfields(res) != 1)
4962  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4963  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4964  }
4965  PG_FINALLY();
4966  {
4967  PQclear(res);
4968  }
4969  PG_END_TRY();
4970 
4972 
4973  return true;
4974 }
4975 
4976 /*
4977  * postgresGetAnalyzeInfoForForeignTable
4978  * Count tuples in foreign table (just get pg_class.reltuples).
4979  *
4980  * can_tablesample determines if the remote relation supports acquiring the
4981  * sample using TABLESAMPLE.
4982  */
4983 static double
4984 postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
4985 {
4986  ForeignTable *table;
4987  UserMapping *user;
4988  PGconn *conn;
4989  StringInfoData sql;
4990  PGresult *volatile res = NULL;
4991  volatile double reltuples = -1;
4992  volatile char relkind = 0;
4993 
4994  /* assume the remote relation does not support TABLESAMPLE */
4995  *can_tablesample = false;
4996 
4997  /*
4998  * Get the connection to use. We do the remote access as the table's
4999  * owner, even if the ANALYZE was started by some other user.
5000  */
5001  table = GetForeignTable(RelationGetRelid(relation));
5002  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5003  conn = GetConnection(user, false, NULL);
5004 
5005  /*
5006  * Construct command to get page count for relation.
5007  */
5008  initStringInfo(&sql);
5009  deparseAnalyzeInfoSql(&sql, relation);
5010 
5011  /* In what follows, do not risk leaking any PGresults. */
5012  PG_TRY();
5013  {
5014  res = pgfdw_exec_query(conn, sql.data, NULL);
5016  pgfdw_report_error(ERROR, res, conn, false, sql.data);
5017 
5018  if (PQntuples(res) != 1 || PQnfields(res) != 2)
5019  elog(ERROR, "unexpected result from deparseAnalyzeTuplesSql query");
5020  reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
5021  relkind = *(PQgetvalue(res, 0, 1));
5022  }
5023  PG_FINALLY();
5024  {
5025  if (res)
5026  PQclear(res);
5027  }
5028  PG_END_TRY();
5029 
5031 
5032  /* TABLESAMPLE is supported only for regular tables and matviews */
5033  *can_tablesample = (relkind == RELKIND_RELATION ||
5034  relkind == RELKIND_MATVIEW ||
5035  relkind == RELKIND_PARTITIONED_TABLE);
5036 
5037  return reltuples;
5038 }
5039 
5040 /*
5041  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
5042  *
5043  * Selected rows are returned in the caller-allocated array rows[],
5044  * which must have at least targrows entries.
5045  * The actual number of rows selected is returned as the function result.
5046  * We also count the total number of rows in the table and return it into
5047  * *totalrows. Note that *totaldeadrows is always set to 0.
5048  *
5049  * Note that the returned list of rows is not always in order by physical
5050  * position in the table. Therefore, correlation estimates derived later
5051  * may be meaningless, but it's OK because we don't use the estimates
5052  * currently (the planner only pays attention to correlation for indexscans).
5053  */
5054 static int
5056  HeapTuple *rows, int targrows,
5057  double *totalrows,
5058  double *totaldeadrows)
5059 {
5060  PgFdwAnalyzeState astate;
5061  ForeignTable *table;
5062  ForeignServer *server;
5063  UserMapping *user;
5064  PGconn *conn;
5065  int server_version_num;
5066  PgFdwSamplingMethod method = ANALYZE_SAMPLE_AUTO; /* auto is default */
5067  double sample_frac = -1.0;
5068  double reltuples;
5069  unsigned int cursor_number;
5070  StringInfoData sql;
5071  PGresult *volatile res = NULL;
5072  ListCell *lc;
5073 
5074  /* Initialize workspace state */
5075  astate.rel = relation;
5077 
5078  astate.rows = rows;
5079  astate.targrows = targrows;
5080  astate.numrows = 0;
5081  astate.samplerows = 0;
5082  astate.rowstoskip = -1; /* -1 means not set yet */
5083  reservoir_init_selection_state(&astate.rstate, targrows);
5084 
5085  /* Remember ANALYZE context, and create a per-tuple temp context */
5086  astate.anl_cxt = CurrentMemoryContext;
5088  "postgres_fdw temporary data",
5090 
5091  /*
5092  * Get the connection to use. We do the remote access as the table's
5093  * owner, even if the ANALYZE was started by some other user.
5094  */
5095  table = GetForeignTable(RelationGetRelid(relation));
5096  server = GetForeignServer(table->serverid);
5097  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5098  conn = GetConnection(user, false, NULL);
5099 
5100  /* We'll need server version, so fetch it now. */
5102 
5103  /*
5104  * What sampling method should we use?
5105  */
5106  foreach(lc, server->options)
5107  {
5108  DefElem *def = (DefElem *) lfirst(lc);
5109 
5110  if (strcmp(def->defname, "analyze_sampling") == 0)
5111  {
5112  char *value = defGetString(def);
5113 
5114  if (strcmp(value, "off") == 0)
5115  method = ANALYZE_SAMPLE_OFF;
5116  else if (strcmp(value, "auto") == 0)
5117  method = ANALYZE_SAMPLE_AUTO;
5118  else if (strcmp(value, "random") == 0)
5119  method = ANALYZE_SAMPLE_RANDOM;
5120  else if (strcmp(value, "system") == 0)
5121  method = ANALYZE_SAMPLE_SYSTEM;
5122  else if (strcmp(value, "bernoulli") == 0)
5123  method = ANALYZE_SAMPLE_BERNOULLI;
5124 
5125  break;
5126  }
5127  }
5128 
5129  foreach(lc, table->options)
5130  {
5131  DefElem *def = (DefElem *) lfirst(lc);
5132 
5133  if (strcmp(def->defname, "analyze_sampling") == 0)
5134  {
5135  char *value = defGetString(def);
5136 
5137  if (strcmp(value, "off") == 0)
5138  method = ANALYZE_SAMPLE_OFF;
5139  else if (strcmp(value, "auto") == 0)
5140  method = ANALYZE_SAMPLE_AUTO;
5141  else if (strcmp(value, "random") == 0)
5142  method = ANALYZE_SAMPLE_RANDOM;
5143  else if (strcmp(value, "system") == 0)
5144  method = ANALYZE_SAMPLE_SYSTEM;
5145  else if (strcmp(value, "bernoulli") == 0)
5146  method = ANALYZE_SAMPLE_BERNOULLI;
5147 
5148  break;
5149  }
5150  }
5151 
5152  /*
5153  * Error-out if explicitly required one of the TABLESAMPLE methods, but
5154  * the server does not support it.
5155  */
5156  if ((server_version_num < 95000) &&
5157  (method == ANALYZE_SAMPLE_SYSTEM ||
5158  method == ANALYZE_SAMPLE_BERNOULLI))
5159  ereport(ERROR,
5160  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5161  errmsg("remote server does not support TABLESAMPLE feature")));
5162 
5163  /*
5164  * If we've decided to do remote sampling, calculate the sampling rate. We
5165  * need to get the number of tuples from the remote server, but skip that
5166  * network round-trip if not needed.
5167  */
5168  if (method != ANALYZE_SAMPLE_OFF)
5169  {
5170  bool can_tablesample;
5171 
5172  reltuples = postgresGetAnalyzeInfoForForeignTable(relation,
5173  &can_tablesample);
5174 
5175  /*
5176  * Make sure we're not choosing TABLESAMPLE when the remote relation does
5177  * not support that. But only do this for "auto" - if the user explicitly
5178  * requested BERNOULLI/SYSTEM, it's better to fail.
5179  */
5180  if (!can_tablesample && (method == ANALYZE_SAMPLE_AUTO))
5181  method = ANALYZE_SAMPLE_RANDOM;
5182 
5183  /*
5184  * Remote's reltuples could be 0 or -1 if the table has never been
5185  * vacuumed/analyzed. In that case, disable sampling after all.
5186  */
5187  if ((reltuples <= 0) || (targrows >= reltuples))
5188  method = ANALYZE_SAMPLE_OFF;
5189  else
5190  {
5191  /*
5192  * All supported sampling methods require sampling rate,
5193  * not target rows directly, so we calculate that using
5194  * the remote reltuples value. That's imperfect, because
5195  * it might be off a good deal, but that's not something
5196  * we can (or should) address here.
5197  *
5198  * If reltuples is too low (i.e. when table grew), we'll
5199  * end up sampling more rows - but then we'll apply the
5200  * local sampling, so we get the expected sample size.
5201  * This is the same outcome as without remote sampling.
5202  *
5203  * If reltuples is too high (e.g. after bulk DELETE), we
5204  * will end up sampling too few rows.
5205  *
5206  * We can't really do much better here - we could try
5207  * sampling a bit more rows, but we don't know how off
5208  * the reltuples value is so how much is "a bit more"?
5209  *
5210  * Furthermore, the targrows value for partitions is
5211  * determined based on table size (relpages), which can
5212  * be off in different ways too. Adjusting the sampling
5213  * rate here might make the issue worse.
5214  */
5215  sample_frac = targrows / reltuples;
5216 
5217  /*
5218  * We should never get sampling rate outside the valid range
5219  * (between 0.0 and 1.0), because those cases should be covered
5220  * by the previous branch that sets ANALYZE_SAMPLE_OFF.
5221  */
5222  Assert(sample_frac >= 0.0 && sample_frac <= 1.0);
5223  }
5224  }
5225 
5226  /*
5227  * For "auto" method, pick the one we believe is best. For servers with
5228  * TABLESAMPLE support we pick BERNOULLI, for old servers we fall-back to
5229  * random() to at least reduce network transfer.
5230  */
5231  if (method == ANALYZE_SAMPLE_AUTO)
5232  {
5233  if (server_version_num < 95000)
5234  method = ANALYZE_SAMPLE_RANDOM;
5235  else
5236  method = ANALYZE_SAMPLE_BERNOULLI;
5237  }
5238 
5239  /*
5240  * Construct cursor that retrieves whole rows from remote.
5241  */
5243  initStringInfo(&sql);
5244  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
5245 
5246  deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
5247 
5248  /* In what follows, do not risk leaking any PGresults. */
5249  PG_TRY();
5250  {
5251  char fetch_sql[64];
5252  int fetch_size;
5253 
5254  res = pgfdw_exec_query(conn, sql.data, NULL);
5256  pgfdw_report_error(ERROR, res, conn, false, sql.data);
5257  PQclear(res);
5258  res = NULL;
5259 
5260  /*
5261  * Determine the fetch size. The default is arbitrary, but shouldn't
5262  * be enormous.
5263  */
5264  fetch_size = 100;
5265  foreach(lc, server->options)
5266  {
5267  DefElem *def = (DefElem *) lfirst(lc);
5268 
5269  if (strcmp(def->defname, "fetch_size") == 0)
5270  {
5271  (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5272  break;
5273  }
5274  }
5275  foreach(lc, table->options)
5276  {
5277  DefElem *def = (DefElem *) lfirst(lc);
5278 
5279  if (strcmp(def->defname, "fetch_size") == 0)
5280  {
5281  (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5282  break;
5283  }
5284  }
5285 
5286  /* Construct command to fetch rows from remote. */
5287  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
5289 
5290  /* Retrieve and process rows a batch at a time. */
5291  for (;;)
5292  {
5293  int numrows;
5294  int i;
5295 
5296  /* Allow users to cancel long query */
5298 
5299  /*
5300  * XXX possible future improvement: if rowstoskip is large, we
5301  * could issue a MOVE rather than physically fetching the rows,
5302  * then just adjust rowstoskip and samplerows appropriately.
5303  */
5304 
5305  /* Fetch some rows */
5306  res = pgfdw_exec_query(conn, fetch_sql, NULL);
5307  /* On error, report the original query, not the FETCH. */
5309  pgfdw_report_error(ERROR, res, conn, false, sql.data);
5310 
5311  /* Process whatever we got. */
5312  numrows = PQntuples(res);
5313  for (i = 0; i < numrows; i++)
5314  analyze_row_processor(res, i, &astate);
5315 
5316  PQclear(res);
5317  res = NULL;
5318 
5319  /* Must be EOF if we didn't get all the rows requested. */
5320  if (numrows < fetch_size)
5321  break;
5322  }
5323 
5324  /* Close the cursor, just to be tidy. */
5326  }
5327  PG_CATCH();
5328  {
5329  PQclear(res);
5330  PG_RE_THROW();
5331  }
5332  PG_END_TRY();
5333 
5335 
5336  /* We assume that we have no dead tuple. */
5337  *totaldeadrows = 0.0;
5338 
5339  /*
5340  * Without sampling, we've retrieved all living tuples from foreign
5341  * server, so report that as totalrows. Otherwise use the reltuples
5342  * estimate we got from the remote side.
5343  */
5344  if (method == ANALYZE_SAMPLE_OFF)
5345  *totalrows = astate.samplerows;
5346  else
5347  *totalrows = reltuples;
5348 
5349  /*
5350  * Emit some interesting relation info
5351  */
5352  ereport(elevel,
5353  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
5354  RelationGetRelationName(relation),
5355  *totalrows, astate.numrows)));
5356 
5357  return astate.numrows;
5358 }
5359 
5360 /*
5361  * Collect sample rows from the result of query.
5362  * - Use all tuples in sample until target # of samples are collected.
5363  * - Subsequently, replace already-sampled tuples randomly.
5364  */
5365 static void
5367 {
5368  int targrows = astate->targrows;
5369  int pos; /* array index to store tuple in */
5370  MemoryContext oldcontext;
5371 
5372  /* Always increment sample row counter. */
5373  astate->samplerows += 1;
5374 
5375  /*
5376  * Determine the slot where this sample row should be stored. Set pos to
5377  * negative value to indicate the row should be skipped.
5378  */
5379  if (astate->numrows < targrows)
5380  {
5381  /* First targrows rows are always included into the sample */
5382  pos = astate->numrows++;
5383  }
5384  else
5385  {
5386  /*
5387  * Now we start replacing tuples in the sample until we reach the end
5388  * of the relation. Same algorithm as in acquire_sample_rows in
5389  * analyze.c; see Jeff Vitter's paper.
5390  */
5391  if (astate->rowstoskip < 0)
5392  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
5393 
5394  if (astate->rowstoskip <= 0)
5395  {
5396  /* Choose a random reservoir element to replace. */
5397  pos = (int) (targrows * sampler_random_fract(&astate->rstate.randstate));
5398  Assert(pos >= 0 && pos < targrows);
5399  heap_freetuple(astate->rows[pos]);
5400  }
5401  else
5402  {
5403  /* Skip this tuple. */
5404  pos = -1;
5405  }
5406 
5407  astate->rowstoskip -= 1;
5408  }
5409 
5410  if (pos >= 0)
5411  {
5412  /*
5413  * Create sample tuple from current result row, and store it in the
5414  * position determined above. The tuple has to be created in anl_cxt.
5415  */
5416  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
5417 
5418  astate->rows[pos] = make_tuple_from_result_row(res, row,
5419  astate->rel,
5420  astate->attinmeta,
5421  astate->retrieved_attrs,
5422  NULL,
5423  astate->temp_cxt);
5424 
5425  MemoryContextSwitchTo(oldcontext);
5426  }
5427 }
5428 
5429 /*
5430  * Import a foreign schema
5431  */
5432 static List *
5434 {
5435  List *commands = NIL;
5436  bool import_collate = true;
5437  bool import_default = false;
5438  bool import_generated = true;
5439  bool import_not_null = true;
5440  ForeignServer *server;
5441  UserMapping *mapping;
5442  PGconn *conn;
5444  PGresult *volatile res = NULL;
5445  int numrows,
5446  i;
5447  ListCell *lc;
5448 
5449  /* Parse statement options */
5450  foreach(lc, stmt->options)
5451  {
5452  DefElem *def = (DefElem *) lfirst(lc);
5453 
5454  if (strcmp(def->defname, "import_collate") == 0)
5455  import_collate = defGetBoolean(def);
5456  else if (strcmp(def->defname, "import_default") == 0)
5457  import_default = defGetBoolean(def);
5458  else if (strcmp(def->defname, "import_generated") == 0)
5459  import_generated = defGetBoolean(def);
5460  else if (strcmp(def->defname, "import_not_null") == 0)
5461  import_not_null = defGetBoolean(def);
5462  else
5463  ereport(ERROR,
5464  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
5465  errmsg("invalid option \"%s\"", def->defname)));
5466  }
5467 
5468  /*
5469  * Get connection to the foreign server. Connection manager will
5470  * establish new connection if necessary.
5471  */
5472  server = GetForeignServer(serverOid);
5473  mapping = GetUserMapping(GetUserId(), server->serverid);
5474  conn = GetConnection(mapping, false, NULL);
5475 
5476  /* Don't attempt to import collation if remote server hasn't got it */
5477  if (PQserverVersion(conn) < 90100)
5478  import_collate = false;
5479 
5480  /* Create workspace for strings */
5481  initStringInfo(&buf);
5482 
5483  /* In what follows, do not risk leaking any PGresults. */
5484  PG_TRY();
5485  {
5486  /* Check that the schema really exists */
5487  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
5488  deparseStringLiteral(&buf, stmt->remote_schema);
5489 
5490  res = pgfdw_exec_query(conn, buf.data, NULL);
5492  pgfdw_report_error(ERROR, res, conn, false, buf.data);
5493 
5494  if (PQntuples(res) != 1)
5495  ereport(ERROR,
5496  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
5497  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
5498  stmt->remote_schema, server->servername)));
5499 
5500  PQclear(res);
5501  res = NULL;
5502  resetStringInfo(&buf);
5503 
5504  /*
5505  * Fetch all table data from this schema, possibly restricted by
5506  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
5507  * to EXCEPT/LIMIT TO here, because the core code will filter the
5508  * statements we return according to those lists anyway. But it
5509  * should save a few cycles to not process excluded tables in the
5510  * first place.)
5511  *
5512  * Import table data for partitions only when they are explicitly
5513  * specified in LIMIT TO clause. Otherwise ignore them and only
5514  * include the definitions of the root partitioned tables to allow
5515  * access to the complete remote data set locally in the schema
5516  * imported.
5517  *
5518  * Note: because we run the connection with search_path restricted to
5519  * pg_catalog, the format_type() and pg_get_expr() outputs will always
5520  * include a schema name for types/functions in other schemas, which
5521  * is what we want.
5522  */
5524  "SELECT relname, "
5525  " attname, "
5526  " format_type(atttypid, atttypmod), "
5527  " attnotnull, "
5528  " pg_get_expr(adbin, adrelid), ");
5529 
5530  /* Generated columns are supported since Postgres 12 */
5531  if (PQserverVersion(conn) >= 120000)
5533  " attgenerated, ");
5534  else
5536  " NULL, ");
5537 
5538  if (import_collate)
5540  &