PostgreSQL Source Code  git master
postgres_fdw.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * postgres_fdw.c
4  * Foreign-data wrapper for remote PostgreSQL servers
5  *
6  * Portions Copyright (c) 2012-2022, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * contrib/postgres_fdw/postgres_fdw.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include <limits.h>
16 
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "catalog/pg_opfamily.h"
22 #include "commands/defrem.h"
23 #include "commands/explain.h"
24 #include "commands/vacuum.h"
25 #include "executor/execAsync.h"
26 #include "foreign/fdwapi.h"
27 #include "funcapi.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "nodes/nodeFuncs.h"
31 #include "optimizer/appendinfo.h"
32 #include "optimizer/clauses.h"
33 #include "optimizer/cost.h"
34 #include "optimizer/optimizer.h"
35 #include "optimizer/pathnode.h"
36 #include "optimizer/paths.h"
37 #include "optimizer/planmain.h"
38 #include "optimizer/prep.h"
39 #include "optimizer/restrictinfo.h"
40 #include "optimizer/tlist.h"
41 #include "parser/parsetree.h"
42 #include "postgres_fdw.h"
43 #include "storage/latch.h"
44 #include "utils/builtins.h"
45 #include "utils/float.h"
46 #include "utils/guc.h"
47 #include "utils/lsyscache.h"
48 #include "utils/memutils.h"
49 #include "utils/rel.h"
50 #include "utils/sampling.h"
51 #include "utils/selfuncs.h"
52 
54 
55 /* Default CPU cost charged for starting up a foreign query. */
56 #define DEFAULT_FDW_STARTUP_COST 100.0
57 
58 /* Default CPU cost to process one row (above and beyond cpu_tuple_cost). */
59 #define DEFAULT_FDW_TUPLE_COST 0.01
60 
61 /* Without remote estimates, assume a sort costs 20% extra (hence 1.2). */
62 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
63 
64 /*
65  * Indexes of FDW-private information stored in fdw_private lists.
66  *
67  * These items are indexed with the enum FdwScanPrivateIndex, so an item
68  * can be fetched with list_nth(). For example, to get the SELECT statement:
69  * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
70  */
72 {
73  /* SQL statement to execute remotely (as a String node) */
75  /* Integer list of attribute numbers retrieved by the SELECT */
77  /* Integer representing the desired fetch_size */
79 
80  /*
81  * String describing the join, i.e. the names of the relations being
82  * joined and the join types; added only when the scan is a join
83  */
85 };
86 
87 /*
88  * Similarly, this enum describes what's kept in the fdw_private list for
89  * a ModifyTable node referencing a postgres_fdw foreign table. We store:
90  *
91  * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
92  * 2) Integer list of target attribute numbers for INSERT/UPDATE
93  * (NIL for a DELETE)
94  * 3) Length till the end of VALUES clause for INSERT
95  * (-1 for a DELETE/UPDATE)
96  * 4) Boolean flag showing if the remote query has a RETURNING clause
97  * 5) Integer list of attribute numbers retrieved by RETURNING, if any
98  */
100 {
101  /* SQL statement to execute remotely (as a String node) */
103  /* Integer list of target attribute numbers for INSERT/UPDATE */
105  /* Length till the end of VALUES clause (as an Integer node) */
107  /* has-returning flag (as a Boolean node) */
109  /* Integer list of attribute numbers retrieved by RETURNING */
111 };
112 
113 /*
114  * Similarly, this enum describes what's kept in the fdw_private list for
115  * a ForeignScan node that modifies a foreign table directly. We store:
116  *
117  * 1) UPDATE/DELETE statement text to be sent to the remote server
118  * 2) Boolean flag showing if the remote query has a RETURNING clause
119  * 3) Integer list of attribute numbers retrieved by RETURNING, if any
120  * 4) Boolean flag showing whether we set the command's es_processed
121  */
123 {
124  /* SQL statement to execute remotely (as a String node) */
126  /* has-returning flag (as a Boolean node) */
128  /* Integer list of attribute numbers retrieved by RETURNING */
130  /* set-processed flag (as a Boolean node) */
132 };
133 
134 /*
135  * Execution state of a foreign scan using postgres_fdw.
 *
 * NOTE(review): line 175 of this listing (presumably the closing
 * "} PgFdwScanState;") is absent, so the typedef is unterminated as shown.
136  */
137 typedef struct PgFdwScanState
138 {
139  Relation rel; /* relcache entry for the foreign table. NULL
140  * for a foreign join scan. */
141  TupleDesc tupdesc; /* tuple descriptor of scan */
142  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
143 
144  /* extracted fdw_private data */
145  char *query; /* text of SELECT command */
146  List *retrieved_attrs; /* list of retrieved attribute numbers */
147 
148  /* for remote query execution */
149  PGconn *conn; /* connection for the scan */
150  PgFdwConnState *conn_state; /* extra per-connection state */
151  unsigned int cursor_number; /* quasi-unique ID for this scan's cursor */
152  bool cursor_exists; /* have we created the cursor? */
153  int numParams; /* number of parameters passed to query */
154  FmgrInfo *param_flinfo; /* output conversion functions for the params */
155  List *param_exprs; /* executable expressions for param values */
156  const char **param_values; /* textual values of query parameters */
157 
158  /* for storing result tuples */
159  HeapTuple *tuples; /* array of currently-retrieved tuples */
160  int num_tuples; /* # of tuples in array */
161  int next_tuple; /* index of next one to return */
162 
163  /* batch-level state, for optimizing rewinds and avoiding useless fetches */
164  int fetch_ct_2; /* Min(# of fetches done, 2) */
165  bool eof_reached; /* true if last fetch reached EOF */
166 
167  /* for asynchronous execution */
168  bool async_capable; /* engage asynchronous-capable logic? */
169 
170  /* working memory contexts */
171  MemoryContext batch_cxt; /* context holding current batch of tuples */
172  MemoryContext temp_cxt; /* context for per-tuple temporary data */
173 
174  int fetch_size; /* number of tuples per fetch */
176 
177 /*
178  * Execution state of a foreign insert/update/delete operation.
 *
 * NOTE(review): line 213 of this listing (presumably the closing
 * "} PgFdwModifyState;") is absent, so the typedef is unterminated as shown.
179  */
180 typedef struct PgFdwModifyState
181 {
182  Relation rel; /* relcache entry for the foreign table */
183  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
184 
185  /* for remote query execution */
186  PGconn *conn; /* connection for the modify operation */
187  PgFdwConnState *conn_state; /* extra per-connection state */
188  char *p_name; /* name of prepared statement, if created */
189 
190  /* extracted fdw_private data */
191  char *query; /* text of INSERT/UPDATE/DELETE command */
192  char *orig_query; /* original text of INSERT command */
193  List *target_attrs; /* list of target attribute numbers */
194  int values_end; /* length up to the end of VALUES */
195  int batch_size; /* value of FDW option "batch_size" */
196  bool has_returning; /* is there a RETURNING clause? */
197  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
198 
199  /* info about parameters for prepared statement */
200  AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
201  int p_nums; /* number of parameters to transmit */
202  FmgrInfo *p_flinfo; /* output conversion functions for the params */
203 
204  /* batch operation stuff */
205  int num_slots; /* number of slots to insert */
206 
207  /* working memory context */
208  MemoryContext temp_cxt; /* context for per-tuple temporary data */
209 
210  /* for update row movement if subplan result rel */
211  struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
212  * created */
214 
215 /*
216  * Execution state of a foreign scan that modifies a foreign table directly.
217  */
219 {
220  Relation rel; /* relcache entry for the foreign table */
221  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
222 
223  /* extracted fdw_private data */
224  char *query; /* text of UPDATE/DELETE command */
225  bool has_returning; /* is there a RETURNING clause? */
226  List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
227  bool set_processed; /* do we set the command es_processed? */
228 
229  /* for remote query execution */
230  PGconn *conn; /* connection for the update */
231  PgFdwConnState *conn_state; /* extra per-connection state */
232  int numParams; /* number of parameters passed to query */
233  FmgrInfo *param_flinfo; /* output conversion functions for them */
234  List *param_exprs; /* executable expressions for param values */
235  const char **param_values; /* textual values of query parameters */
236 
237  /* for storing result tuples */
238  PGresult *result; /* result for query */
239  int num_tuples; /* # of result tuples */
240  int next_tuple; /* index of next one to return */
241  Relation resultRel; /* relcache entry for the target relation */
242  AttrNumber *attnoMap; /* array of attnums of input user columns */
243  AttrNumber ctidAttno; /* attnum of input ctid column */
244  AttrNumber oidAttno; /* attnum of input oid column */
245  bool hasSystemCols; /* are there system columns of resultRel? */
246 
247  /* working memory context */
248  MemoryContext temp_cxt; /* context for per-tuple temporary data */
250 
251 /*
252  * Workspace for analyzing a foreign table.
 *
 * NOTE(review): line 273 of this listing (presumably the closing
 * "} PgFdwAnalyzeState;") is absent, so the typedef is unterminated as shown.
253  */
254 typedef struct PgFdwAnalyzeState
255 {
256  Relation rel; /* relcache entry for the foreign table */
257  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
258  List *retrieved_attrs; /* attr numbers retrieved by query */
259 
260  /* collected sample rows */
261  HeapTuple *rows; /* array of size targrows */
262  int targrows; /* target # of sample rows */
263  int numrows; /* # of sample rows collected */
264 
265  /* for random sampling */
266  double samplerows; /* # of rows fetched */
267  double rowstoskip; /* # of rows to skip before next sample */
268  ReservoirStateData rstate; /* state for reservoir sampling */
269 
270  /* working memory contexts */
271  MemoryContext anl_cxt; /* context for data that lives for the analyze */
272  MemoryContext temp_cxt; /* context for per-tuple temporary data */
274 
275 /*
276  * This enum describes what's kept in the fdw_private list for a ForeignPath.
277  * We store:
278  *
279  * 1) Boolean flag showing if the remote query has the final sort
280  * 2) Boolean flag showing if the remote query has the LIMIT clause
281  */
283 {
284  /* has-final-sort flag (as a Boolean node) */
286  /* has-limit flag (as a Boolean node) */
288 };
289 
290 /* Struct for extra information passed to estimate_path_cost_size() */
291 typedef struct
292 {
295  bool has_limit;
296  double limit_tuples;
297  int64 count_est;
298  int64 offset_est;
300 
301 /*
302  * Identifies the attribute at which a data conversion failure occurs,
 * together with the foreign table or plan node being processed.
 *
 * NOTE(review): line 309 of this listing (presumably the closing
 * "} ConversionLocation;") is absent, so the typedef is unterminated as shown.
303  */
304 typedef struct ConversionLocation
305 {
306  AttrNumber cur_attno; /* attribute number being processed, or 0 */
307  Relation rel; /* foreign table being processed, or NULL */
308  ForeignScanState *fsstate; /* plan node being processed, or NULL */
310 
311 /* Callback argument for ec_member_matches_foreign */
312 typedef struct
313 {
314  Expr *current; /* current expr, or NULL if not yet found */
315  List *already_used; /* expressions already dealt with */
317 
318 /*
319  * SQL functions
320  */
322 
323 /*
324  * FDW callback routines
325  */
326 static void postgresGetForeignRelSize(PlannerInfo *root,
327  RelOptInfo *baserel,
328  Oid foreigntableid);
329 static void postgresGetForeignPaths(PlannerInfo *root,
330  RelOptInfo *baserel,
331  Oid foreigntableid);
333  RelOptInfo *foreignrel,
334  Oid foreigntableid,
335  ForeignPath *best_path,
336  List *tlist,
337  List *scan_clauses,
338  Plan *outer_plan);
339 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
342 static void postgresEndForeignScan(ForeignScanState *node);
344  Index rtindex,
345  RangeTblEntry *target_rte,
346  Relation target_relation);
348  ModifyTable *plan,
349  Index resultRelation,
350  int subplan_index);
351 static void postgresBeginForeignModify(ModifyTableState *mtstate,
352  ResultRelInfo *resultRelInfo,
353  List *fdw_private,
354  int subplan_index,
355  int eflags);
357  ResultRelInfo *resultRelInfo,
358  TupleTableSlot *slot,
359  TupleTableSlot *planSlot);
361  ResultRelInfo *resultRelInfo,
362  TupleTableSlot **slots,
363  TupleTableSlot **planSlots,
364  int *numSlots);
365 static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
367  ResultRelInfo *resultRelInfo,
368  TupleTableSlot *slot,
369  TupleTableSlot *planSlot);
371  ResultRelInfo *resultRelInfo,
372  TupleTableSlot *slot,
373  TupleTableSlot *planSlot);
374 static void postgresEndForeignModify(EState *estate,
375  ResultRelInfo *resultRelInfo);
376 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
377  ResultRelInfo *resultRelInfo);
378 static void postgresEndForeignInsert(EState *estate,
379  ResultRelInfo *resultRelInfo);
381 static bool postgresPlanDirectModify(PlannerInfo *root,
382  ModifyTable *plan,
383  Index resultRelation,
384  int subplan_index);
385 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
387 static void postgresEndDirectModify(ForeignScanState *node);
389  ExplainState *es);
391  ResultRelInfo *rinfo,
392  List *fdw_private,
393  int subplan_index,
394  ExplainState *es);
396  ExplainState *es);
397 static void postgresExecForeignTruncate(List *rels,
398  DropBehavior behavior,
399  bool restart_seqs);
400 static bool postgresAnalyzeForeignTable(Relation relation,
401  AcquireSampleRowsFunc *func,
402  BlockNumber *totalpages);
404  Oid serverOid);
405 static void postgresGetForeignJoinPaths(PlannerInfo *root,
406  RelOptInfo *joinrel,
407  RelOptInfo *outerrel,
408  RelOptInfo *innerrel,
409  JoinType jointype,
410  JoinPathExtraData *extra);
412  TupleTableSlot *slot);
413 static void postgresGetForeignUpperPaths(PlannerInfo *root,
414  UpperRelationKind stage,
415  RelOptInfo *input_rel,
416  RelOptInfo *output_rel,
417  void *extra);
419 static void postgresForeignAsyncRequest(AsyncRequest *areq);
421 static void postgresForeignAsyncNotify(AsyncRequest *areq);
422 
423 /*
424  * Helper functions
425  */
426 static void estimate_path_cost_size(PlannerInfo *root,
427  RelOptInfo *foreignrel,
428  List *param_join_conds,
429  List *pathkeys,
430  PgFdwPathExtraData *fpextra,
431  double *p_rows, int *p_width,
432  Cost *p_startup_cost, Cost *p_total_cost);
433 static void get_remote_estimate(const char *sql,
434  PGconn *conn,
435  double *rows,
436  int *width,
437  Cost *startup_cost,
438  Cost *total_cost);
440  List *pathkeys,
441  double retrieved_rows,
442  double width,
443  double limit_tuples,
444  Cost *p_startup_cost,
445  Cost *p_run_cost);
446 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
448  void *arg);
449 static void create_cursor(ForeignScanState *node);
450 static void fetch_more_data(ForeignScanState *node);
451 static void close_cursor(PGconn *conn, unsigned int cursor_number,
452  PgFdwConnState *conn_state);
454  RangeTblEntry *rte,
455  ResultRelInfo *resultRelInfo,
456  CmdType operation,
457  Plan *subplan,
458  char *query,
459  List *target_attrs,
460  int values_end,
461  bool has_returning,
462  List *retrieved_attrs);
464  ResultRelInfo *resultRelInfo,
465  CmdType operation,
466  TupleTableSlot **slots,
467  TupleTableSlot **planSlots,
468  int *numSlots);
469 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
470 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
471  ItemPointer tupleid,
472  TupleTableSlot **slots,
473  int numSlots);
474 static void store_returning_result(PgFdwModifyState *fmstate,
475  TupleTableSlot *slot, PGresult *res);
476 static void finish_foreign_modify(PgFdwModifyState *fmstate);
477 static void deallocate_query(PgFdwModifyState *fmstate);
478 static List *build_remote_returning(Index rtindex, Relation rel,
479  List *returningList);
480 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
481 static void execute_dml_stmt(ForeignScanState *node);
483 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
484  List *fdw_scan_tlist,
485  Index rtindex);
487  ResultRelInfo *resultRelInfo,
488  TupleTableSlot *slot,
489  EState *estate);
490 static void prepare_query_params(PlanState *node,
491  List *fdw_exprs,
492  int numParams,
493  FmgrInfo **param_flinfo,
494  List **param_exprs,
495  const char ***param_values);
496 static void process_query_params(ExprContext *econtext,
497  FmgrInfo *param_flinfo,
498  List *param_exprs,
499  const char **param_values);
500 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
501  HeapTuple *rows, int targrows,
502  double *totalrows,
503  double *totaldeadrows);
504 static void analyze_row_processor(PGresult *res, int row,
505  PgFdwAnalyzeState *astate);
506 static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
507 static void fetch_more_data_begin(AsyncRequest *areq);
508 static void complete_pending_request(AsyncRequest *areq);
510  int row,
511  Relation rel,
512  AttInMetadata *attinmeta,
513  List *retrieved_attrs,
514  ForeignScanState *fsstate,
515  MemoryContext temp_context);
516 static void conversion_error_callback(void *arg);
517 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
518  JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
519  JoinPathExtraData *extra);
520 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
521  Node *havingQual);
523  RelOptInfo *rel);
526  Path *epq_path);
527 static void add_foreign_grouping_paths(PlannerInfo *root,
528  RelOptInfo *input_rel,
529  RelOptInfo *grouped_rel,
530  GroupPathExtraData *extra);
531 static void add_foreign_ordered_paths(PlannerInfo *root,
532  RelOptInfo *input_rel,
533  RelOptInfo *ordered_rel);
534 static void add_foreign_final_paths(PlannerInfo *root,
535  RelOptInfo *input_rel,
536  RelOptInfo *final_rel,
537  FinalPathExtraData *extra);
538 static void apply_server_options(PgFdwRelationInfo *fpinfo);
539 static void apply_table_options(PgFdwRelationInfo *fpinfo);
540 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
541  const PgFdwRelationInfo *fpinfo_o,
542  const PgFdwRelationInfo *fpinfo_i);
543 static int get_batch_size_option(Relation rel);
544 
545 
546 /*
547  * Foreign-data wrapper handler function: return a struct with pointers
548  * to my callback routines.
549  */
550 Datum
552 {
553  FdwRoutine *routine = makeNode(FdwRoutine);
554 
555  /* Functions for scanning foreign tables */
563 
564  /* Functions for updating foreign tables */
581 
582  /* Function for EvalPlanQual rechecks */
584  /* Support functions for EXPLAIN */
588 
589  /* Support function for TRUNCATE */
591 
592  /* Support functions for ANALYZE */
594 
595  /* Support functions for IMPORT FOREIGN SCHEMA */
597 
598  /* Support functions for join push-down */
600 
601  /* Support functions for upper relation push-down */
603 
604  /* Support functions for asynchronous execution */
609 
610  PG_RETURN_POINTER(routine);
611 }
612 
613 /*
614  * postgresGetForeignRelSize
615  * Estimate # of rows and width of the result of the scan
616  *
617  * We should consider the effect of all baserestrictinfo clauses here, but
618  * not any join clauses.
 *
 * The function allocates a zeroed PgFdwRelationInfo, stores it in
 * baserel->fdw_private, and fills it in for use by later planning steps.
 *
 * NOTE(review): this listing omits several source lines of the function
 * (its own numbering skips 621, 648-649, 704 and 758), including the line
 * carrying the function name, so the text below is not compilable as shown.
619  */
620 static void
622  RelOptInfo *baserel,
623  Oid foreigntableid)
624 {
625  PgFdwRelationInfo *fpinfo;
626  ListCell *lc;
627 
628  /*
629  * We use PgFdwRelationInfo to pass various information to subsequent
630  * functions.
631  */
632  fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
633  baserel->fdw_private = (void *) fpinfo;
634 
635  /* Base foreign tables need to be pushed down always. */
636  fpinfo->pushdown_safe = true;
637 
638  /* Look up foreign-table catalog info. */
639  fpinfo->table = GetForeignTable(foreigntableid);
640  fpinfo->server = GetForeignServer(fpinfo->table->serverid);
641 
642  /*
643  * Set defaults for the user-settable options, then apply the server-level
644  * and table-level options in that order. Per-table settings of
645  * use_remote_estimate, fetch_size and async_capable override the
 * corresponding per-server settings.
646  */
647  fpinfo->use_remote_estimate = false;
650  fpinfo->shippable_extensions = NIL;
651  fpinfo->fetch_size = 100;
652  fpinfo->async_capable = false;
653 
654  apply_server_options(fpinfo);
655  apply_table_options(fpinfo);
656 
657  /*
658  * If the table or the server is configured to use remote estimates,
659  * identify which user to do remote access as during planning. This
660  * should match what ExecCheckRTEPerms() does. If we fail due to lack of
661  * permissions, the query would have failed at runtime anyway.
662  */
663  if (fpinfo->use_remote_estimate)
664  {
665  Oid userid;
666 
667  userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId();
668  fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
669  }
670  else
671  fpinfo->user = NULL;
672 
673  /*
674  * Identify which baserestrictinfo clauses can be sent to the remote
675  * server and which can't.
676  */
677  classifyConditions(root, baserel, baserel->baserestrictinfo,
678  &fpinfo->remote_conds, &fpinfo->local_conds);
679 
680  /*
681  * Identify which attributes will need to be retrieved from the remote
682  * server. These include all attrs needed for joins or final output, plus
683  * all attrs used in the local_conds. (Note: if we end up using a
684  * parameterized scan, it's possible that some of the join clauses will be
685  * sent to the remote and thus we wouldn't really need to retrieve the
686  * columns used in them. Doesn't seem worth detecting that case though.)
687  */
688  fpinfo->attrs_used = NULL;
689  pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
690  &fpinfo->attrs_used);
691  foreach(lc, fpinfo->local_conds)
692  {
693  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
694 
695  pull_varattnos((Node *) rinfo->clause, baserel->relid,
696  &fpinfo->attrs_used);
697  }
698 
699  /*
700  * Compute the selectivity and cost of the local_conds, so we don't have
701  * to do it over again for each path. The best we can do for these
702  * conditions is to estimate selectivity on the basis of local statistics.
703  */
705  fpinfo->local_conds,
706  baserel->relid,
707  JOIN_INNER,
708  NULL);
709 
710  cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
711 
712  /*
713  * Set # of retrieved rows and cached relation costs to some negative
714  * value, so that we can detect when they are set to some sensible values,
715  * during one (usually the first) of the calls to estimate_path_cost_size.
716  */
717  fpinfo->retrieved_rows = -1;
718  fpinfo->rel_startup_cost = -1;
719  fpinfo->rel_total_cost = -1;
720 
721  /*
722  * If the table or the server is configured to use remote estimates,
723  * connect to the foreign server and execute EXPLAIN to estimate the
724  * number of rows selected by the restriction clauses, as well as the
725  * average row width. Otherwise, estimate using whatever statistics we
726  * have locally, in a way similar to ordinary tables.
727  */
728  if (fpinfo->use_remote_estimate)
729  {
730  /*
731  * Get cost/size estimates with help of remote server. Save the
732  * values in fpinfo so we don't need to do it again to generate the
733  * basic foreign path.
734  */
735  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
736  &fpinfo->rows, &fpinfo->width,
737  &fpinfo->startup_cost, &fpinfo->total_cost);
738 
739  /* Report estimated baserel size to planner. */
740  baserel->rows = fpinfo->rows;
741  baserel->reltarget->width = fpinfo->width;
742  }
743  else
744  {
745  /*
746  * If the foreign table has never been ANALYZEd, it will have
747  * reltuples < 0, meaning "unknown". We can't do much if we're not
748  * allowed to consult the remote server, but we can use a hack similar
749  * to plancat.c's treatment of empty relations: use a minimum size
750  * estimate of 10 pages, and divide by the column-datatype-based width
751  * estimate to get the corresponding number of tuples.
752  */
753  if (baserel->tuples < 0)
754  {
755  baserel->pages = 10;
756  baserel->tuples =
757  (10 * BLCKSZ) / (baserel->reltarget->width +
759  }
760 
761  /* Estimate baserel size as best we can with local statistics. */
762  set_baserel_size_estimates(root, baserel);
763 
764  /* Fill in basically-bogus cost estimates for use later. */
765  estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
766  &fpinfo->rows, &fpinfo->width,
767  &fpinfo->startup_cost, &fpinfo->total_cost);
768  }
769 
770  /*
771  * fpinfo->relation_name gets the numeric rangetable index of the foreign
772  * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
773  * human-readable string at that time.)
774  */
775  fpinfo->relation_name = psprintf("%u", baserel->relid);
776 
777  /* No outer and inner relations. */
778  fpinfo->make_outerrel_subquery = false;
779  fpinfo->make_innerrel_subquery = false;
780  fpinfo->lower_subquery_rels = NULL;
781  /* Set the relation index. */
782  fpinfo->relation_index = baserel->relid;
783 }
784 
785 /*
786  * get_useful_ecs_for_relation
787  * Determine which EquivalenceClasses might be involved in useful
788  * orderings of this relation.
789  *
790  * This function is in some respects a mirror image of the core function
791  * pathkeys_useful_for_merging: for a regular table, we know what indexes
792  * we have and want to test whether any of them are useful. For a foreign
793  * table, we don't know what indexes are present on the remote side but
794  * want to speculate about which ones we'd like to use if they existed.
795  *
796  * This function returns a list of potentially-useful equivalence classes,
797  * but it does not guarantee that an EquivalenceMember exists which contains
798  * Vars only from the given relation. For example, given ft1 JOIN t1 ON
799  * ft1.x + t1.x = 0, this function will say that the equivalence class
800  * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
801  * t1 is local (or on a different server), it will turn out that no useful
802  * ORDER BY clause can be generated. It's not our job to figure that out
803  * here; we're only interested in identifying relevant ECs.
 *
 * NOTE(review): this listing omits source lines 806 (the line carrying the
 * function name and parameter list) and 838 (the first statement inside
 * the IS_OTHER_REL branch), so the text below is not compilable as shown.
804  */
805 static List *
807 {
808  List *useful_eclass_list = NIL;
809  ListCell *lc;
810  Relids relids;
811 
812  /*
813  * First, consider whether any active EC is potentially useful for a merge
814  * join against this relation.
815  */
816  if (rel->has_eclass_joins)
817  {
818  foreach(lc, root->eq_classes)
819  {
820  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
821 
822  if (eclass_useful_for_merging(root, cur_ec, rel))
823  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
824  }
825  }
826 
827  /*
828  * Next, consider whether there are any non-EC derivable join clauses that
829  * are merge-joinable. If the joininfo list is empty, we can exit
830  * quickly.
831  */
832  if (rel->joininfo == NIL)
833  return useful_eclass_list;
834 
835  /* If this is a child rel, we must use the topmost parent rel to search. */
836  if (IS_OTHER_REL(rel))
837  {
839  relids = rel->top_parent_relids;
840  }
841  else
842  relids = rel->relids;
843 
844  /* Check each join clause in turn. */
845  foreach(lc, rel->joininfo)
846  {
847  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
848 
849  /* Consider only mergejoinable clauses */
850  if (restrictinfo->mergeopfamilies == NIL)
851  continue;
852 
853  /* Make sure we've got canonical ECs. */
854  update_mergeclause_eclasses(root, restrictinfo);
855 
856  /*
857  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
858  * that left_ec and right_ec will be initialized, per comments in
859  * distribute_qual_to_rels.
860  *
861  * We want to identify which side of this merge-joinable clause
862  * contains columns from the relation produced by this RelOptInfo. We
863  * test for overlap, not containment, because there could be extra
864  * relations on either side. For example, suppose we've got something
865  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
866  * A.y = D.y. The input rel might be the joinrel between A and B, and
867  * we'll consider the join clause A.y = D.y. relids contains a
868  * relation not involved in the join clause (B) and the equivalence
869  * class for the left-hand side of the clause contains a relation not
870  * involved in the input rel (C). Despite the fact that we have only
871  * overlap and not containment in either direction, A.y is potentially
872  * useful as a sort column.
873  *
874  * Note that it's even possible that relids overlaps neither side of
875  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
876  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
877  * but overlaps neither side of B. In that case, we just skip this
878  * join clause, since it doesn't suggest a useful sort order for this
879  * relation.
 *
 * The right-hand EC is tested first; the left-hand EC is only
 * considered when the right-hand one does not overlap relids.
880  */
881  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
882  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
883  restrictinfo->right_ec);
884  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
885  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
886  restrictinfo->left_ec);
887  }
888 
889  return useful_eclass_list;
890 }
891 
892 /*
893  * get_useful_pathkeys_for_relation
894  * Determine which orderings of a relation might be useful.
895  *
896  * Getting data in sorted order can be useful either because the requested
897  * order matches the final output ordering for the overall query we're
898  * planning, or because it enables an efficient merge join. Here, we try
899  * to figure out which pathkeys to consider.
 *
 * Returns a list of pathkey lists. As a side effect, sets
 * fpinfo->qp_is_pushdown_safe to record whether all of the query's
 * pathkeys can be pushed down.
 *
 * NOTE(review): this listing omits source lines 902 (the line carrying the
 * function name and parameter list), 984 (the start of the shippability
 * test) and 995 (one argument of make_canonical_pathkey), so the text below
 * is not compilable as shown.
900  */
901 static List *
903 {
904  List *useful_pathkeys_list = NIL;
905  List *useful_eclass_list;
906  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
907  EquivalenceClass *query_ec = NULL;
908  ListCell *lc;
909 
910  /*
911  * Pushing the query_pathkeys to the remote server is always worth
912  * considering, because it might let us avoid a local sort.
913  */
914  fpinfo->qp_is_pushdown_safe = false;
915  if (root->query_pathkeys)
916  {
917  bool query_pathkeys_ok = true;
918 
919  foreach(lc, root->query_pathkeys)
920  {
921  PathKey *pathkey = (PathKey *) lfirst(lc);
922 
923  /*
924  * The planner and executor don't have any clever strategy for
925  * taking data sorted by a prefix of the query's pathkeys and
926  * getting it to be sorted by all of those pathkeys. We'll just
927  * end up re-sorting the entire data set. So, unless we can push
928  * down all of the query pathkeys, forget it.
929  */
930  if (!is_foreign_pathkey(root, rel, pathkey))
931  {
932  query_pathkeys_ok = false;
933  break;
934  }
935  }
936 
937  if (query_pathkeys_ok)
938  {
939  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
940  fpinfo->qp_is_pushdown_safe = true;
941  }
942  }
943 
944  /*
945  * Even if we're not using remote estimates, having the remote side do the
946  * sort generally won't be any worse than doing it locally, and it might
947  * be much better if the remote side can generate data in the right order
948  * without needing a sort at all. However, what we're going to do next is
949  * try to generate pathkeys that seem promising for possible merge joins,
950  * and that's more speculative. A wrong choice might hurt quite a bit, so
951  * bail out if we can't use remote estimates.
952  */
953  if (!fpinfo->use_remote_estimate)
954  return useful_pathkeys_list;
955 
956  /* Get the list of interesting EquivalenceClasses. */
957  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
958 
959  /*
 * If the query has exactly one pathkey, remember its EC so that the loop
 * below doesn't consider it again.
 */
960  if (list_length(root->query_pathkeys) == 1)
961  {
962  PathKey *query_pathkey = linitial(root->query_pathkeys);
963 
964  query_ec = query_pathkey->pk_eclass;
965  }
966 
967  /*
968  * As a heuristic, the only pathkeys we consider here are those of length
969  * one. It's surely possible to consider more, but since each one we
970  * choose to consider will generate a round-trip to the remote side, we
971  * need to be a bit cautious here. It would sure be nice to have a local
972  * cache of information about remote index definitions...
973  */
974  foreach(lc, useful_eclass_list)
975  {
976  EquivalenceClass *cur_ec = lfirst(lc);
977  PathKey *pathkey;
978 
979  /* If redundant with what we did above, skip it. */
980  if (cur_ec == query_ec)
981  continue;
982 
983  /* Can't push down the sort if the EC's opfamily is not shippable. */
985  OperatorFamilyRelationId, fpinfo))
986  continue;
987 
988  /* If no pushable expression for this rel, skip it. */
989  if (find_em_for_rel(root, cur_ec, rel) == NULL)
990  continue;
991 
992  /* Looks like we can generate a pathkey, so let's do it. */
993  pathkey = make_canonical_pathkey(root, cur_ec,
994  linitial_oid(cur_ec->ec_opfamilies),
996  false);
997  useful_pathkeys_list = lappend(useful_pathkeys_list,
998  list_make1(pathkey));
999  }
1000 
1001  return useful_pathkeys_list;
1002 }
1003 
1004 /*
1005  * postgresGetForeignPaths
1006  * Create possible scan paths for a scan on the foreign table
      *
      * Visible flow: first add one unparameterized ForeignPath built from the
      * rows and costs already stored in fpinfo, then call
      * add_paths_with_pathkeys_for_rel(). If fpinfo->use_remote_estimate is
      * set, additionally collect a ParamPathInfo for every join clause that
      * is movable to baserel and passes is_foreign_expr() (scanning
      * baserel->joininfo and, when has_eclass_joins is set, the clauses
      * generated from equivalence classes), and add one parameterized
      * ForeignPath per distinct ParamPathInfo, costed with
      * estimate_path_cost_size().
      *
      * foreigntableid is not referenced in the visible body.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1007  */
1008 static void
     /*
      * NOTE(review): listing line 1009 is absent; presumably it carried the
      * function name and the declaration of "root", which the body uses
      * without a visible declaration -- confirm against the original file.
      */
1010  RelOptInfo *baserel,
1011  Oid foreigntableid)
1012 {
1013  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1014  ForeignPath *path;
1015  List *ppi_list;
1016  ListCell *lc;
1017 
1018  /*
1019  * Create simplest ForeignScan path node and add it to baserel. This path
1020  * corresponds to SeqScan path of regular tables (though depending on what
1021  * baserestrict conditions we were able to send to remote, there might
1022  * actually be an indexscan happening there). We already did all the work
1023  * to estimate cost and size of this path.
1024  *
1025  * Although this path uses no join clauses, it could still have required
1026  * parameterization due to LATERAL refs in its tlist.
1027  */
1028  path = create_foreignscan_path(root, baserel,
1029  NULL, /* default pathtarget */
1030  fpinfo->rows,
1031  fpinfo->startup_cost,
1032  fpinfo->total_cost,
1033  NIL, /* no pathkeys */
1034  baserel->lateral_relids,
1035  NULL, /* no extra plan */
1036  NIL); /* no fdw_private list */
1037  add_path(baserel, (Path *) path);
1038 
1039  /* Add paths with pathkeys */
1040  add_paths_with_pathkeys_for_rel(root, baserel, NULL);
1041 
1042  /*
1043  * If we're not using remote estimates, stop here. We have no way to
1044  * estimate whether any join clauses would be worth sending across, so
1045  * don't bother building parameterized paths.
1046  */
1047  if (!fpinfo->use_remote_estimate)
1048  return;
1049 
1050  /*
1051  * Thumb through all join clauses for the rel to identify which outer
1052  * relations could supply one or more safe-to-send-to-remote join clauses.
1053  * We'll build a parameterized path for each such outer relation.
1054  *
1055  * It's convenient to manage this by representing each candidate outer
1056  * relation by the ParamPathInfo node for it. We can then use the
1057  * ppi_clauses list in the ParamPathInfo node directly as a list of the
1058  * interesting join clauses for that rel. This takes care of the
1059  * possibility that there are multiple safe join clauses for such a rel,
1060  * and also ensures that we account for unsafe join clauses that we'll
1061  * still have to enforce locally (since the parameterized-path machinery
1062  * insists that we handle all movable clauses).
1063  */
1064  ppi_list = NIL;
1065  foreach(lc, baserel->joininfo)
1066  {
1067  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1068  Relids required_outer;
1069  ParamPathInfo *param_info;
1070 
1071  /* Check if clause can be moved to this rel */
1072  if (!join_clause_is_movable_to(rinfo, baserel))
1073  continue;
1074 
1075  /* See if it is safe to send to remote */
1076  if (!is_foreign_expr(root, baserel, rinfo->clause))
1077  continue;
1078 
1079  /* Calculate required outer rels for the resulting path */
1080  required_outer = bms_union(rinfo->clause_relids,
1081  baserel->lateral_relids);
1082  /* We do not want the foreign rel itself listed in required_outer */
1083  required_outer = bms_del_member(required_outer, baserel->relid);
1084 
1085  /*
1086  * required_outer probably can't be empty here, but if it were, we
1087  * couldn't make a parameterized path.
1088  */
1089  if (bms_is_empty(required_outer))
1090  continue;
1091 
1092  /* Get the ParamPathInfo */
1093  param_info = get_baserel_parampathinfo(root, baserel,
1094  required_outer);
1095  Assert(param_info != NULL);
1096 
1097  /*
1098  * Add it to list unless we already have it. Testing pointer equality
1099  * is OK since get_baserel_parampathinfo won't make duplicates.
1100  */
1101  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1102  }
1103 
1104  /*
1105  * The above scan examined only "generic" join clauses, not those that
1106  * were absorbed into EquivalenceClauses. See if we can make anything out
1107  * of EquivalenceClauses.
1108  */
1109  if (baserel->has_eclass_joins)
1110  {
1111  /*
1112  * We repeatedly scan the eclass list looking for column references
1113  * (or expressions) belonging to the foreign rel. Each time we find
1114  * one, we generate a list of equivalence joinclauses for it, and then
1115  * see if any are safe to send to the remote. Repeat till there are
1116  * no more candidate EC members.
1117  */
     /*
      * NOTE(review): listing line 1118 is absent; presumably it declared
      * "arg", whose already_used and current fields are used below --
      * confirm.
      */
1119 
1120  arg.already_used = NIL;
1121  for (;;)
1122  {
1123  List *clauses;
1124 
1125  /* Make clauses, skipping any that join to lateral_referencers */
1126  arg.current = NULL;
     /*
      * NOTE(review): listing lines 1127 and 1129 are absent; presumably
      * 1127 began the call whose result is assigned to "clauses" and 1129
      * supplied one more argument between baserel and &arg -- confirm.
      * The loop's exit test below relies on that call setting arg.current.
      */
1128  baserel,
1130  (void *) &arg,
1131  baserel->lateral_referencers);
1132 
1133  /* Done if there are no more expressions in the foreign rel */
1134  if (arg.current == NULL)
1135  {
1136  Assert(clauses == NIL);
1137  break;
1138  }
1139 
1140  /* Scan the extracted join clauses */
1141  foreach(lc, clauses)
1142  {
1143  RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1144  Relids required_outer;
1145  ParamPathInfo *param_info;
1146 
1147  /* Check if clause can be moved to this rel */
1148  if (!join_clause_is_movable_to(rinfo, baserel))
1149  continue;
1150 
1151  /* See if it is safe to send to remote */
1152  if (!is_foreign_expr(root, baserel, rinfo->clause))
1153  continue;
1154 
1155  /* Calculate required outer rels for the resulting path */
1156  required_outer = bms_union(rinfo->clause_relids,
1157  baserel->lateral_relids);
1158  required_outer = bms_del_member(required_outer, baserel->relid);
1159  if (bms_is_empty(required_outer))
1160  continue;
1161 
1162  /* Get the ParamPathInfo */
1163  param_info = get_baserel_parampathinfo(root, baserel,
1164  required_outer);
1165  Assert(param_info != NULL);
1166 
1167  /* Add it to list unless we already have it */
1168  ppi_list = list_append_unique_ptr(ppi_list, param_info);
1169  }
1170 
1171  /* Try again, now ignoring the expression we found this time */
1172  arg.already_used = lappend(arg.already_used, arg.current);
1173  }
1174  }
1175 
1176  /*
1177  * Now build a path for each useful outer relation.
1178  */
1179  foreach(lc, ppi_list)
1180  {
1181  ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1182  double rows;
1183  int width;
1184  Cost startup_cost;
1185  Cost total_cost;
1186 
1187  /* Get a cost estimate from the remote */
1188  estimate_path_cost_size(root, baserel,
1189  param_info->ppi_clauses, NIL, NULL,
1190  &rows, &width,
1191  &startup_cost, &total_cost);
1192 
1193  /*
1194  * ppi_rows currently won't get looked at by anything, but still we
1195  * may as well ensure that it matches our idea of the rowcount.
1196  */
1197  param_info->ppi_rows = rows;
1198 
1199  /* Make the path */
1200  path = create_foreignscan_path(root, baserel,
1201  NULL, /* default pathtarget */
1202  rows,
1203  startup_cost,
1204  total_cost,
1205  NIL, /* no pathkeys */
1206  param_info->ppi_req_outer,
1207  NULL,
1208  NIL); /* no fdw_private list */
1209  add_path(baserel, (Path *) path);
1210  }
1211 }
1212 
1213 /*
1214  * postgresGetForeignPlan
1215  * Create ForeignScan plan node which implements selected best path
      *
      * Visible flow:
      * - If best_path->fdw_private is set, read has_final_sort and has_limit
      *   from it.
      * - For a simple rel, split scan_clauses into remote_exprs and
      *   local_exprs (skipping pseudoconstants) and use remote_exprs as the
      *   EPQ recheck quals.
      * - Otherwise (join or upper rel), take the conditions from fpinfo,
      *   build fdw_scan_tlist, and, when an outer_plan is supplied, strip
      *   local_exprs from its quals and give it fdw_scan_tlist as its tlist.
      * - Deparse the remote SELECT, remember remote_exprs in
      *   fpinfo->final_remote_exprs, assemble the executor's fdw_private
      *   list, and return the node built by make_foreignscan().
      *
      * foreigntableid is not referenced in the visible body.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1216  */
1217 static ForeignScan *
     /*
      * NOTE(review): listing line 1218 is absent; presumably it carried the
      * function name and the declaration of "root", which the body uses
      * without a visible declaration -- confirm against the original file.
      */
1219  RelOptInfo *foreignrel,
1220  Oid foreigntableid,
1221  ForeignPath *best_path,
1222  List *tlist,
1223  List *scan_clauses,
1224  Plan *outer_plan)
1225 {
1226  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1227  Index scan_relid;
1228  List *fdw_private;
1229  List *remote_exprs = NIL;
1230  List *local_exprs = NIL;
1231  List *params_list = NIL;
1232  List *fdw_scan_tlist = NIL;
1233  List *fdw_recheck_quals = NIL;
1234  List *retrieved_attrs;
1235  StringInfoData sql;
1236  bool has_final_sort = false;
1237  bool has_limit = false;
1238  ListCell *lc;
1239 
1240  /*
1241  * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1242  */
1243  if (best_path->fdw_private)
1244  {
     /*
      * NOTE(review): listing lines 1246 and 1248 are absent; presumably
      * each supplied the list index argument and closing parentheses for
      * the list_nth() call started on the line before it -- confirm.
      */
1245  has_final_sort = boolVal(list_nth(best_path->fdw_private,
1247  has_limit = boolVal(list_nth(best_path->fdw_private,
1249  }
1250 
1251  if (IS_SIMPLE_REL(foreignrel))
1252  {
1253  /*
1254  * For base relations, set scan_relid as the relid of the relation.
1255  */
1256  scan_relid = foreignrel->relid;
1257 
1258  /*
1259  * In a base-relation scan, we must apply the given scan_clauses.
1260  *
1261  * Separate the scan_clauses into those that can be executed remotely
1262  * and those that can't. baserestrictinfo clauses that were
1263  * previously determined to be safe or unsafe by classifyConditions
1264  * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1265  * else in the scan_clauses list will be a join clause, which we have
1266  * to check for remote-safety.
1267  *
1268  * Note: the join clauses we see here should be the exact same ones
1269  * previously examined by postgresGetForeignPaths. Possibly it'd be
1270  * worth passing forward the classification work done then, rather
1271  * than repeating it here.
1272  *
1273  * This code must match "extract_actual_clauses(scan_clauses, false)"
1274  * except for the additional decision about remote versus local
1275  * execution.
1276  */
1277  foreach(lc, scan_clauses)
1278  {
1279  RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1280 
1281  /* Ignore any pseudoconstants, they're dealt with elsewhere */
1282  if (rinfo->pseudoconstant)
1283  continue;
1284 
     /*
      * Membership in the pre-classified lists is tested first; only
      * clauses found in neither list fall through to is_foreign_expr().
      */
1285  if (list_member_ptr(fpinfo->remote_conds, rinfo))
1286  remote_exprs = lappend(remote_exprs, rinfo->clause);
1287  else if (list_member_ptr(fpinfo->local_conds, rinfo))
1288  local_exprs = lappend(local_exprs, rinfo->clause);
1289  else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1290  remote_exprs = lappend(remote_exprs, rinfo->clause);
1291  else
1292  local_exprs = lappend(local_exprs, rinfo->clause);
1293  }
1294 
1295  /*
1296  * For a base-relation scan, we have to support EPQ recheck, which
1297  * should recheck all the remote quals.
1298  */
1299  fdw_recheck_quals = remote_exprs;
1300  }
1301  else
1302  {
1303  /*
1304  * Join relation or upper relation - set scan_relid to 0.
1305  */
1306  scan_relid = 0;
1307 
1308  /*
1309  * For a join rel, baserestrictinfo is NIL and we are not considering
1310  * parameterization right now, so there should be no scan_clauses for
1311  * a joinrel or an upper rel either.
1312  */
1313  Assert(!scan_clauses);
1314 
1315  /*
1316  * Instead we get the conditions to apply from the fdw_private
1317  * structure.
1318  */
1319  remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1320  local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1321 
1322  /*
1323  * We leave fdw_recheck_quals empty in this case, since we never need
1324  * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1325  * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1326  * If we're planning an upperrel (ie, remote grouping or aggregation)
1327  * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1328  * allowed, and indeed we *can't* put the remote clauses into
1329  * fdw_recheck_quals because the unaggregated Vars won't be available
1330  * locally.
1331  */
1332 
1333  /* Build the list of columns to be fetched from the foreign server. */
1334  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1335 
1336  /*
1337  * Ensure that the outer plan produces a tuple whose descriptor
1338  * matches our scan tuple slot. Also, remove the local conditions
1339  * from outer plan's quals, lest they be evaluated twice, once by the
1340  * local plan and once by the scan.
1341  */
1342  if (outer_plan)
1343  {
1344  /*
1345  * Right now, we only consider grouping and aggregation beyond
1346  * joins. Queries involving aggregates or grouping do not require
1347  * EPQ mechanism, hence should not have an outer plan here.
1348  */
1349  Assert(!IS_UPPER_REL(foreignrel));
1350 
1351  /*
1352  * First, update the plan's qual list if possible. In some cases
1353  * the quals might be enforced below the topmost plan level, in
1354  * which case we'll fail to remove them; it's not worth working
1355  * harder than this.
1356  */
1357  foreach(lc, local_exprs)
1358  {
1359  Node *qual = lfirst(lc);
1360 
1361  outer_plan->qual = list_delete(outer_plan->qual, qual);
1362 
1363  /*
1364  * For an inner join the local conditions of foreign scan plan
1365  * can be part of the joinquals as well. (They might also be
1366  * in the mergequals or hashquals, but we can't touch those
1367  * without breaking the plan.)
1368  */
1369  if (IsA(outer_plan, NestLoop) ||
1370  IsA(outer_plan, MergeJoin) ||
1371  IsA(outer_plan, HashJoin))
1372  {
1373  Join *join_plan = (Join *) outer_plan;
1374 
1375  if (join_plan->jointype == JOIN_INNER)
1376  join_plan->joinqual = list_delete(join_plan->joinqual,
1377  qual);
1378  }
1379  }
1380 
1381  /*
1382  * Now fix the subplan's tlist --- this might result in inserting
1383  * a Result node atop the plan tree.
1384  */
1385  outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1386  best_path->path.parallel_safe);
1387  }
1388  }
1389 
1390  /*
1391  * Build the query string to be sent for execution, and identify
1392  * expressions to be sent as parameters.
1393  */
1394  initStringInfo(&sql);
1395  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1396  remote_exprs, best_path->path.pathkeys,
1397  has_final_sort, has_limit, false,
1398  &retrieved_attrs, &params_list);
1399 
1400  /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1401  fpinfo->final_remote_exprs = remote_exprs;
1402 
1403  /*
1404  * Build the fdw_private list that will be available to the executor.
1405  * Items in the list must match order in enum FdwScanPrivateIndex.
1406  */
1407  fdw_private = list_make3(makeString(sql.data),
1408  retrieved_attrs,
1409  makeInteger(fpinfo->fetch_size));
     /* Join and upper rels carry one extra item: the relation name string. */
1410  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1411  fdw_private = lappend(fdw_private,
1412  makeString(fpinfo->relation_name));
1413 
1414  /*
1415  * Create the ForeignScan node for the given relation.
1416  *
1417  * Note that the remote parameter expressions are stored in the fdw_exprs
1418  * field of the finished plan node; we can't keep them in private state
1419  * because then they wouldn't be subject to later planner processing.
1420  */
1421  return make_foreignscan(tlist,
1422  local_exprs,
1423  scan_relid,
1424  params_list,
1425  fdw_private,
1426  fdw_scan_tlist,
1427  fdw_recheck_quals,
1428  outer_plan);
1429 }
1430 
1431 /*
1432  * Construct a tuple descriptor for the scan tuples handled by a foreign join.
      *
      * For every attribute of tupdesc whose type is RECORDOID with a negative
      * typmod, the matching fdw_scan_tlist entry is inspected: if it is a Var
      * with varattno 0 that refers to an RTE_RELATION range-table entry whose
      * relation has a valid type OID (per get_rel_type_id()), the attribute's
      * atttypid is replaced by that OID. All other attributes are left alone.
      * Returns tupdesc.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1433  */
1434 static TupleDesc
     /*
      * NOTE(review): listing line 1435 is absent; presumably it carried the
      * function name and the declaration of "node". The name
      * get_tupdesc_for_join_scan_tuples is assumed from the call made with a
      * ForeignScanState in postgresBeginForeignScan -- confirm.
      */
1436 {
1437  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1438  EState *estate = node->ss.ps.state;
1439  TupleDesc tupdesc;
1440 
1441  /*
1442  * The core code has already set up a scan tuple slot based on
1443  * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough,
1444  * but there's one case where it isn't. If we have any whole-row row
1445  * identifier Vars, they may have vartype RECORD, and we need to replace
1446  * that with the associated table's actual composite type. This ensures
1447  * that when we read those ROW() expression values from the remote server,
1448  * we can convert them to a composite type the local server knows.
1449  */
     /*
      * NOTE(review): listing line 1450 is absent; presumably it assigned
      * "tupdesc", which is otherwise read uninitialized by the loop below
      * -- confirm against the original file.
      */
1451  for (int i = 0; i < tupdesc->natts; i++)
1452  {
1453  Form_pg_attribute att = TupleDescAttr(tupdesc, i);
1454  Var *var;
1455  RangeTblEntry *rte;
1456  Oid reltype;
1457 
1458  /* Nothing to do if it's not a generic RECORD attribute */
1459  if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1460  continue;
1461 
1462  /*
1463  * If we can't identify the referenced table, do nothing. This'll
1464  * likely lead to failure later, but perhaps we can muddle through.
1465  */
1466  var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
1467  i)->expr;
1468  if (!IsA(var, Var) || var->varattno != 0)
1469  continue;
     /* varno is used 1-based here, hence the "- 1" for list_nth(). */
1470  rte = list_nth(estate->es_range_table, var->varno - 1);
1471  if (rte->rtekind != RTE_RELATION)
1472  continue;
1473  reltype = get_rel_type_id(rte->relid);
1474  if (!OidIsValid(reltype))
1475  continue;
1476  att->atttypid = reltype;
1477  /* shouldn't need to change anything else */
1478  }
1479  return tupdesc;
1480 }
1481 
1482 /*
1483  * postgresBeginForeignScan
1484  * Initiate an executor scan of a foreign PostgreSQL table.
      *
      * Returns immediately, leaving node->fdw_state unset, when eflags has
      * EXEC_FLAG_EXPLAIN_ONLY. Otherwise allocates a zeroed PgFdwScanState,
      * stores it in node->fdw_state, and fills it in: remote connection and
      * cursor number, the query/retrieved_attrs/fetch_size items from
      * fsplan->fdw_private, two memory contexts created under
      * estate->es_query_cxt, the tuple descriptor and its input metadata,
      * query-parameter state, and the async-capable flag.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1485  */
1486 static void
     /*
      * NOTE(review): listing line 1487 is absent; presumably it carried the
      * function name and the declarations of "node" and "eflags", both used
      * below -- confirm against the original file.
      */
1488 {
1489  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1490  EState *estate = node->ss.ps.state;
1491  PgFdwScanState *fsstate;
1492  RangeTblEntry *rte;
1493  Oid userid;
1494  ForeignTable *table;
1495  UserMapping *user;
1496  int rtindex;
1497  int numParams;
1498 
1499  /*
1500  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1501  */
1502  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1503  return;
1504 
1505  /*
1506  * We'll save private state in node->fdw_state.
1507  */
1508  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1509  node->fdw_state = (void *) fsstate;
1510 
1511  /*
1512  * Identify which user to do the remote access as. This should match what
1513  * ExecCheckRTEPerms() does.
1514  */
1515  userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
     /*
      * With no scan relid (scanrelid is 0), fall back to the first member
      * that bms_next_member() yields from fs_relids.
      */
1516  if (fsplan->scan.scanrelid > 0)
1517  rtindex = fsplan->scan.scanrelid;
1518  else
1519  rtindex = bms_next_member(fsplan->fs_relids, -1);
1520  rte = exec_rt_fetch(rtindex, estate);
1521 
1522  /* Get info about foreign table. */
1523  table = GetForeignTable(rte->relid);
1524  user = GetUserMapping(userid, table->serverid);
1525 
1526  /*
1527  * Get connection to the foreign server. Connection manager will
1528  * establish new connection if necessary.
1529  */
1530  fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
1531 
1532  /* Assign a unique ID for my cursor */
1533  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1534  fsstate->cursor_exists = false;
1535 
1536  /* Get private info created by planner functions. */
     /*
      * NOTE(review): listing lines 1538, 1540 and 1542 are absent;
      * presumably each supplied the list index argument and closing
      * parentheses for the list_nth() call started on the line before it
      * -- confirm.
      */
1537  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1539  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1541  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1543 
1544  /* Create contexts for batches of tuples and per-tuple temp workspace. */
     /*
      * NOTE(review): listing lines 1547 and 1550 are absent; presumably
      * each supplied the remaining argument(s) and closing parenthesis of
      * the AllocSetContextCreate() call it belongs to -- confirm.
      */
1545  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1546  "postgres_fdw tuple data",
1548  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1549  "postgres_fdw temporary data",
1551 
1552  /*
1553  * Get info we'll need for converting data fetched from the foreign server
1554  * into local representation and error reporting during that process.
1555  */
1556  if (fsplan->scan.scanrelid > 0)
1557  {
1558  fsstate->rel = node->ss.ss_currentRelation;
1559  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1560  }
1561  else
1562  {
1563  fsstate->rel = NULL;
1564  fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
1565  }
1566 
1567  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1568 
1569  /*
1570  * Prepare for processing of parameters used in remote query, if any.
1571  */
1572  numParams = list_length(fsplan->fdw_exprs);
1573  fsstate->numParams = numParams;
1574  if (numParams > 0)
     /*
      * NOTE(review): listing line 1575 is absent; presumably it began the
      * call that receives the arguments on the following lines and fills
      * param_flinfo, param_exprs and param_values -- confirm.
      */
1576  fsplan->fdw_exprs,
1577  numParams,
1578  &fsstate->param_flinfo,
1579  &fsstate->param_exprs,
1580  &fsstate->param_values);
1581 
1582  /* Set the async-capable flag */
1583  fsstate->async_capable = node->ss.ps.async_capable;
1584 }
1585 
1586 /*
1587  * postgresIterateForeignScan
1588  * Retrieve next row from the result set, or clear tuple slot to indicate
1589  * EOF.
      *
      * Creates the remote cursor on first use, refills the local tuple batch
      * through fetch_more_data() when it is exhausted (never in async mode,
      * and not once eof_reached is set), and otherwise stores
      * fsstate->tuples[next_tuple] into the scan tuple slot and advances
      * next_tuple. Returns the scan tuple slot, cleared when no tuple is
      * available.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1590  */
1591 static TupleTableSlot *
     /*
      * NOTE(review): listing line 1592 is absent; presumably it carried the
      * function name and the declaration of "node" -- confirm against the
      * original file.
      */
1593 {
1594  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1595  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1596 
1597  /*
1598  * In sync mode, if this is the first call after Begin or ReScan, we need
1599  * to create the cursor on the remote side. In async mode, we would have
1600  * already created the cursor before we get here, even if this is the
1601  * first call after Begin or ReScan.
1602  */
1603  if (!fsstate->cursor_exists)
1604  create_cursor(node);
1605 
1606  /*
1607  * Get some more tuples, if we've run out.
1608  */
1609  if (fsstate->next_tuple >= fsstate->num_tuples)
1610  {
1611  /* In async mode, just clear tuple slot. */
1612  if (fsstate->async_capable)
1613  return ExecClearTuple(slot);
1614  /* No point in another fetch if we already detected EOF, though. */
1615  if (!fsstate->eof_reached)
1616  fetch_more_data(node);
1617  /* If we didn't get any tuples, must be end of data. */
1618  if (fsstate->next_tuple >= fsstate->num_tuples)
1619  return ExecClearTuple(slot);
1620  }
1621 
1622  /*
1623  * Return the next tuple.
1624  */
1625  ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1626  slot,
1627  false);
1628 
1629  return slot;
1630 }
1631 
1632 /*
1633  * postgresReScanForeignScan
1634  * Restart the scan.
      *
      * Three outcomes once a cursor exists: if chgParam is set, the cursor is
      * marked as gone and a CLOSE is sent; else if fetch_ct_2 > 1, the cursor
      * is rewound with MOVE BACKWARD ALL; else only next_tuple is reset so
      * the tuples already held are replayed. After either remote command the
      * batch state (tuples, num_tuples, next_tuple, fetch_ct_2, eof_reached)
      * is reset.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1635  */
1636 static void
     /*
      * NOTE(review): listing line 1637 is absent; presumably it carried the
      * function name and the declaration of "node" -- confirm against the
      * original file.
      */
1638 {
1639  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
     /*
      * Both commands formatted into sql below are short fixed text plus one
      * %u value, and snprintf() bounds each write to sizeof(sql).
      */
1640  char sql[64];
1641  PGresult *res;
1642 
1643  /* If we haven't created the cursor yet, nothing to do. */
1644  if (!fsstate->cursor_exists)
1645  return;
1646 
1647  /*
1648  * If the node is async-capable, and an asynchronous fetch for it has been
1649  * begun, the asynchronous fetch might not have yet completed. Check if
1650  * the node is async-capable, and an asynchronous fetch for it is still in
1651  * progress; if so, complete the asynchronous fetch before restarting the
1652  * scan.
1653  */
1654  if (fsstate->async_capable &&
1655  fsstate->conn_state->pendingAreq &&
1656  fsstate->conn_state->pendingAreq->requestee == (PlanState *) node)
1657  fetch_more_data(node);
1658 
1659  /*
1660  * If any internal parameters affecting this node have changed, we'd
1661  * better destroy and recreate the cursor. Otherwise, rewinding it should
1662  * be good enough. If we've only fetched zero or one batch, we needn't
1663  * even rewind the cursor, just rescan what we have.
1664  */
1665  if (node->ss.ps.chgParam != NULL)
1666  {
     /* The flag is cleared here, before the CLOSE is actually sent below. */
1667  fsstate->cursor_exists = false;
1668  snprintf(sql, sizeof(sql), "CLOSE c%u",
1669  fsstate->cursor_number);
1670  }
1671  else if (fsstate->fetch_ct_2 > 1)
1672  {
1673  snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1674  fsstate->cursor_number);
1675  }
1676  else
1677  {
1678  /* Easy: just rescan what we already have in memory, if anything */
1679  fsstate->next_tuple = 0;
1680  return;
1681  }
1682 
1683  /*
1684  * We don't use a PG_TRY block here, so be careful not to throw error
1685  * without releasing the PGresult.
1686  */
1687  res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
     /*
      * NOTE(review): listing line 1688 is absent; presumably it held the
      * condition (a check of the result status) that guards the error
      * report on the next line -- as shown, the report would be
      * unconditional. Confirm against the original file.
      */
1689  pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1690  PQclear(res);
1691 
1692  /* Now force a fresh FETCH. */
1693  fsstate->tuples = NULL;
1694  fsstate->num_tuples = 0;
1695  fsstate->next_tuple = 0;
1696  fsstate->fetch_ct_2 = 0;
1697  fsstate->eof_reached = false;
1698 }
1699 
1700 /*
1701  * postgresEndForeignScan
1702  * Finish scanning foreign table and dispose objects used for this scan
      *
      * A NULL node->fdw_state means there is nothing to release. Otherwise
      * the remote cursor is closed if it exists, the connection is handed
      * back with ReleaseConnection(), and fsstate->conn is cleared.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1703  */
1704 static void
     /*
      * NOTE(review): listing line 1705 is absent; presumably it carried the
      * function name and the declaration of "node" -- confirm against the
      * original file.
      */
1706 {
1707  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1708 
1709  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1710  if (fsstate == NULL)
1711  return;
1712 
1713  /* Close the cursor if open, to prevent accumulation of cursors */
1714  if (fsstate->cursor_exists)
1715  close_cursor(fsstate->conn, fsstate->cursor_number,
1716  fsstate->conn_state);
1717 
1718  /* Release remote connection */
1719  ReleaseConnection(fsstate->conn);
1720  fsstate->conn = NULL;
1721 
1722  /* MemoryContexts will be deleted automatically. */
1723 }
1724 
1725 /*
1726  * postgresAddForeignUpdateTargets
1727  * Add resjunk column(s) needed for update/delete on a foreign table
      *
      * Builds a Var of type TIDOID on range-table index rtindex and registers
      * it under the name "ctid" through add_row_identity_var().
      *
      * target_rte and target_relation are not referenced in the visible body.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1728  */
1729 static void
     /*
      * NOTE(review): listing line 1730 is absent; presumably it carried the
      * function name and the declaration of "root", which the body uses
      * without a visible declaration -- confirm against the original file.
      */
1731  Index rtindex,
1732  RangeTblEntry *target_rte,
1733  Relation target_relation)
1734 {
1735  Var *var;
1736 
1737  /*
1738  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1739  */
1740 
1741  /* Make a Var representing the desired value */
     /*
      * NOTE(review): listing line 1743 is absent; presumably it supplied
      * makeVar()'s second argument (the attribute number for ctid) --
      * confirm.
      */
1742  var = makeVar(rtindex,
1744  TIDOID,
1745  -1,
1746  InvalidOid,
1747  0);
1748 
1749  /* Register it as a row-identity column needed by this target rel */
1750  add_row_identity_var(root, var, rtindex, "ctid");
1751 }
1752 
1753 /*
1754  * postgresPlanForeignModify
1755  * Plan an insert/update/delete operation on a foreign table
      *
      * Opens the target relation with NoLock, decides which attribute numbers
      * to transmit (targetAttrs), picks the WITH CHECK OPTION and RETURNING
      * lists for subplan_index, validates the ON CONFLICT action, deparses the
      * remote INSERT/UPDATE/DELETE statement, and closes the relation.
      *
      * Returns a five-item list: the SQL string, targetAttrs, values_end_len
      * (left at -1 unless the INSERT deparse sets it), a boolean telling
      * whether retrieved_attrs is non-empty, and retrieved_attrs itself.
      * Raises an error for an unexpected operation or ON CONFLICT action.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1756  */
1757 static List *
     /*
      * NOTE(review): listing line 1758 is absent; presumably it carried the
      * function name and the declaration of "root", which the body uses
      * without a visible declaration -- confirm against the original file.
      */
1759  ModifyTable *plan,
1760  Index resultRelation,
1761  int subplan_index)
1762 {
1763  CmdType operation = plan->operation;
1764  RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1765  Relation rel;
1766  StringInfoData sql;
1767  List *targetAttrs = NIL;
1768  List *withCheckOptionList = NIL;
1769  List *returningList = NIL;
1770  List *retrieved_attrs = NIL;
1771  bool doNothing = false;
1772  int values_end_len = -1;
1773 
1774  initStringInfo(&sql);
1775 
1776  /*
1777  * Core code already has some lock on each rel being planned, so we can
1778  * use NoLock here.
1779  */
1780  rel = table_open(rte->relid, NoLock);
1781 
1782  /*
1783  * In an INSERT, we transmit all columns that are defined in the foreign
1784  * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1785  * foreign table, we transmit all columns like INSERT; else we transmit
1786  * only columns that were explicitly targets of the UPDATE, so as to avoid
1787  * unnecessary data transmission. (We can't do that for INSERT since we
1788  * would miss sending default values for columns not listed in the source
1789  * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1790  * those triggers might change values for non-target columns, in which
1791  * case we would miss sending changed values for those columns.)
1792  */
     /*
      * NOTE(review): listing line 1796 is absent; presumably it completed
      * this condition with the BEFORE ROW UPDATE trigger test described
      * above, plus the closing parentheses -- confirm.
      */
1793  if (operation == CMD_INSERT ||
1794  (operation == CMD_UPDATE &&
1795  rel->trigdesc &&
1797  {
1798  TupleDesc tupdesc = RelationGetDescr(rel);
1799  int attnum;
1800 
     /* attnum is 1-based; TupleDescAttr() is indexed from 0. */
1801  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1802  {
1803  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1804 
1805  if (!attr->attisdropped)
1806  targetAttrs = lappend_int(targetAttrs, attnum);
1807  }
1808  }
1809  else if (operation == CMD_UPDATE)
1810  {
1811  int col;
1812  Bitmapset *allUpdatedCols = bms_union(rte->updatedCols, rte->extraUpdatedCols);
1813 
1814  col = -1;
1815  while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1816  {
1817  /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
     /*
      * NOTE(review): listing line 1818 is absent; presumably it declared
      * "attno" and derived it from "col" using the offset named in the
      * comment above -- confirm.
      */
1819 
1820  if (attno <= InvalidAttrNumber) /* shouldn't happen */
1821  elog(ERROR, "system-column update is not supported");
1822  targetAttrs = lappend_int(targetAttrs, attno);
1823  }
1824  }
1825 
1826  /*
1827  * Extract the relevant WITH CHECK OPTION list if any.
1828  */
1829  if (plan->withCheckOptionLists)
1830  withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1831  subplan_index);
1832 
1833  /*
1834  * Extract the relevant RETURNING list if any.
1835  */
1836  if (plan->returningLists)
1837  returningList = (List *) list_nth(plan->returningLists, subplan_index);
1838 
1839  /*
1840  * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1841  * should have already been rejected in the optimizer, as presently there
1842  * is no way to recognize an arbiter index on a foreign table. Only DO
1843  * NOTHING is supported without an inference specification.
1844  */
1845  if (plan->onConflictAction == ONCONFLICT_NOTHING)
1846  doNothing = true;
1847  else if (plan->onConflictAction != ONCONFLICT_NONE)
1848  elog(ERROR, "unexpected ON CONFLICT specification: %d",
1849  (int) plan->onConflictAction);
1850 
1851  /*
1852  * Construct the SQL command string.
1853  */
1854  switch (operation)
1855  {
1856  case CMD_INSERT:
1857  deparseInsertSql(&sql, rte, resultRelation, rel,
1858  targetAttrs, doNothing,
1859  withCheckOptionList, returningList,
1860  &retrieved_attrs, &values_end_len);
1861  break;
1862  case CMD_UPDATE:
1863  deparseUpdateSql(&sql, rte, resultRelation, rel,
1864  targetAttrs,
1865  withCheckOptionList, returningList,
1866  &retrieved_attrs);
1867  break;
1868  case CMD_DELETE:
1869  deparseDeleteSql(&sql, rte, resultRelation, rel,
1870  returningList,
1871  &retrieved_attrs);
1872  break;
1873  default:
1874  elog(ERROR, "unexpected operation: %d", (int) operation);
1875  break;
1876  }
1877 
1878  table_close(rel, NoLock);
1879 
1880  /*
1881  * Build the fdw_private list that will be available to the executor.
1882  * Items in the list must match enum FdwModifyPrivateIndex, above.
1883  */
1884  return list_make5(makeString(sql.data),
1885  targetAttrs,
1886  makeInteger(values_end_len),
1887  makeBoolean((retrieved_attrs != NIL)),
1888  retrieved_attrs);
1889 }
1890 
1891 /*
1892  * postgresBeginForeignModify
1893  * Begin an insert/update/delete operation on a foreign table
      *
      * Returns immediately, leaving resultRelInfo->ri_FdwState unset, when
      * eflags has EXEC_FLAG_EXPLAIN_ONLY. Otherwise unpacks five items from
      * fdw_private (query, target_attrs, values_end_len, has_returning,
      * retrieved_attrs), fetches the range-table entry for the result
      * relation, and stores the state built by create_foreign_modify() in
      * resultRelInfo->ri_FdwState.
      *
      * subplan_index is not referenced in the visible body.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1894  */
1895 static void
     /*
      * NOTE(review): listing line 1896 is absent; presumably it carried the
      * function name and the declaration of "mtstate", which the body uses
      * without a visible declaration -- confirm against the original file.
      */
1897  ResultRelInfo *resultRelInfo,
1898  List *fdw_private,
1899  int subplan_index,
1900  int eflags)
1901 {
1902  PgFdwModifyState *fmstate;
1903  char *query;
1904  List *target_attrs;
1905  bool has_returning;
1906  int values_end_len;
1907  List *retrieved_attrs;
1908  RangeTblEntry *rte;
1909 
1910  /*
1911  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1912  * stays NULL.
1913  */
1914  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1915  return;
1916 
1917  /* Deconstruct fdw_private data. */
     /*
      * NOTE(review): listing lines 1919, 1921, 1923, 1925 and 1927 are
      * absent; presumably each supplied the list index argument and closing
      * parentheses for the list_nth() call started on the line before it
      * -- confirm.
      */
1918  query = strVal(list_nth(fdw_private,
1920  target_attrs = (List *) list_nth(fdw_private,
1922  values_end_len = intVal(list_nth(fdw_private,
1924  has_returning = boolVal(list_nth(fdw_private,
1926  retrieved_attrs = (List *) list_nth(fdw_private,
1928 
1929  /* Find RTE. */
1930  rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1931  mtstate->ps.state);
1932 
1933  /* Construct an execution state. */
1934  fmstate = create_foreign_modify(mtstate->ps.state,
1935  rte,
1936  resultRelInfo,
1937  mtstate->operation,
1938  outerPlanState(mtstate)->plan,
1939  query,
1940  target_attrs,
1941  values_end_len,
1942  has_returning,
1943  retrieved_attrs);
1944 
1945  resultRelInfo->ri_FdwState = fmstate;
1946 }
1947 
1948 /*
1949  * postgresExecForeignInsert
1950  * Insert one row into a foreign table
      *
      * Runs execute_foreign_modify() with CMD_INSERT on a single slot.
      * Returns the first slot of the result array, or NULL when
      * execute_foreign_modify() returns NULL.
      *
      * NOTE(review): this text is an extract of a numbered listing in which
      * some listing lines are absent; each gap is flagged inline below.
1951  */
1952 static TupleTableSlot *
     /*
      * NOTE(review): listing line 1953 is absent; presumably it carried the
      * function name and the declaration of "estate", which the body uses
      * without a visible declaration -- confirm against the original file.
      */
1954  ResultRelInfo *resultRelInfo,
1955  TupleTableSlot *slot,
1956  TupleTableSlot *planSlot)
1957 {
1958  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1959  TupleTableSlot **rslot;
     /* Single-row case: slot and planSlot are passed by address below. */
1960  int numSlots = 1;
1961 
1962  /*
1963  * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1964  * postgresBeginForeignInsert())
1965  */
1966  if (fmstate->aux_fmstate)
1967  resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1968  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1969  &slot, &planSlot, &numSlots);
1970  /* Revert that change */
1971  if (fmstate->aux_fmstate)
1972  resultRelInfo->ri_FdwState = fmstate;
1973 
1974  return rslot ? *rslot : NULL;
1975 }
1976 
1977 /*
1978  * postgresExecForeignBatchInsert
1979  * Insert multiple rows into a foreign table
1980  */
1981 static TupleTableSlot **
1983  ResultRelInfo *resultRelInfo,
1984  TupleTableSlot **slots,
1985  TupleTableSlot **planSlots,
1986  int *numSlots)
1987 {
1988  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1989  TupleTableSlot **rslot;
1990 
1991  /*
1992  * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1993  * postgresBeginForeignInsert())
1994  */
1995  if (fmstate->aux_fmstate)
1996  resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1997  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1998  slots, planSlots, numSlots);
1999  /* Revert that change */
2000  if (fmstate->aux_fmstate)
2001  resultRelInfo->ri_FdwState = fmstate;
2002 
2003  return rslot;
2004 }
2005 
2006 /*
2007  * postgresGetForeignModifyBatchSize
2008  * Determine the maximum number of tuples that can be inserted in bulk
2009  *
2010  * Returns the batch size specified for server or table. When batching is not
2011  * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING
2012  * clause), returns 1.
2013  */
2014 static int
2016 {
2017  int batch_size;
2018  PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState ?
2019  (PgFdwModifyState *) resultRelInfo->ri_FdwState :
2020  NULL;
2021 
2022  /* should be called only once */
2023  Assert(resultRelInfo->ri_BatchSize == 0);
2024 
2025  /*
2026  * Should never get called when the insert is being performed as part of a
2027  * row movement operation.
2028  */
2029  Assert(fmstate == NULL || fmstate->aux_fmstate == NULL);
2030 
2031  /*
2032  * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
2033  * the option directly in server/table options. Otherwise just use the
2034  * value we determined earlier.
2035  */
2036  if (fmstate)
2037  batch_size = fmstate->batch_size;
2038  else
2039  batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
2040 
2041  /*
2042  * Disable batching when we have to use RETURNING, there are any
2043  * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any
2044  * WITH CHECK OPTION constraints from parent views.
2045  *
2046  * When there are any BEFORE ROW INSERT triggers on the table, we can't
2047  * support it, because such triggers might query the table we're inserting
2048  * into and act differently if the tuples that have already been processed
2049  * and prepared for insertion are not there.
2050  */
2051  if (resultRelInfo->ri_projectReturning != NULL ||
2052  resultRelInfo->ri_WithCheckOptions != NIL ||
2053  (resultRelInfo->ri_TrigDesc &&
2054  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2055  resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
2056  return 1;
2057 
2058  /*
2059  * If the foreign table has no columns, disable batching as the INSERT
2060  * syntax doesn't allow batching multiple empty rows into a zero-column
2061  * table in a single statement. This is needed for COPY FROM, in which
2062  * case fmstate must be non-NULL.
2063  */
2064  if (fmstate && list_length(fmstate->target_attrs) == 0)
2065  return 1;
2066 
2067  /*
2068  * Otherwise use the batch size specified for server/table. The number of
2069  * parameters in a batch is limited to 65535 (uint16), so make sure we
2070  * don't exceed this limit by using the maximum batch_size possible.
2071  */
2072  if (fmstate && fmstate->p_nums > 0)
2073  batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums);
2074 
2075  return batch_size;
2076 }
2077 
2078 /*
2079  * postgresExecForeignUpdate
2080  * Update one row in a foreign table
2081  */
2082 static TupleTableSlot *
2084  ResultRelInfo *resultRelInfo,
2085  TupleTableSlot *slot,
2086  TupleTableSlot *planSlot)
2087 {
2088  TupleTableSlot **rslot;
2089  int numSlots = 1;
2090 
2091  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
2092  &slot, &planSlot, &numSlots);
2093 
2094  return rslot ? rslot[0] : NULL;
2095 }
2096 
2097 /*
2098  * postgresExecForeignDelete
2099  * Delete one row from a foreign table
2100  */
2101 static TupleTableSlot *
2103  ResultRelInfo *resultRelInfo,
2104  TupleTableSlot *slot,
2105  TupleTableSlot *planSlot)
2106 {
2107  TupleTableSlot **rslot;
2108  int numSlots = 1;
2109 
2110  rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
2111  &slot, &planSlot, &numSlots);
2112 
2113  return rslot ? rslot[0] : NULL;
2114 }
2115 
2116 /*
2117  * postgresEndForeignModify
2118  * Finish an insert/update/delete operation on a foreign table
2119  */
2120 static void
2122  ResultRelInfo *resultRelInfo)
2123 {
2124  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2125 
2126  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2127  if (fmstate == NULL)
2128  return;
2129 
2130  /* Destroy the execution state */
2131  finish_foreign_modify(fmstate);
2132 }
2133 
2134 /*
2135  * postgresBeginForeignInsert
2136  * Begin an insert operation on a foreign table
2137  */
2138 static void
2140  ResultRelInfo *resultRelInfo)
2141 {
2142  PgFdwModifyState *fmstate;
2143  ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
2144  EState *estate = mtstate->ps.state;
2145  Index resultRelation;
2146  Relation rel = resultRelInfo->ri_RelationDesc;
2147  RangeTblEntry *rte;
2148  TupleDesc tupdesc = RelationGetDescr(rel);
2149  int attnum;
2150  int values_end_len;
2151  StringInfoData sql;
2152  List *targetAttrs = NIL;
2153  List *retrieved_attrs = NIL;
2154  bool doNothing = false;
2155 
2156  /*
2157  * If the foreign table we are about to insert routed rows into is also an
2158  * UPDATE subplan result rel that will be updated later, proceeding with
2159  * the INSERT will result in the later UPDATE incorrectly modifying those
2160  * routed rows, so prevent the INSERT --- it would be nice if we could
2161  * handle this case; but for now, throw an error for safety.
2162  */
2163  if (plan && plan->operation == CMD_UPDATE &&
2164  (resultRelInfo->ri_usesFdwDirectModify ||
2165  resultRelInfo->ri_FdwState))
2166  ereport(ERROR,
2167  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2168  errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2169  RelationGetRelationName(rel))));
2170 
2171  initStringInfo(&sql);
2172 
2173  /* We transmit all columns that are defined in the foreign table. */
2174  for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2175  {
2176  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2177 
2178  if (!attr->attisdropped)
2179  targetAttrs = lappend_int(targetAttrs, attnum);
2180  }
2181 
2182  /* Check if we add the ON CONFLICT clause to the remote query. */
2183  if (plan)
2184  {
2185  OnConflictAction onConflictAction = plan->onConflictAction;
2186 
2187  /* We only support DO NOTHING without an inference specification. */
2188  if (onConflictAction == ONCONFLICT_NOTHING)
2189  doNothing = true;
2190  else if (onConflictAction != ONCONFLICT_NONE)
2191  elog(ERROR, "unexpected ON CONFLICT specification: %d",
2192  (int) onConflictAction);
2193  }
2194 
2195  /*
2196  * If the foreign table is a partition that doesn't have a corresponding
2197  * RTE entry, we need to create a new RTE describing the foreign table for
2198  * use by deparseInsertSql and create_foreign_modify() below, after first
2199  * copying the parent's RTE and modifying some fields to describe the
2200  * foreign partition to work on. However, if this is invoked by UPDATE,
2201  * the existing RTE may already correspond to this partition if it is one
2202  * of the UPDATE subplan target rels; in that case, we can just use the
2203  * existing RTE as-is.
2204  */
2205  if (resultRelInfo->ri_RangeTableIndex == 0)
2206  {
2207  ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2208 
2209  rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
2210  rte = copyObject(rte);
2211  rte->relid = RelationGetRelid(rel);
2212  rte->relkind = RELKIND_FOREIGN_TABLE;
2213 
2214  /*
2215  * For UPDATE, we must use the RT index of the first subplan target
2216  * rel's RTE, because the core code would have built expressions for
2217  * the partition, such as RETURNING, using that RT index as varno of
2218  * Vars contained in those expressions.
2219  */
2220  if (plan && plan->operation == CMD_UPDATE &&
2221  rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2222  resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2223  else
2224  resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2225  }
2226  else
2227  {
2228  resultRelation = resultRelInfo->ri_RangeTableIndex;
2229  rte = exec_rt_fetch(resultRelation, estate);
2230  }
2231 
2232  /* Construct the SQL command string. */
2233  deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2234  resultRelInfo->ri_WithCheckOptions,
2235  resultRelInfo->ri_returningList,
2236  &retrieved_attrs, &values_end_len);
2237 
2238  /* Construct an execution state. */
2239  fmstate = create_foreign_modify(mtstate->ps.state,
2240  rte,
2241  resultRelInfo,
2242  CMD_INSERT,
2243  NULL,
2244  sql.data,
2245  targetAttrs,
2246  values_end_len,
2247  retrieved_attrs != NIL,
2248  retrieved_attrs);
2249 
2250  /*
2251  * If the given resultRelInfo already has PgFdwModifyState set, it means
2252  * the foreign table is an UPDATE subplan result rel; in which case, store
2253  * the resulting state into the aux_fmstate of the PgFdwModifyState.
2254  */
2255  if (resultRelInfo->ri_FdwState)
2256  {
2257  Assert(plan && plan->operation == CMD_UPDATE);
2258  Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2259  ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2260  }
2261  else
2262  resultRelInfo->ri_FdwState = fmstate;
2263 }
2264 
2265 /*
2266  * postgresEndForeignInsert
2267  * Finish an insert operation on a foreign table
2268  */
2269 static void
2271  ResultRelInfo *resultRelInfo)
2272 {
2273  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2274 
2275  Assert(fmstate != NULL);
2276 
2277  /*
2278  * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2279  * postgresBeginForeignInsert())
2280  */
2281  if (fmstate->aux_fmstate)
2282  fmstate = fmstate->aux_fmstate;
2283 
2284  /* Destroy the execution state */
2285  finish_foreign_modify(fmstate);
2286 }
2287 
2288 /*
2289  * postgresIsForeignRelUpdatable
2290  * Determine whether a foreign table supports INSERT, UPDATE and/or
2291  * DELETE.
2292  */
2293 static int
2295 {
2296  bool updatable;
2297  ForeignTable *table;
2298  ForeignServer *server;
2299  ListCell *lc;
2300 
2301  /*
2302  * By default, all postgres_fdw foreign tables are assumed updatable. This
2303  * can be overridden by a per-server setting, which in turn can be
2304  * overridden by a per-table setting.
2305  */
2306  updatable = true;
2307 
2308  table = GetForeignTable(RelationGetRelid(rel));
2309  server = GetForeignServer(table->serverid);
2310 
2311  foreach(lc, server->options)
2312  {
2313  DefElem *def = (DefElem *) lfirst(lc);
2314 
2315  if (strcmp(def->defname, "updatable") == 0)
2316  updatable = defGetBoolean(def);
2317  }
2318  foreach(lc, table->options)
2319  {
2320  DefElem *def = (DefElem *) lfirst(lc);
2321 
2322  if (strcmp(def->defname, "updatable") == 0)
2323  updatable = defGetBoolean(def);
2324  }
2325 
2326  /*
2327  * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2328  */
2329  return updatable ?
2330  (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2331 }
2332 
2333 /*
2334  * postgresRecheckForeignScan
2335  * Execute a local join execution plan for a foreign join
2336  */
2337 static bool
2339 {
2340  Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2342  TupleTableSlot *result;
2343 
2344  /* For base foreign relations, it suffices to set fdw_recheck_quals */
2345  if (scanrelid > 0)
2346  return true;
2347 
2348  Assert(outerPlan != NULL);
2349 
2350  /* Execute a local join execution plan */
2351  result = ExecProcNode(outerPlan);
2352  if (TupIsNull(result))
2353  return false;
2354 
2355  /* Store result in the given slot */
2356  ExecCopySlot(slot, result);
2357 
2358  return true;
2359 }
2360 
2361 /*
2362  * find_modifytable_subplan
2363  * Helper routine for postgresPlanDirectModify to find the
2364  * ModifyTable subplan node that scans the specified RTI.
2365  *
2366  * Returns NULL if the subplan couldn't be identified. That's not a fatal
2367  * error condition, we just abandon trying to do the update directly.
2368  */
2369 static ForeignScan *
2371  ModifyTable *plan,
2372  Index rtindex,
2373  int subplan_index)
2374 {
2375  Plan *subplan = outerPlan(plan);
2376 
2377  /*
2378  * The cases we support are (1) the desired ForeignScan is the immediate
2379  * child of ModifyTable, or (2) it is the subplan_index'th child of an
2380  * Append node that is the immediate child of ModifyTable. There is no
2381  * point in looking further down, as that would mean that local joins are
2382  * involved, so we can't do the update directly.
2383  *
2384  * There could be a Result atop the Append too, acting to compute the
2385  * UPDATE targetlist values. We ignore that here; the tlist will be
2386  * checked by our caller.
2387  *
2388  * In principle we could examine all the children of the Append, but it's
2389  * currently unlikely that the core planner would generate such a plan
2390  * with the children out-of-order. Moreover, such a search risks costing
2391  * O(N^2) time when there are a lot of children.
2392  */
2393  if (IsA(subplan, Append))
2394  {
2395  Append *appendplan = (Append *) subplan;
2396 
2397  if (subplan_index < list_length(appendplan->appendplans))
2398  subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2399  }
2400  else if (IsA(subplan, Result) &&
2401  outerPlan(subplan) != NULL &&
2402  IsA(outerPlan(subplan), Append))
2403  {
2404  Append *appendplan = (Append *) outerPlan(subplan);
2405 
2406  if (subplan_index < list_length(appendplan->appendplans))
2407  subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2408  }
2409 
2410  /* Now, have we got a ForeignScan on the desired rel? */
2411  if (IsA(subplan, ForeignScan))
2412  {
2413  ForeignScan *fscan = (ForeignScan *) subplan;
2414 
2415  if (bms_is_member(rtindex, fscan->fs_relids))
2416  return fscan;
2417  }
2418 
2419  return NULL;
2420 }
2421 
2422 /*
2423  * postgresPlanDirectModify
2424  * Consider a direct foreign table modification
2425  *
2426  * Decide whether it is safe to modify a foreign table directly, and if so,
2427  * rewrite subplan accordingly.
2428  */
2429 static bool
2431  ModifyTable *plan,
2432  Index resultRelation,
2433  int subplan_index)
2434 {
2435  CmdType operation = plan->operation;
2436  RelOptInfo *foreignrel;
2437  RangeTblEntry *rte;
2438  PgFdwRelationInfo *fpinfo;
2439  Relation rel;
2440  StringInfoData sql;
2441  ForeignScan *fscan;
2442  List *processed_tlist = NIL;
2443  List *targetAttrs = NIL;
2444  List *remote_exprs;
2445  List *params_list = NIL;
2446  List *returningList = NIL;
2447  List *retrieved_attrs = NIL;
2448 
2449  /*
2450  * Decide whether it is safe to modify a foreign table directly.
2451  */
2452 
2453  /*
2454  * The table modification must be an UPDATE or DELETE.
2455  */
2456  if (operation != CMD_UPDATE && operation != CMD_DELETE)
2457  return false;
2458 
2459  /*
2460  * Try to locate the ForeignScan subplan that's scanning resultRelation.
2461  */
2462  fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index);
2463  if (!fscan)
2464  return false;
2465 
2466  /*
2467  * It's unsafe to modify a foreign table directly if there are any quals
2468  * that should be evaluated locally.
2469  */
2470  if (fscan->scan.plan.qual != NIL)
2471  return false;
2472 
2473  /* Safe to fetch data about the target foreign rel */
2474  if (fscan->scan.scanrelid == 0)
2475  {
2476  foreignrel = find_join_rel(root, fscan->fs_relids);
2477  /* We should have a rel for this foreign join. */
2478  Assert(foreignrel);
2479  }
2480  else
2481  foreignrel = root->simple_rel_array[resultRelation];
2482  rte = root->simple_rte_array[resultRelation];
2483  fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2484 
2485  /*
2486  * It's unsafe to update a foreign table directly, if any expressions to
2487  * assign to the target columns are unsafe to evaluate remotely.
2488  */
2489  if (operation == CMD_UPDATE)
2490  {
2491  ListCell *lc,
2492  *lc2;
2493 
2494  /*
2495  * The expressions of concern are the first N columns of the processed
2496  * targetlist, where N is the length of the rel's update_colnos.
2497  */
2498  get_translated_update_targetlist(root, resultRelation,
2499  &processed_tlist, &targetAttrs);
2500  forboth(lc, processed_tlist, lc2, targetAttrs)
2501  {
2502  TargetEntry *tle = lfirst_node(TargetEntry, lc);
2503  AttrNumber attno = lfirst_int(lc2);
2504 
2505  /* update's new-value expressions shouldn't be resjunk */
2506  Assert(!tle->resjunk);
2507 
2508  if (attno <= InvalidAttrNumber) /* shouldn't happen */
2509  elog(ERROR, "system-column update is not supported");
2510 
2511  if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2512  return false;
2513  }
2514  }
2515 
2516  /*
2517  * Ok, rewrite subplan so as to modify the foreign table directly.
2518  */
2519  initStringInfo(&sql);
2520 
2521  /*
2522  * Core code already has some lock on each rel being planned, so we can
2523  * use NoLock here.
2524  */
2525  rel = table_open(rte->relid, NoLock);
2526 
2527  /*
2528  * Recall the qual clauses that must be evaluated remotely. (These are
2529  * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2530  * doesn't care.)
2531  */
2532  remote_exprs = fpinfo->final_remote_exprs;
2533 
2534  /*
2535  * Extract the relevant RETURNING list if any.
2536  */
2537  if (plan->returningLists)
2538  {
2539  returningList = (List *) list_nth(plan->returningLists, subplan_index);
2540 
2541  /*
2542  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2543  * we fetch from the foreign server any Vars specified in RETURNING
2544  * that refer not only to the target relation but to non-target
2545  * relations. So we'll deparse them into the RETURNING clause of the
2546  * remote query; use a targetlist consisting of them instead, which
2547  * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2548  * node below.
2549  */
2550  if (fscan->scan.scanrelid == 0)
2551  returningList = build_remote_returning(resultRelation, rel,
2552  returningList);
2553  }
2554 
2555  /*
2556  * Construct the SQL command string.
2557  */
2558  switch (operation)
2559  {
2560  case CMD_UPDATE:
2561  deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2562  foreignrel,
2563  processed_tlist,
2564  targetAttrs,
2565  remote_exprs, &params_list,
2566  returningList, &retrieved_attrs);
2567  break;
2568  case CMD_DELETE:
2569  deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2570  foreignrel,
2571  remote_exprs, &params_list,
2572  returningList, &retrieved_attrs);
2573  break;
2574  default:
2575  elog(ERROR, "unexpected operation: %d", (int) operation);
2576  break;
2577  }
2578 
2579  /*
2580  * Update the operation and target relation info.
2581  */
2582  fscan->operation = operation;
2583  fscan->resultRelation = resultRelation;
2584 
2585  /*
2586  * Update the fdw_exprs list that will be available to the executor.
2587  */
2588  fscan->fdw_exprs = params_list;
2589 
2590  /*
2591  * Update the fdw_private list that will be available to the executor.
2592  * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2593  */
2594  fscan->fdw_private = list_make4(makeString(sql.data),
2595  makeBoolean((retrieved_attrs != NIL)),
2596  retrieved_attrs,
2597  makeBoolean(plan->canSetTag));
2598 
2599  /*
2600  * Update the foreign-join-related fields.
2601  */
2602  if (fscan->scan.scanrelid == 0)
2603  {
2604  /* No need for the outer subplan. */
2605  fscan->scan.plan.lefttree = NULL;
2606 
2607  /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2608  if (returningList)
2609  rebuild_fdw_scan_tlist(fscan, returningList);
2610  }
2611 
2612  /*
2613  * Finally, unset the async-capable flag if it is set, as we currently
2614  * don't support asynchronous execution of direct modifications.
2615  */
2616  if (fscan->scan.plan.async_capable)
2617  fscan->scan.plan.async_capable = false;
2618 
2619  table_close(rel, NoLock);
2620  return true;
2621 }
2622 
2623 /*
2624  * postgresBeginDirectModify
2625  * Prepare a direct foreign table modification
2626  */
2627 static void
2629 {
2630  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2631  EState *estate = node->ss.ps.state;
2632  PgFdwDirectModifyState *dmstate;
2633  Index rtindex;
2634  Oid userid;
2635  ForeignTable *table;
2636  UserMapping *user;
2637  int numParams;
2638 
2639  /*
2640  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2641  */
2642  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2643  return;
2644 
2645  /*
2646  * We'll save private state in node->fdw_state.
2647  */
2648  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2649  node->fdw_state = (void *) dmstate;
2650 
2651  /*
2652  * Identify which user to do the remote access as. This should match what
2653  * ExecCheckRTEPerms() does.
2654  */
2655  userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
2656 
2657  /* Get info about foreign table. */
2658  rtindex = node->resultRelInfo->ri_RangeTableIndex;
2659  if (fsplan->scan.scanrelid == 0)
2660  dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2661  else
2662  dmstate->rel = node->ss.ss_currentRelation;
2663  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2664  user = GetUserMapping(userid, table->serverid);
2665 
2666  /*
2667  * Get connection to the foreign server. Connection manager will
2668  * establish new connection if necessary.
2669  */
2670  dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
2671 
2672  /* Update the foreign-join-related fields. */
2673  if (fsplan->scan.scanrelid == 0)
2674  {
2675  /* Save info about foreign table. */
2676  dmstate->resultRel = dmstate->rel;
2677 
2678  /*
2679  * Set dmstate->rel to NULL to teach get_returning_data() and
2680  * make_tuple_from_result_row() that columns fetched from the remote
2681  * server are described by fdw_scan_tlist of the foreign-scan plan
2682  * node, not the tuple descriptor for the target relation.
2683  */
2684  dmstate->rel = NULL;
2685  }
2686 
2687  /* Initialize state variable */
2688  dmstate->num_tuples = -1; /* -1 means not set yet */
2689 
2690  /* Get private info created by planner functions. */
2691  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2693  dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private,
2695  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2697  dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private,
2699 
2700  /* Create context for per-tuple temp workspace. */
2701  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2702  "postgres_fdw temporary data",
2704 
2705  /* Prepare for input conversion of RETURNING results. */
2706  if (dmstate->has_returning)
2707  {
2708  TupleDesc tupdesc;
2709 
2710  if (fsplan->scan.scanrelid == 0)
2711  tupdesc = get_tupdesc_for_join_scan_tuples(node);
2712  else
2713  tupdesc = RelationGetDescr(dmstate->rel);
2714 
2715  dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2716 
2717  /*
2718  * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2719  * initialize a filter to extract an updated/deleted tuple from a scan
2720  * tuple.
2721  */
2722  if (fsplan->scan.scanrelid == 0)
2723  init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2724  }
2725 
2726  /*
2727  * Prepare for processing of parameters used in remote query, if any.
2728  */
2729  numParams = list_length(fsplan->fdw_exprs);
2730  dmstate->numParams = numParams;
2731  if (numParams > 0)
2733  fsplan->fdw_exprs,
2734  numParams,
2735  &dmstate->param_flinfo,
2736  &dmstate->param_exprs,
2737  &dmstate->param_values);
2738 }
2739 
2740 /*
2741  * postgresIterateDirectModify
2742  * Execute a direct foreign table modification
2743  */
2744 static TupleTableSlot *
2746 {
2748  EState *estate = node->ss.ps.state;
2749  ResultRelInfo *resultRelInfo = node->resultRelInfo;
2750 
2751  /*
2752  * If this is the first call after Begin, execute the statement.
2753  */
2754  if (dmstate->num_tuples == -1)
2755  execute_dml_stmt(node);
2756 
2757  /*
2758  * If the local query doesn't specify RETURNING, just clear tuple slot.
2759  */
2760  if (!resultRelInfo->ri_projectReturning)
2761  {
2762  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2763  Instrumentation *instr = node->ss.ps.instrument;
2764 
2765  Assert(!dmstate->has_returning);
2766 
2767  /* Increment the command es_processed count if necessary. */
2768  if (dmstate->set_processed)
2769  estate->es_processed += dmstate->num_tuples;
2770 
2771  /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2772  if (instr)
2773  instr->tuplecount += dmstate->num_tuples;
2774 
2775  return ExecClearTuple(slot);
2776  }
2777 
2778  /*
2779  * Get the next RETURNING tuple.
2780  */
2781  return get_returning_data(node);
2782 }
2783 
2784 /*
2785  * postgresEndDirectModify
2786  * Finish a direct foreign table modification
2787  */
2788 static void
2790 {
2792 
2793  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2794  if (dmstate == NULL)
2795  return;
2796 
2797  /* Release PGresult */
2798  PQclear(dmstate->result);
2799 
2800  /* Release remote connection */
2801  ReleaseConnection(dmstate->conn);
2802  dmstate->conn = NULL;
2803 
2804  /* MemoryContext will be deleted automatically. */
2805 }
2806 
2807 /*
2808  * postgresExplainForeignScan
2809  * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2810  */
2811 static void
2813 {
2814  ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan);
2815  List *fdw_private = plan->fdw_private;
2816 
2817  /*
2818  * Identify foreign scans that are really joins or upper relations. The
2819  * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2820  * digit string(s), which are RT indexes, with the correct relation names.
2821  * We do that here, not when the plan is created, because we can't know
2822  * what aliases ruleutils.c will assign at plan creation time.
2823  */
2824  if (list_length(fdw_private) > FdwScanPrivateRelations)
2825  {
2826  StringInfo relations;
2827  char *rawrelations;
2828  char *ptr;
2829  int minrti,
2830  rtoffset;
2831 
2832  rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2833 
2834  /*
2835  * A difficulty with using a string representation of RT indexes is
2836  * that setrefs.c won't update the string when flattening the
2837  * rangetable. To find out what rtoffset was applied, identify the
2838  * minimum RT index appearing in the string and compare it to the
2839  * minimum member of plan->fs_relids. (We expect all the relids in
2840  * the join will have been offset by the same amount; the Asserts
2841  * below should catch it if that ever changes.)
2842  */
2843  minrti = INT_MAX;
2844  ptr = rawrelations;
2845  while (*ptr)
2846  {
2847  if (isdigit((unsigned char) *ptr))
2848  {
2849  int rti = strtol(ptr, &ptr, 10);
2850 
2851  if (rti < minrti)
2852  minrti = rti;
2853  }
2854  else
2855  ptr++;
2856  }
2857  rtoffset = bms_next_member(plan->fs_relids, -1) - minrti;
2858 
2859  /* Now we can translate the string */
2860  relations = makeStringInfo();
2861  ptr = rawrelations;
2862  while (*ptr)
2863  {
2864  if (isdigit((unsigned char) *ptr))
2865  {
2866  int rti = strtol(ptr, &ptr, 10);
2867  RangeTblEntry *rte;
2868  char *relname;
2869  char *refname;
2870 
2871  rti += rtoffset;
2872  Assert(bms_is_member(rti, plan->fs_relids));
2873  rte = rt_fetch(rti, es->rtable);
2874  Assert(rte->rtekind == RTE_RELATION);
2875  /* This logic should agree with explain.c's ExplainTargetRel */
2876  relname = get_rel_name(rte->relid);
2877  if (es->verbose)
2878  {
2879  char *namespace;
2880 
2881  namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid));
2882  appendStringInfo(relations, "%s.%s",
2883  quote_identifier(namespace),
2885  }
2886  else
2887  appendStringInfoString(relations,
2889  refname = (char *) list_nth(es->rtable_names, rti - 1);
2890  if (refname == NULL)
2891  refname = rte->eref->aliasname;
2892  if (strcmp(refname, relname) != 0)
2893  appendStringInfo(relations, " %s",
2894  quote_identifier(refname));
2895  }
2896  else
2897  appendStringInfoChar(relations, *ptr++);
2898  }
2899  ExplainPropertyText("Relations", relations->data, es);
2900  }
2901 
2902  /*
2903  * Add remote query, when VERBOSE option is specified.
2904  */
2905  if (es->verbose)
2906  {
2907  char *sql;
2908 
2909  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2910  ExplainPropertyText("Remote SQL", sql, es);
2911  }
2912 }
2913 
2914 /*
2915  * postgresExplainForeignModify
2916  * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2917  */
2918 static void
2920  ResultRelInfo *rinfo,
2921  List *fdw_private,
2922  int subplan_index,
2923  ExplainState *es)
2924 {
2925  if (es->verbose)
2926  {
2927  char *sql = strVal(list_nth(fdw_private,
2929 
2930  ExplainPropertyText("Remote SQL", sql, es);
2931 
2932  /*
2933  * For INSERT we should always have batch size >= 1, but UPDATE and
2934  * DELETE don't support batching so don't show the property.
2935  */
2936  if (rinfo->ri_BatchSize > 0)
2937  ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
2938  }
2939 }
2940 
2941 /*
2942  * postgresExplainDirectModify
2943  * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2944  * foreign table directly
2945  */
2946 static void
2948 {
2949  List *fdw_private;
2950  char *sql;
2951 
2952  if (es->verbose)
2953  {
2954  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2955  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2956  ExplainPropertyText("Remote SQL", sql, es);
2957  }
2958 }
2959 
2960 /*
2961  * postgresExecForeignTruncate
2962  * Truncate one or more foreign tables
2963  */
2964 static void
2966  DropBehavior behavior,
2967  bool restart_seqs)
2968 {
2969  Oid serverid = InvalidOid;
2970  UserMapping *user = NULL;
2971  PGconn *conn = NULL;
2972  StringInfoData sql;
2973  ListCell *lc;
2974  bool server_truncatable = true;
2975 
2976  /*
2977  * By default, all postgres_fdw foreign tables are assumed truncatable.
2978  * This can be overridden by a per-server setting, which in turn can be
2979  * overridden by a per-table setting.
2980  */
2981  foreach(lc, rels)
2982  {
2983  ForeignServer *server = NULL;
2984  Relation rel = lfirst(lc);
2986  ListCell *cell;
2987  bool truncatable;
2988 
2989  /*
2990  * First time through, determine whether the foreign server allows
2991  * truncates. Since all specified foreign tables are assumed to belong
2992  * to the same foreign server, this result can be used for other
2993  * foreign tables.
2994  */
2995  if (!OidIsValid(serverid))
2996  {
2997  serverid = table->serverid;
2998  server = GetForeignServer(serverid);
2999 
3000  foreach(cell, server->options)
3001  {
3002  DefElem *defel = (DefElem *) lfirst(cell);
3003 
3004  if (strcmp(defel->defname, "truncatable") == 0)
3005  {
3006  server_truncatable = defGetBoolean(defel);
3007  break;
3008  }
3009  }
3010  }
3011 
3012  /*
3013  * Confirm that all specified foreign tables belong to the same
3014  * foreign server.
3015  */
3016  Assert(table->serverid == serverid);
3017 
3018  /* Determine whether this foreign table allows truncations */
3019  truncatable = server_truncatable;
3020  foreach(cell, table->options)
3021  {
3022  DefElem *defel = (DefElem *) lfirst(cell);
3023 
3024  if (strcmp(defel->defname, "truncatable") == 0)
3025  {
3026  truncatable = defGetBoolean(defel);
3027  break;
3028  }
3029  }
3030 
3031  if (!truncatable)
3032  ereport(ERROR,
3033  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3034  errmsg("foreign table \"%s\" does not allow truncates",
3035  RelationGetRelationName(rel))));
3036  }
3037  Assert(OidIsValid(serverid));
3038 
3039  /*
3040  * Get connection to the foreign server. Connection manager will
3041  * establish new connection if necessary.
3042  */
3043  user = GetUserMapping(GetUserId(), serverid);
3044  conn = GetConnection(user, false, NULL);
3045 
3046  /* Construct the TRUNCATE command string */
3047  initStringInfo(&sql);
3048  deparseTruncateSql(&sql, rels, behavior, restart_seqs);
3049 
3050  /* Issue the TRUNCATE command to remote server */
3051  do_sql_command(conn, sql.data);
3052 
3053  pfree(sql.data);
3054 }
3055 
3056 /*
3057  * estimate_path_cost_size
3058  * Get cost and size estimates for a foreign scan on given foreign relation
3059  * either a base relation or a join between foreign relations or an upper
3060  * relation containing foreign relations.
3061  *
3062  * param_join_conds are the parameterization clauses with outer relations.
3063  * pathkeys specify the expected sort order if any for given path being costed.
3064  * fpextra specifies additional post-scan/join-processing steps such as the
3065  * final sort and the LIMIT restriction.
3066  *
3067  * The function returns the cost and size estimates in p_rows, p_width,
3068  * p_startup_cost and p_total_cost variables.
3069  */
3070 static void
3072  RelOptInfo *foreignrel,
3073  List *param_join_conds,
3074  List *pathkeys,
3075  PgFdwPathExtraData *fpextra,
3076  double *p_rows, int *p_width,
3077  Cost *p_startup_cost, Cost *p_total_cost)
3078 {
3079  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
3080  double rows;
3081  double retrieved_rows;
3082  int width;
3083  Cost startup_cost;
3084  Cost total_cost;
3085 
3086  /* Make sure the core code has set up the relation's reltarget */
3087  Assert(foreignrel->reltarget);
3088 
3089  /*
3090  * If the table or the server is configured to use remote estimates,
3091  * connect to the foreign server and execute EXPLAIN to estimate the
3092  * number of rows selected by the restriction+join clauses. Otherwise,
3093  * estimate rows using whatever statistics we have locally, in a way
3094  * similar to ordinary tables.
3095  */
3096  if (fpinfo->use_remote_estimate)
3097  {
3098  List *remote_param_join_conds;
3099  List *local_param_join_conds;
3100  StringInfoData sql;
3101  PGconn *conn;
3102  Selectivity local_sel;
3103  QualCost local_cost;
3104  List *fdw_scan_tlist = NIL;
3105  List *remote_conds;
3106 
3107  /* Required only to be passed to deparseSelectStmtForRel */
3108  List *retrieved_attrs;
3109 
3110  /*
3111  * param_join_conds might contain both clauses that are safe to send
3112  * across, and clauses that aren't.
3113  */
3114  classifyConditions(root, foreignrel, param_join_conds,
3115  &remote_param_join_conds, &local_param_join_conds);
3116 
3117  /* Build the list of columns to be fetched from the foreign server. */
3118  if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
3119  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
3120  else
3121  fdw_scan_tlist = NIL;
3122 
3123  /*
3124  * The complete list of remote conditions includes everything from
3125  * baserestrictinfo plus any extra join_conds relevant to this
3126  * particular path.
3127  */
3128  remote_conds = list_concat(remote_param_join_conds,
3129  fpinfo->remote_conds);
3130 
3131  /*
3132  * Construct EXPLAIN query including the desired SELECT, FROM, and
3133  * WHERE clauses. Params and other-relation Vars are replaced by dummy
3134  * values, so don't request params_list.
3135  */
3136  initStringInfo(&sql);
3137  appendStringInfoString(&sql, "EXPLAIN ");
3138  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
3139  remote_conds, pathkeys,
3140  fpextra ? fpextra->has_final_sort : false,
3141  fpextra ? fpextra->has_limit : false,
3142  false, &retrieved_attrs, NULL);
3143 
3144  /* Get the remote estimate */
3145  conn = GetConnection(fpinfo->user, false, NULL);
3146  get_remote_estimate(sql.data, conn, &rows, &width,
3147  &startup_cost, &total_cost);
3149 
3150  retrieved_rows = rows;
3151 
3152  /* Factor in the selectivity of the locally-checked quals */
3153  local_sel = clauselist_selectivity(root,
3154  local_param_join_conds,
3155  foreignrel->relid,
3156  JOIN_INNER,
3157  NULL);
3158  local_sel *= fpinfo->local_conds_sel;
3159 
3160  rows = clamp_row_est(rows * local_sel);
3161 
3162  /* Add in the eval cost of the locally-checked quals */
3163  startup_cost += fpinfo->local_conds_cost.startup;
3164  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3165  cost_qual_eval(&local_cost, local_param_join_conds, root);
3166  startup_cost += local_cost.startup;
3167  total_cost += local_cost.per_tuple * retrieved_rows;
3168 
3169  /*
3170  * Add in tlist eval cost for each output row. In case of an
3171  * aggregate, some of the tlist expressions such as grouping
3172  * expressions will be evaluated remotely, so adjust the costs.
3173  */
3174  startup_cost += foreignrel->reltarget->cost.startup;
3175  total_cost += foreignrel->reltarget->cost.startup;
3176  total_cost += foreignrel->reltarget->cost.per_tuple * rows;
3177  if (IS_UPPER_REL(foreignrel))
3178  {
3179  QualCost tlist_cost;
3180 
3181  cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
3182  startup_cost -= tlist_cost.startup;
3183  total_cost -= tlist_cost.startup;
3184  total_cost -= tlist_cost.per_tuple * rows;
3185  }
3186  }
3187  else
3188  {
3189  Cost run_cost = 0;
3190 
3191  /*
3192  * We don't support join conditions in this mode (hence, no
3193  * parameterized paths can be made).
3194  */
3195  Assert(param_join_conds == NIL);
3196 
3197  /*
3198  * We will come here again and again with different set of pathkeys or
3199  * additional post-scan/join-processing steps that caller wants to
3200  * cost. We don't need to calculate the cost/size estimates for the
3201  * underlying scan, join, or grouping each time. Instead, use those
3202  * estimates if we have cached them already.
3203  */
3204  if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
3205  {
3206  Assert(fpinfo->retrieved_rows >= 0);
3207 
3208  rows = fpinfo->rows;
3209  retrieved_rows = fpinfo->retrieved_rows;
3210  width = fpinfo->width;
3211  startup_cost = fpinfo->rel_startup_cost;
3212  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
3213 
3214  /*
3215  * If we estimate the costs of a foreign scan or a foreign join
3216  * with additional post-scan/join-processing steps, the scan or
3217  * join costs obtained from the cache wouldn't yet contain the
3218  * eval costs for the final scan/join target, which would've been
3219  * updated by apply_scanjoin_target_to_paths(); add the eval costs
3220  * now.
3221  */
3222  if (fpextra && !IS_UPPER_REL(foreignrel))
3223  {
3224  /* Shouldn't get here unless we have LIMIT */
3225  Assert(fpextra->has_limit);
3226  Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
3227  foreignrel->reloptkind == RELOPT_JOINREL);
3228  startup_cost += foreignrel->reltarget->cost.startup;
3229  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3230  }
3231  }
3232  else if (IS_JOIN_REL(foreignrel))
3233  {
3234  PgFdwRelationInfo *fpinfo_i;
3235  PgFdwRelationInfo *fpinfo_o;
3236  QualCost join_cost;
3237  QualCost remote_conds_cost;
3238  double nrows;
3239 
3240  /* Use rows/width estimates made by the core code. */
3241  rows = foreignrel->rows;
3242  width = foreignrel->reltarget->width;
3243 
3244  /* For join we expect inner and outer relations set */
3245  Assert(fpinfo->innerrel && fpinfo->outerrel);
3246 
3247  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
3248  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
3249 
3250  /* Estimate of number of rows in cross product */
3251  nrows = fpinfo_i->rows * fpinfo_o->rows;
3252 
3253  /*
3254  * Back into an estimate of the number of retrieved rows. Just in
3255  * case this is nuts, clamp to at most nrows.
3256  */
3257  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3258  retrieved_rows = Min(retrieved_rows, nrows);
3259 
3260  /*
3261  * The cost of foreign join is estimated as cost of generating
3262  * rows for the joining relations + cost for applying quals on the
3263  * rows.
3264  */
3265 
3266  /*
3267  * Calculate the cost of clauses pushed down to the foreign server
3268  */
3269  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
3270  /* Calculate the cost of applying join clauses */
3271  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
3272 
3273  /*
3274  * Startup cost includes startup cost of joining relations and the
3275  * startup cost for join and other clauses. We do not include the
3276  * startup cost specific to join strategy (e.g. setting up hash
3277  * tables) since we do not know what strategy the foreign server
3278  * is going to use.
3279  */
3280  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
3281  startup_cost += join_cost.startup;
3282  startup_cost += remote_conds_cost.startup;
3283  startup_cost += fpinfo->local_conds_cost.startup;
3284 
3285  /*
3286  * Run time cost includes:
3287  *
3288  * 1. Run time cost (total_cost - startup_cost) of relations being
3289  * joined
3290  *
3291  * 2. Run time cost of applying join clauses on the cross product
3292  * of the joining relations.
3293  *
3294  * 3. Run time cost of applying pushed down other clauses on the
3295  * result of join
3296  *
3297  * 4. Run time cost of applying nonpushable other clauses locally
3298  * on the result fetched from the foreign server.
3299  */
3300  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
3301  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
3302  run_cost += nrows * join_cost.per_tuple;
3303  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
3304  run_cost += nrows * remote_conds_cost.per_tuple;
3305  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3306 
3307  /* Add in tlist eval cost for each output row */
3308  startup_cost += foreignrel->reltarget->cost.startup;
3309  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3310  }
3311  else if (IS_UPPER_REL(foreignrel))
3312  {
3313  RelOptInfo *outerrel = fpinfo->outerrel;
3314  PgFdwRelationInfo *ofpinfo;
3315  AggClauseCosts aggcosts;
3316  double input_rows;
3317  int numGroupCols;
3318  double numGroups = 1;
3319 
3320  /* The upper relation should have its outer relation set */
3321  Assert(outerrel);
3322  /* and that outer relation should have its reltarget set */
3323  Assert(outerrel->reltarget);
3324 
3325  /*
3326  * This cost model is mixture of costing done for sorted and
3327  * hashed aggregates in cost_agg(). We are not sure which
3328  * strategy will be considered at remote side, thus for
3329  * simplicity, we put all startup related costs in startup_cost
3330  * and all finalization and run cost are added in total_cost.
3331  */
3332 
3333  ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
3334 
3335  /* Get rows from input rel */
3336  input_rows = ofpinfo->rows;
3337 
3338  /* Collect statistics about aggregates for estimating costs. */
3339  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
3340  if (root->parse->hasAggs)
3341  {
3342  get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts);
3343  }
3344 
3345  /* Get number of grouping columns and possible number of groups */
3346  numGroupCols = list_length(root->parse->groupClause);
3347  numGroups = estimate_num_groups(root,
3349  fpinfo->grouped_tlist),
3350  input_rows, NULL, NULL);
3351 
3352  /*
3353  * Get the retrieved_rows and rows estimates. If there are HAVING
3354  * quals, account for their selectivity.
3355  */
3356  if (root->hasHavingQual)
3357  {
3358  /* Factor in the selectivity of the remotely-checked quals */
3359  retrieved_rows =
3360  clamp_row_est(numGroups *
3362  fpinfo->remote_conds,
3363  0,
3364  JOIN_INNER,
3365  NULL));
3366  /* Factor in the selectivity of the locally-checked quals */
3367  rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
3368  }
3369  else
3370  {
3371  rows = retrieved_rows = numGroups;
3372  }
3373 
3374  /* Use width estimate made by the core code. */
3375  width = foreignrel->reltarget->width;
3376 
3377  /*-----
3378  * Startup cost includes:
3379  * 1. Startup cost for underneath input relation, adjusted for
3380  * tlist replacement by apply_scanjoin_target_to_paths()
3381  * 2. Cost of performing aggregation, per cost_agg()
3382  *-----
3383  */
3384  startup_cost = ofpinfo->rel_startup_cost;
3385  startup_cost += outerrel->reltarget->cost.startup;
3386  startup_cost += aggcosts.transCost.startup;
3387  startup_cost += aggcosts.transCost.per_tuple * input_rows;
3388  startup_cost += aggcosts.finalCost.startup;
3389  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3390 
3391  /*-----
3392  * Run time cost includes:
3393  * 1. Run time cost of underneath input relation, adjusted for
3394  * tlist replacement by apply_scanjoin_target_to_paths()
3395  * 2. Run time cost of performing aggregation, per cost_agg()
3396  *-----
3397  */
3398  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3399  run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3400  run_cost += aggcosts.finalCost.per_tuple * numGroups;
3401  run_cost += cpu_tuple_cost * numGroups;
3402 
3403  /* Account for the eval cost of HAVING quals, if any */
3404  if (root->hasHavingQual)
3405  {
3406  QualCost remote_cost;
3407 
3408  /* Add in the eval cost of the remotely-checked quals */
3409  cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3410  startup_cost += remote_cost.startup;
3411  run_cost += remote_cost.per_tuple * numGroups;
3412  /* Add in the eval cost of the locally-checked quals */
3413  startup_cost += fpinfo->local_conds_cost.startup;
3414  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3415  }
3416 
3417  /* Add in tlist eval cost for each output row */
3418  startup_cost += foreignrel->reltarget->cost.startup;
3419  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3420  }
3421  else
3422  {
3423  Cost cpu_per_tuple;
3424 
3425  /* Use rows/width estimates made by set_baserel_size_estimates. */
3426  rows = foreignrel->rows;
3427  width = foreignrel->reltarget->width;
3428 
3429  /*
3430  * Back into an estimate of the number of retrieved rows. Just in
3431  * case this is nuts, clamp to at most foreignrel->tuples.
3432  */
3433  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3434  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3435 
3436  /*
3437  * Cost as though this were a seqscan, which is pessimistic. We
3438  * effectively imagine the local_conds are being evaluated
3439  * remotely, too.
3440  */
3441  startup_cost = 0;
3442  run_cost = 0;
3443  run_cost += seq_page_cost * foreignrel->pages;
3444 
3445  startup_cost += foreignrel->baserestrictcost.startup;
3446  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3447  run_cost += cpu_per_tuple * foreignrel->tuples;
3448 
3449  /* Add in tlist eval cost for each output row */
3450  startup_cost += foreignrel->reltarget->cost.startup;
3451  run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3452  }
3453 
3454  /*
3455  * Without remote estimates, we have no real way to estimate the cost
3456  * of generating sorted output. It could be free if the query plan
3457  * the remote side would have chosen generates properly-sorted output
3458  * anyway, but in most cases it will cost something. Estimate a value
3459  * high enough that we won't pick the sorted path when the ordering
3460  * isn't locally useful, but low enough that we'll err on the side of
3461  * pushing down the ORDER BY clause when it's useful to do so.
3462  */
3463  if (pathkeys != NIL)
3464  {
3465  if (IS_UPPER_REL(foreignrel))
3466  {
3467  Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3468  fpinfo->stage == UPPERREL_GROUP_AGG);
3469  adjust_foreign_grouping_path_cost(root, pathkeys,
3470  retrieved_rows, width,
3471  fpextra->limit_tuples,
3472  &startup_cost, &run_cost);
3473  }
3474  else
3475  {
3476  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3477  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3478  }
3479  }
3480 
3481  total_cost = startup_cost + run_cost;
3482 
3483  /* Adjust the cost estimates if we have LIMIT */
3484  if (fpextra && fpextra->has_limit)
3485  {
3486  adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3487  fpextra->offset_est, fpextra->count_est);
3488  retrieved_rows = rows;
3489  }
3490  }
3491 
3492  /*
3493  * If this includes the final sort step, the given target, which will be
3494  * applied to the resulting path, might have different expressions from
3495  * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3496  * eval costs.
3497  */
3498  if (fpextra && fpextra->has_final_sort &&
3499  fpextra->target != foreignrel->reltarget)
3500  {
3501  QualCost oldcost = foreignrel->reltarget->cost;
3502  QualCost newcost = fpextra->target->cost;
3503 
3504  startup_cost += newcost.startup - oldcost.startup;
3505  total_cost += newcost.startup - oldcost.startup;
3506  total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3507  }
3508 
3509  /*
3510  * Cache the retrieved rows and cost estimates for scans, joins, or
3511  * groupings without any parameterization, pathkeys, or additional
3512  * post-scan/join-processing steps, before adding the costs for
3513  * transferring data from the foreign server. These estimates are useful
3514  * for costing remote joins involving this relation or costing other
3515  * remote operations on this relation such as remote sorts and remote
3516  * LIMIT restrictions, when the costs can not be obtained from the foreign
3517  * server. This function will be called at least once for every foreign
3518  * relation without any parameterization, pathkeys, or additional
3519  * post-scan/join-processing steps.
3520  */
3521  if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3522  {
3523  fpinfo->retrieved_rows = retrieved_rows;
3524  fpinfo->rel_startup_cost = startup_cost;
3525  fpinfo->rel_total_cost = total_cost;
3526  }
3527 
3528  /*
3529  * Add some additional cost factors to account for connection overhead
3530  * (fdw_startup_cost), transferring data across the network
3531  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3532  * (cpu_tuple_cost per retrieved row).
3533  */
3534  startup_cost += fpinfo->fdw_startup_cost;
3535  total_cost += fpinfo->fdw_startup_cost;
3536  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3537  total_cost += cpu_tuple_cost * retrieved_rows;
3538 
3539  /*
3540  * If we have LIMIT, we should prefer performing the restriction remotely
3541  * rather than locally, as the former avoids extra row fetches from the
3542  * remote that the latter might cause. But since the core code doesn't
3543  * account for such fetches when estimating the costs of the local
3544  * restriction (see create_limit_path()), there would be no difference
3545  * between the costs of the local restriction and the costs of the remote
3546  * restriction estimated above if we don't use remote estimates (except
3547  * for the case where the foreignrel is a grouping relation, the given
3548  * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3549  * accounted for in costing the remote restriction). Tweak the costs of
3550  * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3551  * one.
3552  */
3553  if (!fpinfo->use_remote_estimate &&
3554  fpextra && fpextra->has_limit &&
3555  fpextra->limit_tuples > 0 &&
3556  fpextra->limit_tuples < fpinfo->rows)
3557  {
3558  Assert(fpinfo->rows > 0);
3559  total_cost -= (total_cost - startup_cost) * 0.05 *
3560  (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3561  }
3562 
3563  /* Return results. */
3564  *p_rows = rows;
3565  *p_width = width;
3566  *p_startup_cost = startup_cost;
3567  *p_total_cost = total_cost;
3568 }
3569 
3570 /*
3571  * Estimate costs of executing a SQL statement remotely.
3572  * The given "sql" must be an EXPLAIN command.
3573  */
3574 static void
3575 get_remote_estimate(const char *sql, PGconn *conn,
3576  double *rows, int *width,
3577  Cost *startup_cost, Cost *total_cost)
3578 {
3579  PGresult *volatile res = NULL;
3580 
3581  /* PGresult must be released before leaving this function. */
3582  PG_TRY();
3583  {
3584  char *line;
3585  char *p;
3586  int n;
3587 
3588  /*
3589  * Execute EXPLAIN remotely.
3590  */
3591  res = pgfdw_exec_query(conn, sql, NULL);
3593  pgfdw_report_error(ERROR, res, conn, false, sql);
3594 
3595  /*
3596  * Extract cost numbers for topmost plan node. Note we search for a
3597  * left paren from the end of the line to avoid being confused by
3598  * other uses of parentheses.
3599  */
3600  line = PQgetvalue(res, 0, 0);
3601  p = strrchr(line, '(');
3602  if (p == NULL)
3603  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3604  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3605  startup_cost, total_cost, rows, width);
3606  if (n != 4)
3607  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3608  }
3609  PG_FINALLY();
3610  {
3611  PQclear(res);
3612  }
3613  PG_END_TRY();
3614 }
3615 
3616 /*
3617  * Adjust the cost estimates of a foreign grouping path to include the cost of
3618  * generating properly-sorted output.
3619  */
3620 static void
3622  List *pathkeys,
3623  double retrieved_rows,
3624  double width,
3625  double limit_tuples,
3626  Cost *p_startup_cost,
3627  Cost *p_run_cost)
3628 {
3629  /*
3630  * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3631  * side is unlikely to generate properly-sorted output, so it would need
3632  * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3633  * if the GROUP BY clause is sort-able but isn't a superset of the given
3634  * pathkeys, adjust the costs with that function. Otherwise, adjust the
3635  * costs by applying the same heuristic as for the scan or join case.
3636  */
3637  if (!grouping_is_sortable(root->parse->groupClause) ||
3638  !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3639  {
3640  Path sort_path; /* dummy for result of cost_sort */
3641 
3642  cost_sort(&sort_path,
3643  root,
3644  pathkeys,
3645  *p_startup_cost + *p_run_cost,
3646  retrieved_rows,
3647  width,
3648  0.0,
3649  work_mem,
3650  limit_tuples);
3651 
3652  *p_startup_cost = sort_path.startup_cost;
3653  *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3654  }
3655  else
3656  {
3657  /*
3658  * The default extra cost seems too large for foreign-grouping cases;
3659  * add 1/4th of that default.
3660  */
3661  double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3662  - 1.0) * 0.25;
3663 
3664  *p_startup_cost *= sort_multiplier;
3665  *p_run_cost *= sort_multiplier;
3666  }
3667 }
3668 
3669 /*
3670  * Detect whether we want to process an EquivalenceClass member.
3671  *
3672  * This is a callback for use by generate_implied_equalities_for_column.
3673  */
3674 static bool
3677  void *arg)
3678 {
3680  Expr *expr = em->em_expr;
3681 
3682  /*
3683  * If we've identified what we're processing in the current scan, we only
3684  * want to match that expression.
3685  */
3686  if (state->current != NULL)
3687  return equal(expr, state->current);
3688 
3689  /*
3690  * Otherwise, ignore anything we've already processed.
3691  */
3692  if (list_member(state->already_used, expr))
3693  return false;
3694 
3695  /* This is the new target to process. */
3696  state->current = expr;
3697  return true;
3698 }
3699 
3700 /*
3701  * Create cursor for node's query with current parameter values.
3702  */
3703 static void
3705 {
3706  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3707  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3708  int numParams = fsstate->numParams;
3709  const char **values = fsstate->param_values;
3710  PGconn *conn = fsstate->conn;
3712  PGresult *res;
3713 
3714  /* First, process a pending asynchronous request, if any. */
3715  if (fsstate->conn_state->pendingAreq)
3717 
3718  /*
3719  * Construct array of query parameter values in text format. We do the
3720  * conversions in the short-lived per-tuple context, so as not to cause a
3721  * memory leak over repeated scans.
3722  */
3723  if (numParams > 0)
3724  {
3725  MemoryContext oldcontext;
3726 
3727  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3728 
3729  process_query_params(econtext,
3730  fsstate->param_flinfo,
3731  fsstate->param_exprs,
3732  values);
3733 
3734  MemoryContextSwitchTo(oldcontext);
3735  }
3736 
3737  /* Construct the DECLARE CURSOR command */
3738  initStringInfo(&buf);
3739  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3740  fsstate->cursor_number, fsstate->query);
3741 
3742  /*
3743  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3744  * to infer types for all parameters. Since we explicitly cast every
3745  * parameter (see deparse.c), the "inference" is trivial and will produce
3746  * the desired result. This allows us to avoid assuming that the remote
3747  * server has the same OIDs we do for the parameters' types.
3748  */
3749  if (!PQsendQueryParams(conn, buf.data, numParams,
3750  NULL, values, NULL, NULL, 0))
3751  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3752 
3753  /*
3754  * Get the result, and check for success.
3755  *
3756  * We don't use a PG_TRY block here, so be careful not to throw error
3757  * without releasing the PGresult.
3758  */
3759  res = pgfdw_get_result(conn, buf.data);
3761  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3762  PQclear(res);
3763 
3764  /* Mark the cursor as created, and show no tuples have been retrieved */
3765  fsstate->cursor_exists = true;
3766  fsstate->tuples = NULL;
3767  fsstate->num_tuples = 0;
3768  fsstate->next_tuple = 0;
3769  fsstate->fetch_ct_2 = 0;
3770  fsstate->eof_reached = false;
3771 
3772  /* Clean up */
3773  pfree(buf.data);
3774 }
3775 
3776 /*
3777  * Fetch some more rows from the node's cursor.
3778  */
3779 static void
3781 {
3782  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3783  PGresult *volatile res = NULL;
3784  MemoryContext oldcontext;
3785 
3786  /*
3787  * We'll store the tuples in the batch_cxt. First, flush the previous
3788  * batch.
3789  */
3790  fsstate->tuples = NULL;
3791  MemoryContextReset(fsstate->batch_cxt);
3792  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3793 
3794  /* PGresult must be released before leaving this function. */
3795  PG_TRY();
3796  {
3797  PGconn *conn = fsstate->conn;
3798  int numrows;
3799  int i;
3800 
3801  if (fsstate->async_capable)
3802  {
3803  Assert(fsstate->conn_state->pendingAreq);
3804 
3805  /*
3806  * The query was already sent by an earlier call to
3807  * fetch_more_data_begin. So now we just fetch the result.
3808  */
3809  res = pgfdw_get_result(conn, fsstate->query);
3810  /* On error, report the original query, not the FETCH. */
3812  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3813 
3814  /* Reset per-connection state */
3815  fsstate->conn_state->pendingAreq = NULL;
3816  }
3817  else
3818  {
3819  char sql[64];
3820 
3821  /* This is a regular synchronous fetch. */
3822  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3823  fsstate->fetch_size, fsstate->cursor_number);
3824 
3825  res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
3826  /* On error, report the original query, not the FETCH. */
3828  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3829  }
3830 
3831  /* Convert the data into HeapTuples */
3832  numrows = PQntuples(res);
3833  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3834  fsstate->num_tuples = numrows;
3835  fsstate->next_tuple = 0;
3836 
3837  for (i = 0; i < numrows; i++)
3838  {
3839  Assert(IsA(node->ss.ps.plan, ForeignScan));
3840 
3841  fsstate->tuples[i] =
3843  fsstate->rel,
3844  fsstate->attinmeta,
3845  fsstate->retrieved_attrs,
3846  node,
3847  fsstate->temp_cxt);
3848  }
3849 
3850  /* Update fetch_ct_2 */
3851  if (fsstate->fetch_ct_2 < 2)
3852  fsstate->fetch_ct_2++;
3853 
3854  /* Must be EOF if we didn't get as many tuples as we asked for. */
3855  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3856  }
3857  PG_FINALLY();
3858  {
3859  PQclear(res);
3860  }
3861  PG_END_TRY();
3862 
3863  MemoryContextSwitchTo(oldcontext);
3864 }
3865 
3866 /*
3867  * Force assorted GUC parameters to settings that ensure that we'll output
3868  * data values in a form that is unambiguous to the remote server.
3869  *
3870  * This is rather expensive and annoying to do once per row, but there's
3871  * little choice if we want to be sure values are transmitted accurately;
3872  * we can't leave the settings in place between rows for fear of affecting
3873  * user-visible computations.
3874  *
3875  * We use the equivalent of a function SET option to allow the settings to
3876  * persist only until the caller calls reset_transmission_modes(). If an
3877  * error is thrown in between, guc.c will take care of undoing the settings.
3878  *
3879  * The return value is the nestlevel that must be passed to
3880  * reset_transmission_modes() to undo things.
3881  */
3882 int
3884 {
3885  int nestlevel = NewGUCNestLevel();
3886 
3887  /*
3888  * The values set here should match what pg_dump does. See also
3889  * configure_remote_session in connection.c.
3890  */
3891  if (DateStyle != USE_ISO_DATES)
3892  (void) set_config_option("datestyle", "ISO",
3894  GUC_ACTION_SAVE, true, 0, false);
3896  (void) set_config_option("intervalstyle", "postgres",
3898  GUC_ACTION_SAVE, true, 0, false);
3899  if (extra_float_digits < 3)
3900  (void) set_config_option("extra_float_digits", "3",
3902  GUC_ACTION_SAVE, true, 0, false);
3903 
3904  /*
3905  * In addition force restrictive search_path, in case there are any
3906  * regproc or similar constants to be printed.
3907  */
3908  (void) set_config_option("search_path", "pg_catalog",
3910  GUC_ACTION_SAVE, true, 0, false);
3911 
3912  return nestlevel;
3913 }
3914 
3915 /*
3916  * Undo the effects of set_transmission_modes().
3917  */
3918 void
3920 {
3921  AtEOXact_GUC(true, nestlevel);
3922 }
3923 
3924 /*
3925  * Utility routine to close a cursor.
 *
 * Builds the command "CLOSE c<cursor_number>" and runs it on conn through
 * pgfdw_exec_query(), passing conn_state along.  The PGresult is released
 * before returning.
 *
 * NOTE(review): inner lines 3928 (function name and leading parameters) and
 * 3941 are absent from this listing.
3926  */
3927 static void
3929  PgFdwConnState *conn_state)
3930 {
 /* 64 bytes is ample for "CLOSE c" plus an unsigned number. */
3931  char sql[64];
3932  PGresult *res;
3933 
3934  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3935 
3936  /*
3937  * We don't use a PG_TRY block here, so be careful not to throw error
3938  * without releasing the PGresult.
3939  */
3940  res = pgfdw_exec_query(conn, sql, conn_state);
 /*
  * NOTE(review): as listed, the error report below looks unconditional;
  * presumably a result-status test sits on the omitted line 3941 -- confirm.
  */
3942  pgfdw_report_error(ERROR, res, conn, true, sql);
3943  PQclear(res);
3944 }
3945 
3946 /*
3947  * create_foreign_modify
3948  * Construct an execution state of a foreign insert/update/delete
3949  * operation
 *
 * Allocates a zeroed PgFdwModifyState, obtains a connection using the user
 * mapping for the foreign table's server, records the remote query and the
 * attribute lists, creates a temporary memory context under
 * estate->es_query_cxt, and looks up a type output function for every
 * parameter that will be transmitted: ctid first for UPDATE/DELETE, then
 * each non-generated target attribute for INSERT/UPDATE.
 *
 * For CMD_INSERT the query string is duplicated and a further copy is kept
 * in orig_query; batch_size is then read with get_batch_size_option().
 * For CMD_UPDATE/CMD_DELETE, subplan must be non-NULL (asserted).
 *
 * Returns the newly built state.
 *
 * NOTE(review): inner lines 3952 (function name and first parameter), 4007
 * and 4023 are absent from this listing.
3950  */
3951 static PgFdwModifyState *
3953  RangeTblEntry *rte,
3954  ResultRelInfo *resultRelInfo,
3955  CmdType operation,
3956  Plan *subplan,
3957  char *query,
3958  List *target_attrs,
3959  int values_end,
3960  bool has_returning,
3961  List *retrieved_attrs)
3962 {
3963  PgFdwModifyState *fmstate;
3964  Relation rel = resultRelInfo->ri_RelationDesc;
3965  TupleDesc tupdesc = RelationGetDescr(rel);
3966  Oid userid;
3967  ForeignTable *table;
3968  UserMapping *user;
3969  AttrNumber n_params;
3970  Oid typefnoid;
3971  bool isvarlena;
3972  ListCell *lc;
3973 
3974  /* Begin constructing PgFdwModifyState. */
3975  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3976  fmstate->rel = rel;
3977 
3978  /*
3979  * Identify which user to do the remote access as. This should match what
3980  * ExecCheckRTEPerms() does.
3981  */
3982  userid = OidIsValid(rte->checkAsUser) ? rte->checkAsUser : GetUserId();
3983 
3984  /* Get info about foreign table. */
3985  table = GetForeignTable(RelationGetRelid(rel));
3986  user = GetUserMapping(userid, table->serverid);
3987 
3988  /* Open connection; report that we'll create a prepared statement. */
3989  fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
3990  fmstate->p_name = NULL; /* prepared statement not made yet */
3991 
3992  /* Set up remote query information. */
3993  fmstate->query = query;
3994  if (operation == CMD_INSERT)
3995  {
 /*
  * Keep two private copies: query is pfree'd and replaced when an INSERT
  * is rebuilt for a different number of rows, and orig_query is the input
  * to that rebuild (see execute_foreign_modify).
  */
3996  fmstate->query = pstrdup(fmstate->query);
3997  fmstate->orig_query = pstrdup(fmstate->query);
3998  }
3999  fmstate->target_attrs = target_attrs;
4000  fmstate->values_end = values_end;
4001  fmstate->has_returning = has_returning;
4002  fmstate->retrieved_attrs = retrieved_attrs;
4003 
4004  /* Create context for per-tuple temp workspace. */
 /*
  * NOTE(review): the closing argument line of this call (inner line 4007)
  * is absent from this listing.
  */
4005  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
4006  "postgres_fdw temporary data",
4008 
4009  /* Prepare for input conversion of RETURNING results. */
4010  if (fmstate->has_returning)
4011  fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
4012 
4013  /* Prepare for output conversion of parameters used in prepared stmt. */
 /* One entry per target attribute plus one spare (used for ctid). */
4014  n_params = list_length(fmstate->target_attrs) + 1;
4015  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
4016  fmstate->p_nums = 0;
4017 
4018  if (operation == CMD_UPDATE || operation == CMD_DELETE)
4019  {
4020  Assert(subplan != NULL);
4021 
4022  /* Find the ctid resjunk column in the subplan's result */
 /*
  * NOTE(review): the first line of the lookup statement (inner line 4023)
  * is absent; only its trailing "ctid" argument is shown.  The result is
  * evidently stored in fmstate->ctidAttno, which is tested just below.
  */
4024  "ctid");
4025  if (!AttributeNumberIsValid(fmstate->ctidAttno))
4026  elog(ERROR, "could not find junk ctid column");
4027 
4028  /* First transmittable parameter will be ctid */
4029  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
4030  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4031  fmstate->p_nums++;
4032  }
4033 
4034  if (operation == CMD_INSERT || operation == CMD_UPDATE)
4035  {
4036  /* Set up for remaining transmittable parameters */
4037  foreach(lc, fmstate->target_attrs)
4038  {
4039  int attnum = lfirst_int(lc);
4040  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4041 
4042  Assert(!attr->attisdropped);
4043 
4044  /* Ignore generated columns; they are set to DEFAULT */
4045  if (attr->attgenerated)
4046  continue;
4047  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
4048  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4049  fmstate->p_nums++;
4050  }
4051  }
4052 
 /* p_nums counts the entries actually filled; it may be below n_params. */
4053  Assert(fmstate->p_nums <= n_params);
4054 
4055  /* Set batch_size from foreign server/table options. */
4056  if (operation == CMD_INSERT)
4057  fmstate->batch_size = get_batch_size_option(rel);
4058 
 /* The query as passed in is treated as covering a single row. */
4059  fmstate->num_slots = 1;
4060 
4061  /* Initialize auxiliary state */
4062  fmstate->aux_fmstate = NULL;
4063 
4064  return fmstate;
4065 }
4066 
4067 /*
4068  * execute_foreign_modify
4069  * Perform foreign-table modification as required, and fetch RETURNING
4070  * result if any. (This is the shared guts of postgresExecForeignInsert,
4071  * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
4072  * postgresExecForeignDelete.)
 *
 * slots holds the tuples to send; planSlots[0] supplies the junk ctid for
 * UPDATE/DELETE.  *numSlots is in/out: on entry the number of slots to
 * send, on exit the number of rows the remote side reports as affected.
 *
 * Returns slots when at least one row was affected, otherwise NULL.
 *
 * NOTE(review): inner lines 4075 (function name and first parameter), 4096
 * and 4163 are absent from this listing.
4073  */
4074 static TupleTableSlot **
4076  ResultRelInfo *resultRelInfo,
4077  CmdType operation,
4078  TupleTableSlot **slots,
4079  TupleTableSlot **planSlots,
4080  int *numSlots)
4081 {
4082  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
4083  ItemPointer ctid = NULL;
4084  const char **p_values;
4085  PGresult *res;
4086  int n_rows;
4087  StringInfoData sql;
4088 
4089  /* The operation should be INSERT, UPDATE, or DELETE */
4090  Assert(operation == CMD_INSERT ||
4091  operation == CMD_UPDATE ||
4092  operation == CMD_DELETE);
4093 
4094  /* First, process a pending asynchronous request, if any. */
 /*
  * NOTE(review): the statement controlled by this test (inner line 4096)
  * is absent from this listing.
  */
4095  if (fmstate->conn_state->pendingAreq)
4097 
4098  /*
4099  * If the existing query was deparsed and prepared for a different number
4100  * of rows, rebuild it for the proper number.
4101  */
4102  if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
4103  {
4104  /* Destroy the prepared statement created previously */
4105  if (fmstate->p_name)
4106  deallocate_query(fmstate);
4107 
4108  /* Build INSERT string with numSlots records in its VALUES clause. */
4109  initStringInfo(&sql);
4110  rebuildInsertSql(&sql, fmstate->rel,
4111  fmstate->orig_query, fmstate->target_attrs,
4112  fmstate->values_end, fmstate->p_nums,
4113  *numSlots - 1);
 /* Swap in the rebuilt text and remember the row count it was built for. */
4114  pfree(fmstate->query);
4115  fmstate->query = sql.data;
4116  fmstate->num_slots = *numSlots;
4117  }
4118 
4119  /* Set up the prepared statement on the remote server, if we didn't yet */
4120  if (!fmstate->p_name)
4121  prepare_foreign_modify(fmstate);
4122 
4123  /*
4124  * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
4125  */
4126  if (operation == CMD_UPDATE || operation == CMD_DELETE)
4127  {
4128  Datum datum;
4129  bool isNull;
4130 
4131  datum = ExecGetJunkAttribute(planSlots[0],
4132  fmstate->ctidAttno,
4133  &isNull);
4134  /* shouldn't ever get a null result... */
4135  if (isNull)
4136  elog(ERROR, "ctid is NULL");
4137  ctid = (ItemPointer) DatumGetPointer(datum);
4138  }
4139 
4140  /* Convert parameters needed by prepared statement to text form */
4141  p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
4142 
4143  /*
4144  * Execute the prepared statement.
4145  */
 /* Total parameter count is per-row parameters times number of rows. */
4146  if (!PQsendQueryPrepared(fmstate->conn,
4147  fmstate->p_name,
4148  fmstate->p_nums * (*numSlots),
4149  p_values,
4150  NULL,
4151  NULL,
4152  0))
4153  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4154 
4155  /*
4156  * Get the result, and check for success.
4157  *
4158  * We don't use a PG_TRY block here, so be careful not to throw error
4159  * without releasing the PGresult.
4160  */
4161  res = pgfdw_get_result(fmstate->conn, fmstate->query);
 /*
  * NOTE(review): the right-hand side of this comparison (inner line 4163)
  * is absent from this listing.
  */
4162  if (PQresultStatus(res) !=
4164  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4165 
4166  /* Check number of rows affected, and fetch RETURNING tuple if any */
4167  if (fmstate->has_returning)
4168  {
 /* RETURNING is only handled for a single row here. */
4169  Assert(*numSlots == 1);
4170  n_rows = PQntuples(res);
4171  if (n_rows > 0)
4172  store_returning_result(fmstate, slots[0], res);
4173  }
4174  else
4175  n_rows = atoi(PQcmdTuples(res));
4176 
4177  /* And clean up */
4178  PQclear(res);
4179 
 /* Discard the parameter strings built in temp_cxt for this call. */
4180  MemoryContextReset(fmstate->temp_cxt);
4181 
4182  *numSlots = n_rows;
4183 
4184  /*
4185  * Return NULL if nothing was inserted/updated/deleted on the remote end
4186  */
4187  return (n_rows > 0) ? slots : NULL;
4188 }
4189 
4190 /*
4191  * prepare_foreign_modify
4192  * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
 *
 * Generates a statement name from GetPrepStmtNumber(), sends
 * fmstate->query to the remote server with PQsendPrepare(), waits for the
 * result, and finally stores the name in fmstate->p_name.  p_name is set
 * only after the remote step has completed.
 *
 * NOTE(review): inner lines 4195 (function name and parameter list) and
 * 4232 are absent from this listing.
4193  */
4194 static void
4196 {
4197  char prep_name[NAMEDATALEN];
4198  char *p_name;
4199  PGresult *res;
4200 
4201  /*
4202  * The caller would already have processed a pending asynchronous request
4203  * if any, so no need to do it here.
4204  */
4205 
4206  /* Construct name we'll use for the prepared statement. */
4207  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
4208  GetPrepStmtNumber(fmstate->conn));
 /* Copy off the stack buffer so the name can outlive this function. */
4209  p_name = pstrdup(prep_name);
4210 
4211  /*
4212  * We intentionally do not specify parameter types here, but leave the
4213  * remote server to derive them by default. This avoids possible problems
4214  * with the remote server using different type OIDs than we do. All of
4215  * the prepared statements we use in this module are simple enough that
4216  * the remote server will make the right choices.
4217  */
4218  if (!PQsendPrepare(fmstate->conn,
4219  p_name,
4220  fmstate->query,
4221  0,
4222  NULL))
4223  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4224 
4225  /*
4226  * Get the result, and check for success.
4227  *
4228  * We don't use a PG_TRY block here, so be careful not to throw error
4229  * without releasing the PGresult.
4230  */
4231  res = pgfdw_get_result(fmstate->conn, fmstate->query);
 /*
  * NOTE(review): as listed, the error report below looks unconditional;
  * presumably a result-status test sits on the omitted line 4232 -- confirm.
  */
4233  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4234  PQclear(res);
4235 
4236  /* This action shows that the prepare has been done. */
4237  fmstate->p_name = p_name;
4238 }
4239 
4240 /*
4241  * convert_prep_stmt_params
4242  * Create array of text strings representing parameter values
4243  *
4244  * tupleid is ctid to send, or NULL if none
4245  * slots is array of slots to get remaining parameters from, or NULL if none
 * numSlots is the number of entries of slots to process
 *
 * The returned array has fmstate->p_nums * numSlots entries; an entry is
 * NULL where the corresponding attribute is null.
4246  *
4247  * Data is constructed in temp_cxt; caller should reset that after use.
 *
 * NOTE(review): inner line 4250 (function name and first parameter) is
 * absent from this listing.
4248  */
4249 static const char **
4251  ItemPointer tupleid,
4252  TupleTableSlot **slots,
4253  int numSlots)
4254 {
4255  const char **p_values;
4256  int i;
4257  int j;
 /* pindex is the next free position in p_values, across all slots. */
4258  int pindex = 0;
4259  MemoryContext oldcontext;
4260 
4261  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
4262 
4263  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
4264 
4265  /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
4266  Assert(!(tupleid != NULL && numSlots > 1));
4267 
4268  /* 1st parameter should be ctid, if it's in use */
4269  if (tupleid != NULL)
4270  {
4271  Assert(numSlots == 1);
4272  /* don't need set_transmission_modes for TID output */
4273  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
4274  PointerGetDatum(tupleid));
4275  pindex++;
4276  }
4277 
4278  /* get following parameters from slots */
4279  if (slots != NULL && fmstate->target_attrs != NIL)
4280  {
4281  TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
4282  int nestlevel;
4283  ListCell *lc;
4284 
 /* Set transmission modes once around all output-function calls. */
4285  nestlevel = set_transmission_modes();
4286 
4287  for (i = 0; i < numSlots; i++)
4288  {
 /*
  * j indexes p_flinfo and restarts for every slot, skipping entry 0
  * when that entry belongs to ctid.
  */
4289  j = (tupleid != NULL) ? 1 : 0;
4290  foreach(lc, fmstate->target_attrs)
4291  {
4292  int attnum = lfirst_int(lc);
4293  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4294  Datum value;
4295  bool isnull;
4296 
4297  /* Ignore generated columns; they are set to DEFAULT */
4298  if (attr->attgenerated)
4299  continue;
4300  value = slot_getattr(slots[i], attnum, &isnull);
4301  if (isnull)
4302  p_values[pindex] = NULL;
4303  else
4304  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
4305  value);
4306  pindex++;
4307  j++;
4308  }
4309  }
4310 
4311  reset_transmission_modes(nestlevel);
4312  }
4313 
 /* Every position of the array must have been filled. */
4314  Assert(pindex == fmstate->p_nums * numSlots);
4315 
4316  MemoryContextSwitchTo(oldcontext);
4317 
4318  return p_values;
4319 }
4320 
4321 /*
4322  * store_returning_result
4323  * Store the result of a RETURNING clause
4324  *
4325  * On error, be sure to release the PGresult on the way out. Callers do not
4326  * have PG_TRY blocks to ensure this happens.
 *
 * Row 0 of res is converted to a heap tuple and force-stored into slot.
 * On the success path res is left for the caller to release.
 *
 * NOTE(review): inner line 4329 (function name and first parameter) is
 * absent from this listing.
4327  */
4328 static void
4330  TupleTableSlot *slot, PGresult *res)
4331 {
4332  PG_TRY();
4333  {
4334  HeapTuple newtup;
4335 
 /* Only the first result row (index 0) is used. */
4336  newtup = make_tuple_from_result_row(res, 0,
4337  fmstate->rel,
4338  fmstate->attinmeta,
4339  fmstate->retrieved_attrs,
4340  NULL,
4341  fmstate->temp_cxt);
4342 
4343  /*
4344  * The returning slot will not necessarily be suitable to store
4345  * heaptuples directly, so allow for conversion.
4346  */
4347  ExecForceStoreHeapTuple(newtup, slot, true);
4348  }
4349  PG_CATCH();
4350  {
 /* Release the result before propagating the error. */
4351  PQclear(res);
4352  PG_RE_THROW();
4353  }
4354  PG_END_TRY();
4355 }
4356 
4357 /*
4358  * finish_foreign_modify
4359  * Release resources for a foreign insert/update/delete operation
 *
 * Deallocates the remote prepared statement (if any), releases the
 * connection and clears fmstate->conn.  fmstate must not be NULL.
 *
 * NOTE(review): inner line 4362 (function name and parameter list) is
 * absent from this listing.
4360  */
4361 static void
4363 {
4364  Assert(fmstate != NULL);
4365 
4366  /* If we created a prepared statement, destroy it */
 /* deallocate_query() itself returns early when p_name is not set. */
4367  deallocate_query(fmstate);
4368 
4369  /* Release remote connection */
4370  ReleaseConnection(fmstate->conn);
4371  fmstate->conn = NULL;
4372 }
4373 
4374 /*
4375  * deallocate_query
4376  * Deallocate a prepared statement for a foreign insert/update/delete
4377  * operation
 *
 * Does nothing when fmstate->p_name is not set.  Otherwise runs
 * "DEALLOCATE <p_name>" on the connection, frees p_name and resets it to
 * NULL.
 *
 * NOTE(review): inner lines 4380 (function name and parameter list) and
 * 4396 are absent from this listing.
4378  */
4379 static void
4381 {
4382  char sql[64];
4383  PGresult *res;
4384 
4385  /* do nothing if the query is not allocated */
4386  if (!fmstate->p_name)
4387  return;
4388 
4389  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
4390 
4391  /*
4392  * We don't use a PG_TRY block here, so be careful not to throw error
4393  * without releasing the PGresult.
4394  */
4395  res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
 /*
  * NOTE(review): as listed, the error report below looks unconditional;
  * presumably a result-status test sits on the omitted line 4396 -- confirm.
  */
4397  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
4398  PQclear(res);
 /* Clearing p_name marks the statement as no longer prepared. */
4399  pfree(fmstate->p_name);
4400  fmstate->p_name = NULL;
4401 }
4402 
4403 /*
4404  * build_remote_returning
4405  * Build a RETURNING targetlist of a remote query for performing an
4406  * UPDATE/DELETE .. RETURNING on a join directly
 *
 * rtindex is the range-table index of the target relation, rel is that
 * relation, and returningList (must be non-empty) is the local RETURNING
 * list.  The result is a list of TargetEntry nodes numbered from 1, with
 * no column names and resjunk = false.
 *
 * NOTE(review): inner line 4479 is absent from this listing.
4407  */
4408 static List *
4409 build_remote_returning(Index rtindex, Relation rel, List *returningList)
4410 {
4411  bool have_wholerow = false;
4412  List *tlist = NIL;
4413  List *vars;
4414  ListCell *lc;
4415 
4416  Assert(returningList);
4417 
 /* vars may hold PlaceHolderVars as well as Vars, hence the IsA() tests. */
4418  vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
4419 
4420  /*
4421  * If there's a whole-row reference to the target relation, then we'll
4422  * need all the columns of the relation.
4423  */
4424  foreach(lc, vars)
4425  {
4426  Var *var = (Var *) lfirst(lc);
4427 
4428  if (IsA(var, Var) &&
4429  var->varno == rtindex &&
4430  var->varattno == InvalidAttrNumber)
4431  {
4432  have_wholerow = true;
4433  break;
4434  }
4435  }
4436 
4437  if (have_wholerow)
4438  {
4439  TupleDesc tupdesc = RelationGetDescr(rel);
4440  int i;
4441 
 /* Attribute numbers are 1-based; TupleDescAttr() takes a 0-based index. */
4442  for (i = 1; i <= tupdesc->natts; i++)
4443  {
4444  Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
4445  Var *var;
4446 
4447  /* Ignore dropped attributes. */
4448  if (attr->attisdropped)
4449  continue;
4450 
4451  var = makeVar(rtindex,
4452  i,
4453  attr->atttypid,
4454  attr->atttypmod,
4455  attr->attcollation,
4456  0);
4457 
4458  tlist = lappend(tlist,
4459  makeTargetEntry((Expr *) var,
4460  list_length(tlist) + 1,
4461  NULL,
4462  false));
4463  }
4464  }
4465 
4466  /* Now add any remaining columns to tlist. */
4467  foreach(lc, vars)
4468  {
4469  Var *var = (Var *) lfirst(lc);
4470 
4471  /*
4472  * No need for whole-row references to the target relation. We don't
4473  * need system columns other than ctid and oid either, since those are
4474  * set locally.
4475  */
 /*
  * NOTE(review): the last operand of this condition (inner line 4479) is
  * absent from this listing; going by the comment above it presumably
  * exempts ctid from being skipped -- confirm.
  */
4476  if (IsA(var, Var) &&
4477  var->varno == rtindex &&
4478  var->varattno <= InvalidAttrNumber &&
4480  continue; /* don't need it */
4481 
4482  if (tlist_member((Expr *) var, tlist))
4483  continue; /* already got it */
4484 
4485  tlist = lappend(tlist,
4486  makeTargetEntry((Expr *) var,
4487  list_length(tlist) + 1,
4488  NULL,
4489  false));
4490  }
4491 
 /* Only the list returned by pull_var_clause() is freed here. */
4492  list_free(vars);
4493 
4494  return tlist;
4495 }
4496 
4497 /*
4498  * rebuild_fdw_scan_tlist
4499  * Build new fdw_scan_tlist of given foreign-scan plan node from given
4500  * tlist
4501  *
4502  * There might be columns that the fdw_scan_tlist of the given foreign-scan
4503  * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4504  * have contained resjunk columns such as 'ctid' of the target relation and
4505  * 'wholerow' of non-target relations, but the tlist might not contain them,
4506  * for example. So, adjust the tlist so it contains all the columns specified
4507  * in the fdw_scan_tlist; else setrefs.c will get confused.
 *
 * The given tlist is extended in place with lappend() and then installed as
 * fscan->fdw_scan_tlist.
 *
 * NOTE(review): inner line 4510 (function name and parameter list) is
 * absent from this listing.
4508  */
4509 static void
4511 {
 /* new_tlist starts out as the caller's list itself, not a copy. */
4512  List *new_tlist = tlist;
4513  List *old_tlist = fscan->fdw_scan_tlist;
4514  ListCell *lc;
4515 
4516  foreach(lc, old_tlist)
4517  {
4518  TargetEntry *tle = (TargetEntry *) lfirst(lc);
4519 
4520  if (tlist_member(tle->expr, new_tlist))
4521  continue; /* already got it */
4522 
 /* Append the missing expression as a fresh, unnamed, non-junk entry. */
4523  new_tlist = lappend(new_tlist,
4524  makeTargetEntry(tle->expr,
4525  list_length(new_tlist) + 1,
4526  NULL,
4527  false));
4528  }
4529  fscan->fdw_scan_tlist = new_tlist;
4530 }
4531 
4532 /*
4533  * Execute a direct UPDATE/DELETE statement.
 *
 * Evaluates the query parameters (if any) to text, sends dmstate->query
 * with PQsendQueryParams(), stores the PGresult in dmstate->result, and
 * sets dmstate->num_tuples from either the result's tuple count (when
 * has_returning) or its command-tag row count.
 *
 * NOTE(review): inner lines 4536 (function name and parameter list), 4538,
 * 4545 and 4575 are absent from this listing; in particular the declaration
 * of dmstate is not visible here.
4534  */
4535 static void
4537 {
4539  ExprContext *econtext = node->ss.ps.ps_ExprContext;
4540  int numParams = dmstate->numParams;
4541  const char **values = dmstate->param_values;
4542 
4543  /* First, process a pending asynchronous request, if any. */
 /*
  * NOTE(review): the statement controlled by this test (inner line 4545)
  * is absent from this listing.
  */
4544  if (dmstate->conn_state->pendingAreq)
4546 
4547  /*
4548  * Construct array of query parameter values in text format.
4549  */
4550  if (numParams > 0)
4551  process_query_params(econtext,
4552  dmstate->param_flinfo,
4553  dmstate->param_exprs,
4554  values);
4555 
4556  /*
4557  * Notice that we pass NULL for paramTypes, thus forcing the remote server
4558  * to infer types for all parameters. Since we explicitly cast every
4559  * parameter (see deparse.c), the "inference" is trivial and will produce
4560  * the desired result. This allows us to avoid assuming that the remote
4561  * server has the same OIDs we do for the parameters' types.
4562  */
4563  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4564  NULL, values, NULL, NULL, 0))
4565  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4566 
4567  /*
4568  * Get the result, and check for success.
4569  *
4570  * We don't use a PG_TRY block here, so be careful not to throw error
4571  * without releasing the PGresult.
4572  */
4573  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
 /*
  * NOTE(review): the right-hand side of this comparison (inner line 4575)
  * is absent from this listing.
  */
4574  if (PQresultStatus(dmstate->result) !=
4576  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4577  dmstate->query);
4578 
4579  /* Get the number of rows affected. */
4580  if (dmstate->has_returning)
4581  dmstate->num_tuples = PQntuples(dmstate->result);
4582  else
4583  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4584 }
4585 
4586 /*
4587  * Get the result of a RETURNING clause.
 *
 * Each call consumes one row: dmstate->next_tuple is advanced, and the
 * tuple to be used by the local RETURNING projection is installed as
 * ecxt_scantuple of resultRelInfo->ri_projectReturning.  The function
 * always returns the node's scan slot, cleared once all
 * dmstate->num_tuples rows have been handed out.
 *
 * NOTE(review): inner lines 4590 (function name and parameter list) and
 * 4592 are absent from this listing; in particular the declaration of
 * dmstate is not visible here.
4588  */
4589 static TupleTableSlot *
4591 {
4593  EState *estate = node->ss.ps.state;
4594  ResultRelInfo *resultRelInfo = node->resultRelInfo;
4595  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4596  TupleTableSlot *resultSlot;
4597 
4598  Assert(resultRelInfo->ri_projectReturning);
4599 
4600  /* If we didn't get any tuples, must be end of data. */
4601  if (dmstate->next_tuple >= dmstate->num_tuples)
4602  return ExecClearTuple(slot);
4603 
4604  /* Increment the command es_processed count if necessary. */
4605  if (dmstate->set_processed)
4606  estate->es_processed += 1;
4607 
4608  /*
4609  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4610  * tuple. (has_returning is false when the local query is of the form
4611  * "UPDATE/DELETE .. RETURNING 1" for example.)
4612  */
4613  if (!dmstate->has_returning)
4614  {
4615  ExecStoreAllNullTuple(slot);
4616  resultSlot = slot;
4617  }
4618  else
4619  {
4620  /*
4621  * On error, be sure to release the PGresult on the way out. Callers
4622  * do not have PG_TRY blocks to ensure this happens.
4623  */
4624  PG_TRY();
4625  {
4626  HeapTuple newtup;
4627 
 /* Convert result row number next_tuple into a heap tuple. */
4628  newtup = make_tuple_from_result_row(dmstate->result,
4629  dmstate->next_tuple,
4630  dmstate->rel,
4631  dmstate->attinmeta,
4632  dmstate->retrieved_attrs,
4633  node,
4634  dmstate->temp_cxt);
4635  ExecStoreHeapTuple(newtup, slot, false);
4636  }
4637  PG_CATCH();
4638  {
4639  PQclear(dmstate->result);
4640  PG_RE_THROW();
4641  }
4642  PG_END_TRY();
4643 
4644  /* Get the updated/deleted tuple. */
 /*
  * When dmstate->rel is set the scan tuple is used as is; otherwise the
  * target relation's columns are extracted with apply_returning_filter().
  */
4645  if (dmstate->rel)
4646  resultSlot = slot;
4647  else
4648  resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate);
4649  }
4650  dmstate->next_tuple++;
4651 
4652  /* Make slot available for evaluation of the local query RETURNING list. */
4653  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4654  resultSlot;
4655 
 /* Note: the scan slot is returned, not resultSlot. */
4656  return slot;
4657 }
4658 
4659 /*
4660  * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
 *
 * Fills dmstate->attnoMap, dmstate->ctidAttno and dmstate->hasSystemCols
 * from fdw_scan_tlist, considering only entries that are Vars of the
 * relation with range-table index rtindex and whose position is listed in
 * dmstate->retrieved_attrs.
 *
 * NOTE(review): inner line 4663 (function name and first parameter) is
 * absent from this listing.
4661  */
4662 static void
4664  List *fdw_scan_tlist,
4665  Index rtindex)
4666 {
4667  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4668  ListCell *lc;
4669  int i;
4670 
4671  /*
4672  * Calculate the mapping between the fdw_scan_tlist's entries and the
4673  * result tuple's attributes.
4674  *
4675  * The "map" is an array of indexes of the result tuple's attributes in
4676  * fdw_scan_tlist, i.e., one entry for every attribute of the result
4677  * tuple. We store zero for any attributes that don't have the
4678  * corresponding entries in that list, marking that a NULL is needed in
4679  * the result tuple.
4680  *
4681  * Also get the indexes of the entries for ctid and oid if any.
4682  */
 /*
  * NOTE(review): oidAttno is only zeroed here; the loop below records ctid
  * and no other system column, so the mentions of oid look stale -- confirm.
  */
4683  dmstate->attnoMap = (AttrNumber *)
4684  palloc0(resultTupType->natts * sizeof(AttrNumber));
4685 
4686  dmstate->ctidAttno = dmstate->oidAttno = 0;
4687 
 /* i is the 1-based position of the current entry in fdw_scan_tlist. */
4688  i = 1;
4689  dmstate->hasSystemCols = false;
4690  foreach(lc, fdw_scan_tlist)
4691  {
4692  TargetEntry *tle = (TargetEntry *) lfirst(lc);
4693  Var *var = (Var *) tle->expr;
4694 
4695  Assert(IsA(var, Var));
4696 
4697  /*
4698  * If the Var is a column of the target relation to be retrieved from
4699  * the foreign server, get the index of the entry.
4700  */
4701  if (var->varno == rtindex &&
4702  list_member_int(dmstate->retrieved_attrs, i))
4703  {
4704  int attrno = var->varattno;
4705 
4706  if (attrno < 0)
4707  {
4708  /*
4709  * We don't retrieve system columns other than ctid and oid.
4710  */
4711  if (attrno == SelfItemPointerAttributeNumber)
4712  dmstate->ctidAttno = i;
4713  else
4714  Assert(false);
4715  dmstate->hasSystemCols = true;
4716  }
4717  else
4718  {
4719  /*
4720  * We don't retrieve whole-row references to the target
4721  * relation either.
4722  */
4723  Assert(attrno > 0);
4724 
 /* Map 0-based result attribute to 1-based scan-tlist position. */
4725  dmstate->attnoMap[attrno - 1] = i;
4726  }
4727  }
4728  i++;
4729  }
4730 }
4731 
4732 /*
4733  * Extract and return an updated/deleted tuple from a scan tuple.
 *
 * Using dmstate->attnoMap, copies the relevant columns of slot into the
 * slot obtained from ExecGetReturningSlot(), stores NULL for unmapped
 * attributes, and returns that slot.  When dmstate->hasSystemCols is set,
 * the result is materialized as a heap tuple so that system columns can be
 * installed in it.
 *
 * NOTE(review): inner lines 4736 (function name and first parameter) and
 * 4820-4822 are absent from this listing.
4734  */
4735 static TupleTableSlot *
4737  ResultRelInfo *resultRelInfo,
4738  TupleTableSlot *slot,
4739  EState *estate)
4740 {
4741  TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4742  TupleTableSlot *resultSlot;
4743  Datum *values;
4744  bool *isnull;
4745  Datum *old_values;
4746  bool *old_isnull;
4747  int i;
4748 
4749  /*
4750  * Use the return tuple slot as a place to store the result tuple.
4751  */
4752  resultSlot = ExecGetReturningSlot(estate, resultRelInfo);
4753 
4754  /*
4755  * Extract all the values of the scan tuple.
4756  */
4757  slot_getallattrs(slot);
4758  old_values = slot->tts_values;
4759  old_isnull = slot->tts_isnull;
4760 
4761  /*
4762  * Prepare to build the result tuple.
4763  */
4764  ExecClearTuple(resultSlot);
4765  values = resultSlot->tts_values;
4766  isnull = resultSlot->tts_isnull;
4767 
4768  /*
4769  * Transpose data into proper fields of the result tuple.
4770  */
4771  for (i = 0; i < resultTupType->natts; i++)
4772  {
 /* j is a 1-based scan-tuple position, or 0 when there is none. */
4773  int j = dmstate->attnoMap[i];
4774 
4775  if (j == 0)
4776  {
4777  values[i] = (Datum) 0;
4778  isnull[i] = true;
4779  }
4780  else
4781  {
4782  values[i] = old_values[j - 1];
4783  isnull[i] = old_isnull[j - 1];
4784  }
4785  }
4786 
4787  /*
4788  * Build the virtual tuple.
4789  */
4790  ExecStoreVirtualTuple(resultSlot);
4791 
4792  /*
4793  * If we have any system columns to return, materialize a heap tuple in
4794  * the slot from column values set above and install system columns in
4795  * that tuple.
4796  */
4797  if (dmstate->hasSystemCols)
4798  {
4799  HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4800 
4801  /* ctid */
4802  if (dmstate->ctidAttno)
4803  {
4804  ItemPointer ctid = NULL;
4805 
 /* ctidAttno is a 1-based position in the scan tuple. */
4806  ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4807  resultTup->t_self = *ctid;
4808  }
4809 
4810  /*
4811  * And remaining columns
4812  *
4813  * Note: since we currently don't allow the target relation to appear
4814  * on the nullable side of an outer join, any system columns wouldn't
4815  * go to NULL.
4816  *
4817  * Note: no need to care about tableoid here because it will be
4818  * initialized in ExecProcessReturning().
4819  */
 /*
  * NOTE(review): the statements introduced by the comment above (inner
  * lines 4820-4822) are absent from this listing.
  */
4823  }
4824 
4825  /*
4826  * And return the result tuple.
4827  */
4828  return resultSlot;
4829 }
4830 
4831 /*
4832  * Prepare for processing of parameters used in remote query.
 *
 * fdw_exprs is the list of parameter expressions and numParams (> 0) the
 * size used for the arrays allocated here.  Three outputs are filled in:
 * *param_flinfo (one output-function FmgrInfo per expression),
 * *param_exprs (the expressions initialized for execution) and
 * *param_values (a zeroed array for the text form of each value).
 *
 * NOTE(review): inner line 4835 (function name and first parameter) is
 * absent from this listing.
4833  */
4834 static void
4836  List *fdw_exprs,
4837  int numParams,
4838  FmgrInfo **param_flinfo,
4839  List **param_exprs,
4840  const char ***param_values)
4841 {
4842  int i;
4843  ListCell *lc;
4844 
4845  Assert(numParams > 0);
4846 
4847  /* Prepare for output conversion of parameters used in remote query. */
4848  *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4849 
 /*
  * presumably list_length(fdw_exprs) equals numParams; nothing here checks
  * it -- verify against the caller.
  */
4850  i = 0;
4851  foreach(lc, fdw_exprs)
4852  {
4853  Node *param_expr = (Node *) lfirst(lc);
4854  Oid typefnoid;
4855  bool isvarlena;
4856 
4857  getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4858  fmgr_info(typefnoid, &(*param_flinfo)[i]);
4859  i++;
4860  }
4861 
4862  /*
4863  * Prepare remote-parameter expressions for evaluation. (Note: in
4864  * practice, we expect that all these expressions will be just Params, so
4865  * we could possibly do something more efficient than using the full
4866  * expression-eval machinery for this. But probably there would be little
4867  * benefit, and it'd require postgres_fdw to know more than is desirable
4868  * about Param evaluation.)
4869  */
4870  *param_exprs = ExecInitExprList(fdw_exprs, node);
4871 
4872  /* Allocate buffer for text form of query parameters. */
4873  *param_values = (const char **) palloc0(numParams * sizeof(char *));
4874 }
4875 
4876 /*
4877  * Construct array of query parameter values in text format.
 *
 * Evaluates every expression of param_exprs in econtext and writes its text
 * form (or NULL for a null value) into the matching entry of param_values,
 * using the output function at the same index of param_flinfo.  The
 * transmission-mode settings are in effect for the whole loop and undone
 * before returning.
 *
 * NOTE(review): inner line 4880 (function name and first parameter) is
 * absent from this listing.
4878  */
4879 static void
4881  FmgrInfo *param_flinfo,
4882  List *param_exprs,
4883  const char **param_values)
4884 {
4885  int nestlevel;
4886  int i;
4887  ListCell *lc;
4888 
4889  nestlevel = set_transmission_modes();
4890 
 /* i tracks the position shared by param_flinfo and param_values. */
4891  i = 0;
4892  foreach(lc, param_exprs)
4893  {
4894  ExprState *expr_state = (ExprState *) lfirst(lc);
4895  Datum expr_value;
4896  bool isNull;
4897 
4898  /* Evaluate the parameter expression */
4899  expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4900 
4901  /*
4902  * Get string representation of each parameter value by invoking
4903  * type-specific output function, unless the value is null.
4904  */
4905  if (isNull)
4906  param_values[i] = NULL;
4907  else
4908  param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4909 
4910  i++;
4911  }
4912 
4913  reset_transmission_modes(nestlevel);
4914 }
4915 
4916 /*
4917  * postgresAnalyzeForeignTable
4918  * Test whether analyzing this foreign table is supported
 *
 * Runs the query built by deparseAnalyzeSizeSql() on the remote server and
 * stores the single returned value, parsed as an unsigned decimal number,
 * in *totalpages.  As listed, the function always returns true.
 *
 * NOTE(review): inner lines 4921 (function name and first parameter), 4932,
 * 4959 and 4972 are absent from this listing.
4919  */
4920 static bool
4922  AcquireSampleRowsFunc *func,
4923  BlockNumber *totalpages)
4924 {
4925  ForeignTable *table;
4926  UserMapping *user;
4927  PGconn *conn;
4928  StringInfoData sql;
4929  PGresult *volatile res = NULL;
4930 
4931  /* Return the row-analysis function pointer */
 /*
  * NOTE(review): the statement this comment introduces (inner line 4932),
  * presumably the assignment to *func, is absent from this listing.
  */
4933 
4934  /*
4935  * Now we have to get the number of pages. It's annoying that the ANALYZE
4936  * API requires us to return that now, because it forces some duplication
4937  * of effort between this routine and postgresAcquireSampleRowsFunc. But
4938  * it's probably not worth redefining that API at this point.
4939  */
4940 
4941  /*
4942  * Get the connection to use. We do the remote access as the table's
4943  * owner, even if the ANALYZE was started by some other user.
4944  */
4945  table = GetForeignTable(RelationGetRelid(relation));
4946  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4947  conn = GetConnection(user, false, NULL);
4948 
4949  /*
4950  * Construct command to get page count for relation.
4951  */
4952  initStringInfo(&sql);
4953  deparseAnalyzeSizeSql(&sql, relation);
4954 
4955  /* In what follows, do not risk leaking any PGresults. */
4956  PG_TRY();
4957  {
4958  res = pgfdw_exec_query(conn, sql.data, NULL);
 /*
  * NOTE(review): as listed, the error report below looks unconditional;
  * presumably a result-status test sits on the omitted line 4959 -- confirm.
  */
4960  pgfdw_report_error(ERROR, res, conn, false, sql.data);
4961 
 /* Exactly one row with one column is expected. */
4962  if (PQntuples(res) != 1 || PQnfields(res) != 1)
4963  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4964  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4965  }
4966  PG_FINALLY();
4967  {
 /* Runs on both the normal and the error path. */
4968  PQclear(res);
4969  }
4970  PG_END_TRY();
4971 
 /*
  * NOTE(review): inner line 4972 is absent from this listing; no release of
  * conn is visible in this function as shown.
  */
4973 
4974  return true;
4975 }
4976 
4977 /*
4978  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
4979  *
4980  * We fetch the whole table from the remote side and pick out some sample rows.
4981  *
4982  * Selected rows are returned in the caller-allocated array rows[],
4983  * which must have at least targrows entries.
4984  * The actual number of rows selected is returned as the function result.
4985  * We also count the total number of rows in the table and return it into
4986  * *totalrows. Note that *totaldeadrows is always set to 0.
4987  *
4988  * Note that the returned list of rows is not always in order by physical
4989  * position in the table. Therefore, correlation estimates derived later
4990  * may be meaningless, but it's OK because we don't use the estimates
4991  * currently (the planner only pays attention to correlation for indexscans).
4992  */
4993 static int
4995  HeapTuple *rows, int targrows,
4996  double *totalrows,
4997  double *totaldeadrows)
4998 {
4999  PgFdwAnalyzeState astate;
5000  ForeignTable *table;
5001  ForeignServer *server;
5002  UserMapping *user;
5003  PGconn *conn;
5004  unsigned int cursor_number;
5005  StringInfoData sql;
5006  PGresult *volatile res = NULL;
5007 
5008  /* Initialize workspace state */
5009  astate.rel = relation;
5011 
5012  astate.rows = rows;
5013  astate.targrows = targrows;
5014  astate.numrows = 0;
5015  astate.samplerows = 0;
5016  astate.rowstoskip = -1; /* -1 means not set yet */
5017  reservoir_init_selection_state(&astate.rstate, targrows);
5018 
5019  /* Remember ANALYZE context, and create a per-tuple temp context */
5020  astate.anl_cxt = CurrentMemoryContext;
5022  "postgres_fdw temporary data",
5024 
5025  /*
5026  * Get the connection to use. We do the remote access as the table's
5027  * owner, even if the ANALYZE was started by some other user.
5028  */
5029  table = GetForeignTable(RelationGetRelid(relation));
5030  server = GetForeignServer(table->serverid);
5031  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5032  conn = GetConnection(user, false, NULL);
5033 
5034  /*
5035  * Construct cursor that retrieves whole rows from remote.
5036  */
5038  initStringInfo(&sql);
5039  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
5040  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
5041 
5042  /* In what follows, do not risk leaking any PGresults. */
5043  PG_TRY();
5044  {
5045  char fetch_sql[64];
5046  int fetch_size;
5047  ListCell *lc;
5048 
5049  res = pgfdw_exec_query(conn, sql.data, NULL);
5051  pgfdw_report_error(ERROR, res, conn, false, sql.data);
5052  PQclear(res);
5053  res = NULL;
5054 
5055  /*
5056  * Determine the fetch size. The default is arbitrary, but shouldn't
5057  * be enormous.
5058  */
5059  fetch_size = 100;
5060  foreach(lc, server->options)
5061  {
5062  DefElem *def = (DefElem *) lfirst(lc);
5063 
5064  if (strcmp(def->defname, "fetch_size") == 0)
5065  {
5066  (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5067  break;
5068  }
5069  }
5070  foreach(lc, table->options)
5071  {
5072  DefElem *def = (DefElem *) lfirst(lc);
5073 
5074  if (strcmp(def->defname, "fetch_size") == 0)
5075  {
5076  (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5077  break;
5078  }
5079  }
5080 
5081  /* Construct command to fetch rows from remote. */
5082  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
5084 
5085  /* Retrieve and process rows a batch at a time. */
5086  for (;;)
5087  {
5088  int numrows;
5089  int i;
5090 
5091  /* Allow users to cancel long query */
5093 
5094  /*
5095  * XXX possible future improvement: if rowstoskip is large, we
5096  * could issue a MOVE rather than physically fetching the rows,
5097  * then just adjust rowstoskip and samplerows appropriately.
5098  */
5099 
5100  /* Fetch some rows */
5101  res = pgfdw_exec_query(conn, fetch_sql, NULL);
5102  /* On error, report the original query, not the FETCH. */
5104  pgfdw_report_error(ERROR, res, conn, false, sql.data);
5105 
5106  /* Process whatever we got. */
5107  numrows = PQntuples(res);
5108  for (i = 0; i < numrows; i++)
5109  analyze_row_processor(res, i, &astate);
5110 
5111  PQclear(res);
5112  res = NULL;
5113 
5114  /* Must be EOF if we didn't get all the rows requested. */
5115  if (numrows < fetch_size)
5116  break;
5117  }
5118 
5119  /* Close the cursor, just to be tidy. */
5121  }
5122  PG_CATCH();
5123  {
5124  PQclear(res);
5125  PG_RE_THROW();
5126  }
5127  PG_END_TRY();
5128 
5130 
5131  /* We assume that we have no dead tuple. */
5132  *totaldeadrows = 0.0;
5133 
5134  /* We've retrieved all living tuples from foreign server. */
5135  *totalrows = astate.samplerows;
5136 
5137  /*
5138  * Emit some interesting relation info
5139  */
5140  ereport(elevel,
5141  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
5142  RelationGetRelationName(relation),
5143  astate.samplerows, astate.numrows)));
5144 
5145  return astate.numrows;
5146 }
5147 
5148 /*
5149  * Collect sample rows from the result of query.
5150  * - Use all tuples in sample until target # of samples are collected.
5151  * - Subsequently, replace already-sampled tuples randomly.
5152  */
5153 static void
5155 {
5156  int targrows = astate->targrows;
5157  int pos; /* array index to store tuple in */
5158  MemoryContext oldcontext;
5159 
5160  /* Always increment sample row counter. */
5161  astate->samplerows += 1;
5162 
5163  /*
5164  * Determine the slot where this sample row should be stored. Set pos to
5165  * negative value to indicate the row should be skipped.
5166  */
5167  if (astate->numrows < targrows)
5168  {
5169  /* First targrows rows are always included into the sample */
5170  pos = astate->numrows++;
5171  }
5172  else
5173  {
5174  /*
5175  * Now we start replacing tuples in the sample until we reach the end
5176  * of the relation. Same algorithm as in acquire_sample_rows in
5177  * analyze.c; see Jeff Vitter's paper.
5178  */
5179  if (astate->rowstoskip < 0)
5180  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
5181 
5182  if (astate->rowstoskip <= 0)
5183  {
5184  /* Choose a random reservoir element to replace. */
5185  pos = (int) (targrows * sampler_random_fract(&astate->rstate.randstate));
5186  Assert(pos >= 0 && pos < targrows);
5187  heap_freetuple(astate->rows[pos]);
5188  }
5189  else
5190  {
5191  /* Skip this tuple. */
5192  pos = -1;
5193  }
5194 
5195  astate->rowstoskip -= 1;
5196  }
5197 
5198  if (pos >= 0)
5199  {
5200  /*
5201  * Create sample tuple from current result row, and store it in the
5202  * position determined above. The tuple has to be created in anl_cxt.
5203  */
5204  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
5205 
5206  astate->rows[pos] = make_tuple_from_result_row(res, row,
5207  astate->rel,
5208  astate->attinmeta,
5209  astate->retrieved_attrs,
5210  NULL,
5211  astate->temp_cxt);
5212 
5213  MemoryContextSwitchTo(oldcontext);
5214  }
5215 }
5216 
5217 /*
5218  * Import a foreign schema
5219  */
5220 static List *
5222 {
5223  List *commands = NIL;
5224  bool import_collate = true;
5225  bool import_default = false;
5226  bool import_generated = true;
5227  bool import_not_null = true;
5228  ForeignServer *server;
5229  UserMapping *mapping;
5230  PGconn *conn;
5232  PGresult *volatile res = NULL;
5233  int numrows,
5234  i;
5235  ListCell *lc;
5236 
5237  /* Parse statement options */
5238  foreach(lc, stmt->options)
5239  {
5240  DefElem *def = (DefElem *) lfirst(lc);
5241 
5242  if (strcmp(def->defname, "import_collate") == 0)
5243  import_collate = defGetBoolean(def);
5244  else if (strcmp(def->defname, "import_default") == 0)
5245  import_default = defGetBoolean(def);
5246  else if (strcmp(def->defname, "import_generated") == 0)
5247  import_generated = defGetBoolean(def);
5248  else if (strcmp(def->defname, "import_not_null") == 0)
5249  import_not_null = defGetBoolean(def);
5250  else
5251  ereport(ERROR,
5252  (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
5253  errmsg("invalid option \"%s\"", def->defname)));
5254  }
5255 
5256  /*
5257  * Get connection to the foreign server. Connection manager will
5258  * establish new connection if necessary.
5259  */
5260  server = GetForeignServer(serverOid);
5261  mapping = GetUserMapping(GetUserId(), server->serverid);
5262  conn = GetConnection(mapping, false, NULL);
5263 
5264  /* Don't attempt to import collation if remote server hasn't got it */
5265  if (PQserverVersion(conn) < 90100)
5266  import_collate = false;
5267 
5268  /* Create workspace for strings */
5269  initStringInfo(&buf);
5270 
5271  /* In what follows, do not risk leaking any PGresults. */
5272  PG_TRY();
5273  {
5274  /* Check that the schema really exists */
5275  appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
5277 
5278  res = pgfdw_exec_query(conn, buf.data, NULL);
5280  pgfdw_report_error(ERROR, res, conn, false, buf.data);
5281 
5282  if (PQntuples(res) != 1)
5283  ereport(ERROR,
5284  (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
5285  errmsg("schema \"%s\" is not present on foreign server \"%s\"",
5286  stmt->remote_schema, server->servername)));
5287 
5288  PQclear(res);
5289  res = NULL;
5290  resetStringInfo(&buf);
5291 
5292  /*
5293  * Fetch all table data from this schema, possibly restricted by
5294  * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
5295  * to EXCEPT/LIMIT TO here, because the core code will filter the
5296  * statements we return according to those lists anyway. But it
5297  * should save a few cycles to not process excluded tables in the
5298  * first place.)
5299  *
5300  * Import table data for partitions only when they are explicitly
5301  * specified in LIMIT TO clause. Otherwise ignore them and only
5302  * include the definitions of the root partitioned tables to allow
5303  * access to the complete remote data set locally in the schema
5304  * imported.
5305  *
5306  * Note: because we run the connection with search_path restricted to
5307  * pg_catalog, the format_type() and pg_get_expr() outputs will always
5308  * include a schema name for types/functions in other schemas, which
5309  * is what we want.
5310  */
5312  "SELECT relname, "
5313  " attname, "
5314  " format_type(atttypid, atttypmod), "
5315  " attnotnull, "
5316  " pg_get_expr(adbin, adrelid), ");
5317 
5318  /* Generated columns are supported since Postgres 12 */
5319  if (PQserverVersion(conn) >= 120000)
5321  " attgenerated, ");
5322  else
5324  " NULL, ");
5325 
5326  if (import_collate)
5328  " collname, "
5329  " collnsp.nspname ");
5330  else
5332  " NULL, NULL ");
5333 
5335  "FROM pg_class c "
5336  " JOIN pg_namespace n ON "
5337  " relnamespace = n.oid "
5338  " LEFT JOIN pg_attribute a ON "
5339  " attrelid = c.oid AND attnum > 0 "
5340  " AND NOT attisdropped "
5341  " LEFT JOIN pg_attrdef ad ON "
5342  " adrelid = c.oid AND adnum = attnum ");
5343 
5344  if (import_collate)
5346  " LEFT JOIN pg_collation coll ON "
5347  " coll.oid = attcollation "
5348  " LEFT JOIN pg_namespace collnsp ON "
5349  " collnsp.oid = collnamespace ");
5350 
5352  "WHERE c.relkind IN ("
5353  CppAsString2(RELKIND_RELATION) ","
5354  CppAsString2(RELKIND_VIEW) ","
5355  CppAsString2(RELKIND_FOREIGN_TABLE) ","
5356  CppAsString2(RELKIND_MATVIEW) ","
5357  CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
5358  " AND n.nspname = ");
5360 
5361  /* Partitions are supported since Postgres 10 */
5362  if (PQserverVersion(conn) >= 100000 &&
5364  appendStringInfoString(&buf, " AND NOT c.relispartition ");
5365 
5366  /* Apply restrictions for LIMIT TO and EXCEPT */
5367  if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
5369  {
5370  bool first_item = true;
5371 
5372  appendStringInfoString(&buf, " AND c.relname ");
5373  if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
5374  appendStringInfoString(&buf, "NOT ");
5375  appendStringInfoString(&buf, "IN (");
5376 
5377  /* Append list of table names within IN clause */
5378  foreach(lc, stmt->table_list)
5379  {
5380  RangeVar *rv = (RangeVar *) lfirst(lc);
5381 
5382  if (first_item)
5383  first_item = false;
5384  else
5385  appendStringInfoString(&buf, ", ");
5387  }
5388  appendStringInfoChar(&buf, ')');
5389  }
5390 
5391  /* Append ORDER BY at the end of query to ensure output ordering */
5392  appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
5393 
5394  /* Fetch the data */
5395  res = pgfdw_exec_query(conn, buf.data, NULL);
5397  pgfdw_report_error(ERROR, res, conn, false, buf.data);
5398 
5399  /* Process results */
5400  numrows = PQntuples(res);
5401  /* note: incrementation of i happens in inner loop's while() test */
5402  for (i = 0; i < numrows;)
5403  {
5404  char *tablename = PQgetvalue(res, i, 0);
5405  bool first_item = true;
5406 
5407  resetStringInfo(&buf);
5408  appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
5409  quote_identifier(tablename));
5410 
5411  /* Scan all rows for this table */
5412  do
5413  {
5414  char *attname;
5415  char *typename;
5416  char *attnotnull;
5417  char *attgenerated;
5418  char *attdefault;
5419  char *collname;
5420  char *collnamespace;
5421 
5422  /* If table has no columns, we'll see nulls here */
5423  if (PQgetisnull(res, i, 1))
5424  continue;
5425 
5426  attname = PQgetvalue(res, i, 1);
5427  typename = PQgetvalue(res, i, 2);
5428  attnotnull = PQgetvalue(res, i, 3);
5429  attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
5430  PQgetvalue(res, i, 4);
5431  attgenerated = PQgetisnull(res, i, 5) ? (char *) NULL :
5432  PQgetvalue(res, i, 5);
5433  collname = PQgetisnull(res, i, 6) ? (char *) NULL :
5434  PQgetvalue(res, i, 6);
5435  collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL :
5436  PQgetvalue(res, i, 7);
5437 
5438  if (first_item)
5439  first_item = false;
5440  else
5441  appendStringInfoString(&buf, ",\n");
5442 
5443  /* Print column name and type */
5444  appendStringInfo(&buf, " %s %s",
5446  typename);
5447 
5448  /*
5449  * Add column_name option so that renaming the foreign table's
5450  * column doesn't break the association to the underlying
5451  * column.
5452  */
5453  appendStringInfoString(&buf, " OPTIONS (column_name ");
5455  appendStringInfoChar(&buf, ')');
5456 
5457  /* Add COLLATE if needed */
5458  if (import_collate && collname != NULL && collnamespace != NULL)
5459  appendStringInfo(&buf, " COLLATE %s.%s",
5460  quote_identifier(collnamespace),
5461  quote_identifier(collname));
5462 
5463  /* Add DEFAULT if needed */
5464  if (import_default && attdefault != NULL &&
5465  (!attgenerated || !attgenerated[0]))
5466  appendStringInfo(&buf, " DEFAULT %s", attdefault);
5467 
5468  /* Add GENERATED if needed */
5469  if (import_generated && attgenerated != NULL &&
5470  attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
5471  {
5472  Assert(attdefault != NULL);
5474  " GENERATED ALWAYS AS (%s) STORED",
5475  attdefault);
5476  }
5477 
5478  /* Add NOT NULL if needed */
5479  if (import_not_null && attnotnull[0] == 't')
5480  appendStringInfoString(&buf, " NOT NULL");
5481  }
5482  while (++i < numrows &&
5483  strcmp(PQgetvalue(res, i, 0), tablename) == 0);
5484 
5485  /*
5486  * Add server name and table-level options. We specify remote
5487  * schema and table name as options (the latter to ensure that
5488  * renaming the foreign table doesn't break the association).
5489  */
5490  appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
5491  quote_identifier(server->servername));
5492 
5493  appendStringInfoString(&buf, "schema_name ");
5495  appendStringInfoString(&buf, ", table_name ");
5496  deparseStringLiteral(&buf, tablename);
5497 
5498  appendStringInfoString(&buf, ");");
5499 
5500  commands = lappend(commands,