PostgreSQL Source Code git master
postgres_fdw.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * postgres_fdw.c
4 * Foreign-data wrapper for remote PostgreSQL servers
5 *
6 * Portions Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/postgres_fdw/postgres_fdw.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include <limits.h>
16
17#include "access/htup_details.h"
18#include "access/sysattr.h"
19#include "access/table.h"
20#include "catalog/pg_opfamily.h"
21#include "commands/defrem.h"
22#include "commands/explain.h"
23#include "executor/execAsync.h"
24#include "foreign/fdwapi.h"
25#include "funcapi.h"
26#include "miscadmin.h"
27#include "nodes/makefuncs.h"
28#include "nodes/nodeFuncs.h"
30#include "optimizer/cost.h"
31#include "optimizer/inherit.h"
32#include "optimizer/optimizer.h"
33#include "optimizer/pathnode.h"
34#include "optimizer/paths.h"
35#include "optimizer/planmain.h"
36#include "optimizer/prep.h"
38#include "optimizer/tlist.h"
39#include "parser/parsetree.h"
40#include "postgres_fdw.h"
41#include "storage/latch.h"
42#include "utils/builtins.h"
43#include "utils/float.h"
44#include "utils/guc.h"
45#include "utils/lsyscache.h"
46#include "utils/memutils.h"
47#include "utils/rel.h"
48#include "utils/sampling.h"
49#include "utils/selfuncs.h"
50
52
53/* Default CPU cost to start up a foreign query. */
54#define DEFAULT_FDW_STARTUP_COST 100.0
55
56/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
57#define DEFAULT_FDW_TUPLE_COST 0.2
58
59/* If no remote estimates, assume a sort costs 20% extra */
60#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
61
62/*
63 * Indexes of FDW-private information stored in fdw_private lists.
64 *
65 * These items are indexed with the enum FdwScanPrivateIndex, so an item
66 * can be fetched with list_nth(). For example, to get the SELECT statement:
67 * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
68 */
70{
71 /* SQL statement to execute remotely (as a String node) */
73 /* Integer list of attribute numbers retrieved by the SELECT */
75 /* Integer representing the desired fetch_size */
77
78 /*
79 * String describing join i.e. names of relations being joined and types
80 * of join, added when the scan is join
81 */
83};
84
85/*
86 * Similarly, this enum describes what's kept in the fdw_private list for
87 * a ModifyTable node referencing a postgres_fdw foreign table. We store:
88 *
89 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
90 * 2) Integer list of target attribute numbers for INSERT/UPDATE
91 * (NIL for a DELETE)
92 * 3) Length till the end of VALUES clause for INSERT
93 * (-1 for a DELETE/UPDATE)
94 * 4) Boolean flag showing if the remote query has a RETURNING clause
95 * 5) Integer list of attribute numbers retrieved by RETURNING, if any
96 */
98{
99 /* SQL statement to execute remotely (as a String node) */
101 /* Integer list of target attribute numbers for INSERT/UPDATE */
103 /* Length till the end of VALUES clause (as an Integer node) */
105 /* has-returning flag (as a Boolean node) */
107 /* Integer list of attribute numbers retrieved by RETURNING */
109};
110
111/*
112 * Similarly, this enum describes what's kept in the fdw_private list for
113 * a ForeignScan node that modifies a foreign table directly. We store:
114 *
115 * 1) UPDATE/DELETE statement text to be sent to the remote server
116 * 2) Boolean flag showing if the remote query has a RETURNING clause
117 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
118 * 4) Boolean flag showing if we set the command es_processed
119 */
121{
122 /* SQL statement to execute remotely (as a String node) */
124 /* has-returning flag (as a Boolean node) */
126 /* Integer list of attribute numbers retrieved by RETURNING */
128 /* set-processed flag (as a Boolean node) */
130};
131
132/*
133 * Execution state of a foreign scan using postgres_fdw.
134 */
135typedef struct PgFdwScanState
136{
137 Relation rel; /* relcache entry for the foreign table. NULL
138 * for a foreign join scan. */
139 TupleDesc tupdesc; /* tuple descriptor of scan */
140 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
141
142 /* extracted fdw_private data */
143 char *query; /* text of SELECT command */
144 List *retrieved_attrs; /* list of retrieved attribute numbers */
145
146 /* for remote query execution */
147 PGconn *conn; /* connection for the scan */
148 PgFdwConnState *conn_state; /* extra per-connection state */
149 unsigned int cursor_number; /* quasi-unique ID for my cursor */
150 bool cursor_exists; /* have we created the cursor? */
151 int numParams; /* number of parameters passed to query */
152 FmgrInfo *param_flinfo; /* output conversion functions for them */
153 List *param_exprs; /* executable expressions for param values */
154 const char **param_values; /* textual values of query parameters */
155
156 /* for storing result tuples */
157 HeapTuple *tuples; /* array of currently-retrieved tuples */
158 int num_tuples; /* # of tuples in array */
159 int next_tuple; /* index of next one to return */
160
161 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
162 int fetch_ct_2; /* Min(# of fetches done, 2) */
163 bool eof_reached; /* true if last fetch reached EOF */
164
165 /* for asynchronous execution */
166 bool async_capable; /* engage asynchronous-capable logic? */
167
168 /* working memory contexts */
169 MemoryContext batch_cxt; /* context holding current batch of tuples */
170 MemoryContext temp_cxt; /* context for per-tuple temporary data */
171
172 int fetch_size; /* number of tuples per fetch */
174
175/*
176 * Execution state of a foreign insert/update/delete operation.
177 */
178typedef struct PgFdwModifyState
179{
180 Relation rel; /* relcache entry for the foreign table */
181 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
182
183 /* for remote query execution */
184 PGconn *conn; /* connection for the scan */
185 PgFdwConnState *conn_state; /* extra per-connection state */
186 char *p_name; /* name of prepared statement, if created */
187
188 /* extracted fdw_private data */
189 char *query; /* text of INSERT/UPDATE/DELETE command */
190 char *orig_query; /* original text of INSERT command */
191 List *target_attrs; /* list of target attribute numbers */
192 int values_end; /* length up to the end of VALUES */
193 int batch_size; /* value of FDW option "batch_size" */
194 bool has_returning; /* is there a RETURNING clause? */
195 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
196
197 /* info about parameters for prepared statement */
198 AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
199 int p_nums; /* number of parameters to transmit */
200 FmgrInfo *p_flinfo; /* output conversion functions for them */
201
202 /* batch operation stuff */
203 int num_slots; /* number of slots to insert */
204
205 /* working memory context */
206 MemoryContext temp_cxt; /* context for per-tuple temporary data */
207
208 /* for update row movement if subplan result rel */
209 struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
210 * created */
212
213/*
214 * Execution state of a foreign scan that modifies a foreign table directly.
215 */
217{
218 Relation rel; /* relcache entry for the foreign table */
219 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
220
221 /* extracted fdw_private data */
222 char *query; /* text of UPDATE/DELETE command */
223 bool has_returning; /* is there a RETURNING clause? */
224 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
225 bool set_processed; /* do we set the command es_processed? */
226
227 /* for remote query execution */
228 PGconn *conn; /* connection for the update */
229 PgFdwConnState *conn_state; /* extra per-connection state */
230 int numParams; /* number of parameters passed to query */
231 FmgrInfo *param_flinfo; /* output conversion functions for them */
232 List *param_exprs; /* executable expressions for param values */
233 const char **param_values; /* textual values of query parameters */
234
235 /* for storing result tuples */
236 PGresult *result; /* result for query */
237 int num_tuples; /* # of result tuples */
238 int next_tuple; /* index of next one to return */
239 Relation resultRel; /* relcache entry for the target relation */
240 AttrNumber *attnoMap; /* array of attnums of input user columns */
241 AttrNumber ctidAttno; /* attnum of input ctid column */
242 AttrNumber oidAttno; /* attnum of input oid column */
243 bool hasSystemCols; /* are there system columns of resultRel? */
244
245 /* working memory context */
246 MemoryContext temp_cxt; /* context for per-tuple temporary data */
248
249/*
250 * Workspace for analyzing a foreign table.
251 */
252typedef struct PgFdwAnalyzeState
253{
254 Relation rel; /* relcache entry for the foreign table */
255 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
256 List *retrieved_attrs; /* attr numbers retrieved by query */
257
258 /* collected sample rows */
259 HeapTuple *rows; /* array of size targrows */
260 int targrows; /* target # of sample rows */
261 int numrows; /* # of sample rows collected */
262
263 /* for random sampling */
264 double samplerows; /* # of rows fetched */
265 double rowstoskip; /* # of rows to skip before next sample */
266 ReservoirStateData rstate; /* state for reservoir sampling */
267
268 /* working memory contexts */
269 MemoryContext anl_cxt; /* context for per-analyze lifespan data */
270 MemoryContext temp_cxt; /* context for per-tuple temporary data */
272
273/*
274 * This enum describes what's kept in the fdw_private list for a ForeignPath.
275 * We store:
276 *
277 * 1) Boolean flag showing if the remote query has the final sort
278 * 2) Boolean flag showing if the remote query has the LIMIT clause
279 */
281{
282 /* has-final-sort flag (as a Boolean node) */
284 /* has-limit flag (as a Boolean node) */
286};
287
288/* Struct for extra information passed to estimate_path_cost_size() */
289typedef struct
290{
298
299/*
300 * Identify the attribute where data conversion fails.
301 */
302typedef struct ConversionLocation
303{
304 AttrNumber cur_attno; /* attribute number being processed, or 0 */
305 Relation rel; /* foreign table being processed, or NULL */
306 ForeignScanState *fsstate; /* plan node being processed, or NULL */
308
309/* Callback argument for ec_member_matches_foreign */
310typedef struct
311{
312 Expr *current; /* current expr, or NULL if not yet found */
313 List *already_used; /* expressions already dealt with */
315
316/*
317 * SQL functions
318 */
320
321/*
322 * FDW callback routines
323 */
325 RelOptInfo *baserel,
326 Oid foreigntableid);
328 RelOptInfo *baserel,
329 Oid foreigntableid);
331 RelOptInfo *foreignrel,
332 Oid foreigntableid,
333 ForeignPath *best_path,
334 List *tlist,
335 List *scan_clauses,
336 Plan *outer_plan);
337static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
342 Index rtindex,
343 RangeTblEntry *target_rte,
344 Relation target_relation);
347 Index resultRelation,
348 int subplan_index);
350 ResultRelInfo *resultRelInfo,
351 List *fdw_private,
352 int subplan_index,
353 int eflags);
355 ResultRelInfo *resultRelInfo,
356 TupleTableSlot *slot,
357 TupleTableSlot *planSlot);
359 ResultRelInfo *resultRelInfo,
360 TupleTableSlot **slots,
361 TupleTableSlot **planSlots,
362 int *numSlots);
363static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
365 ResultRelInfo *resultRelInfo,
366 TupleTableSlot *slot,
367 TupleTableSlot *planSlot);
369 ResultRelInfo *resultRelInfo,
370 TupleTableSlot *slot,
371 TupleTableSlot *planSlot);
372static void postgresEndForeignModify(EState *estate,
373 ResultRelInfo *resultRelInfo);
375 ResultRelInfo *resultRelInfo);
376static void postgresEndForeignInsert(EState *estate,
377 ResultRelInfo *resultRelInfo);
381 Index resultRelation,
382 int subplan_index);
383static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
387 ExplainState *es);
389 ResultRelInfo *rinfo,
390 List *fdw_private,
391 int subplan_index,
392 ExplainState *es);
394 ExplainState *es);
395static void postgresExecForeignTruncate(List *rels,
396 DropBehavior behavior,
397 bool restart_seqs);
398static bool postgresAnalyzeForeignTable(Relation relation,
400 BlockNumber *totalpages);
402 Oid serverOid);
404 RelOptInfo *joinrel,
405 RelOptInfo *outerrel,
406 RelOptInfo *innerrel,
407 JoinType jointype,
408 JoinPathExtraData *extra);
410 TupleTableSlot *slot);
412 UpperRelationKind stage,
413 RelOptInfo *input_rel,
414 RelOptInfo *output_rel,
415 void *extra);
420
421/*
422 * Helper functions
423 */
425 RelOptInfo *foreignrel,
426 List *param_join_conds,
427 List *pathkeys,
428 PgFdwPathExtraData *fpextra,
429 double *p_rows, int *p_width,
430 int *p_disabled_nodes,
431 Cost *p_startup_cost, Cost *p_total_cost);
432static void get_remote_estimate(const char *sql,
433 PGconn *conn,
434 double *rows,
435 int *width,
436 Cost *startup_cost,
437 Cost *total_cost);
439 List *pathkeys,
440 double retrieved_rows,
441 double width,
442 double limit_tuples,
443 int *disabled_nodes,
444 Cost *p_startup_cost,
445 Cost *p_run_cost);
448 void *arg);
449static void create_cursor(ForeignScanState *node);
450static void fetch_more_data(ForeignScanState *node);
451static void close_cursor(PGconn *conn, unsigned int cursor_number,
452 PgFdwConnState *conn_state);
454 RangeTblEntry *rte,
455 ResultRelInfo *resultRelInfo,
456 CmdType operation,
457 Plan *subplan,
458 char *query,
459 List *target_attrs,
460 int values_end,
461 bool has_returning,
462 List *retrieved_attrs);
464 ResultRelInfo *resultRelInfo,
465 CmdType operation,
466 TupleTableSlot **slots,
467 TupleTableSlot **planSlots,
468 int *numSlots);
469static void prepare_foreign_modify(PgFdwModifyState *fmstate);
470static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
471 ItemPointer tupleid,
472 TupleTableSlot **slots,
473 int numSlots);
474static void store_returning_result(PgFdwModifyState *fmstate,
475 TupleTableSlot *slot, PGresult *res);
476static void finish_foreign_modify(PgFdwModifyState *fmstate);
477static void deallocate_query(PgFdwModifyState *fmstate);
478static List *build_remote_returning(Index rtindex, Relation rel,
479 List *returningList);
480static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
481static void execute_dml_stmt(ForeignScanState *node);
484 List *fdw_scan_tlist,
485 Index rtindex);
487 ResultRelInfo *resultRelInfo,
488 TupleTableSlot *slot,
489 EState *estate);
490static void prepare_query_params(PlanState *node,
491 List *fdw_exprs,
492 int numParams,
493 FmgrInfo **param_flinfo,
494 List **param_exprs,
495 const char ***param_values);
496static void process_query_params(ExprContext *econtext,
497 FmgrInfo *param_flinfo,
498 List *param_exprs,
499 const char **param_values);
500static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
501 HeapTuple *rows, int targrows,
502 double *totalrows,
503 double *totaldeadrows);
504static void analyze_row_processor(PGresult *res, int row,
505 PgFdwAnalyzeState *astate);
506static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
507static void fetch_more_data_begin(AsyncRequest *areq);
508static void complete_pending_request(AsyncRequest *areq);
510 int row,
511 Relation rel,
512 AttInMetadata *attinmeta,
513 List *retrieved_attrs,
514 ForeignScanState *fsstate,
515 MemoryContext temp_context);
516static void conversion_error_callback(void *arg);
517static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
518 JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
519 JoinPathExtraData *extra);
520static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
521 Node *havingQual);
523 RelOptInfo *rel);
526 Path *epq_path, List *restrictlist);
528 RelOptInfo *input_rel,
529 RelOptInfo *grouped_rel,
530 GroupPathExtraData *extra);
532 RelOptInfo *input_rel,
533 RelOptInfo *ordered_rel);
535 RelOptInfo *input_rel,
536 RelOptInfo *final_rel,
537 FinalPathExtraData *extra);
538static void apply_server_options(PgFdwRelationInfo *fpinfo);
539static void apply_table_options(PgFdwRelationInfo *fpinfo);
540static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
541 const PgFdwRelationInfo *fpinfo_o,
542 const PgFdwRelationInfo *fpinfo_i);
543static int get_batch_size_option(Relation rel);
544
545
546/*
547 * Foreign-data wrapper handler function: return a struct with pointers
548 * to my callback routines.
549 */
550Datum
552{
553 FdwRoutine *routine = makeNode(FdwRoutine);
554
555 /* Functions for scanning foreign tables */
563
564 /* Functions for updating foreign tables */
581
582 /* Function for EvalPlanQual rechecks */
584 /* Support functions for EXPLAIN */
588
589 /* Support function for TRUNCATE */
591
592 /* Support functions for ANALYZE */
594
595 /* Support functions for IMPORT FOREIGN SCHEMA */
597
598 /* Support functions for join push-down */
600
601 /* Support functions for upper relation push-down */
603
604 /* Support functions for asynchronous execution */
609
610 PG_RETURN_POINTER(routine);
611}
612
613/*
614 * postgresGetForeignRelSize
615 * Estimate # of rows and width of the result of the scan
616 *
617 * We should consider the effect of all baserestrictinfo clauses here, but
618 * not any join clauses.
619 */
620static void
622 RelOptInfo *baserel,
623 Oid foreigntableid)
624{
625 PgFdwRelationInfo *fpinfo;
626 ListCell *lc;
627
628 /*
629 * We use PgFdwRelationInfo to pass various information to subsequent
630 * functions.
631 */
632 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
633 baserel->fdw_private = fpinfo;
634
635 /* Base foreign tables need to be pushed down always. */
636 fpinfo->pushdown_safe = true;
637
638 /* Look up foreign-table catalog info. */
639 fpinfo->table = GetForeignTable(foreigntableid);
640 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
641
642 /*
643 * Extract user-settable option values. Note that per-table settings of
644 * use_remote_estimate, fetch_size and async_capable override per-server
645 * settings of them, respectively.
646 */
647 fpinfo->use_remote_estimate = false;
650 fpinfo->shippable_extensions = NIL;
651 fpinfo->fetch_size = 100;
652 fpinfo->async_capable = false;
653
654 apply_server_options(fpinfo);
655 apply_table_options(fpinfo);
656
657 /*
658 * If the table or the server is configured to use remote estimates,
659 * identify which user to do remote access as during planning. This
660 * should match what ExecCheckPermissions() does. If we fail due to lack
661 * of permissions, the query would have failed at runtime anyway.
662 */
663 if (fpinfo->use_remote_estimate)
664 {
665 Oid userid;
666
667 userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId();
668 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
669 }
670 else
671 fpinfo->user = NULL;
672
673 /*
674 * Identify which baserestrictinfo clauses can be sent to the remote
675 * server and which can't.
676 */
677 classifyConditions(root, baserel, baserel->baserestrictinfo,
678 &fpinfo->remote_conds, &fpinfo->local_conds);
679
680 /*
681 * Identify which attributes will need to be retrieved from the remote
682 * server. These include all attrs needed for joins or final output, plus
683 * all attrs used in the local_conds. (Note: if we end up using a
684 * parameterized scan, it's possible that some of the join clauses will be
685 * sent to the remote and thus we wouldn't really need to retrieve the
686 * columns used in them. Doesn't seem worth detecting that case though.)
687 */
688 fpinfo->attrs_used = NULL;
689 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
690 &fpinfo->attrs_used);
691 foreach(lc, fpinfo->local_conds)
692 {
694
695 pull_varattnos((Node *) rinfo->clause, baserel->relid,
696 &fpinfo->attrs_used);
697 }
698
699 /*
700 * Compute the selectivity and cost of the local_conds, so we don't have
701 * to do it over again for each path. The best we can do for these
702 * conditions is to estimate selectivity on the basis of local statistics.
703 */
705 fpinfo->local_conds,
706 baserel->relid,
708 NULL);
709
711
712 /*
713 * Set # of retrieved rows and cached relation costs to some negative
714 * value, so that we can detect when they are set to some sensible values,
715 * during one (usually the first) of the calls to estimate_path_cost_size.
716 */
717 fpinfo->retrieved_rows = -1;
718 fpinfo->rel_startup_cost = -1;
719 fpinfo->rel_total_cost = -1;
720
721 /*
722 * If the table or the server is configured to use remote estimates,
723 * connect to the foreign server and execute EXPLAIN to estimate the
724 * number of rows selected by the restriction clauses, as well as the
725 * average row width. Otherwise, estimate using whatever statistics we
726 * have locally, in a way similar to ordinary tables.
727 */
728 if (fpinfo->use_remote_estimate)
729 {
730 /*
731 * Get cost/size estimates with help of remote server. Save the
732 * values in fpinfo so we don't need to do it again to generate the
733 * basic foreign path.
734 */
735 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
736 &fpinfo->rows, &fpinfo->width,
737 &fpinfo->disabled_nodes,
738 &fpinfo->startup_cost, &fpinfo->total_cost);
739
740 /* Report estimated baserel size to planner. */
741 baserel->rows = fpinfo->rows;
742 baserel->reltarget->width = fpinfo->width;
743 }
744 else
745 {
746 /*
747 * If the foreign table has never been ANALYZEd, it will have
748 * reltuples < 0, meaning "unknown". We can't do much if we're not
749 * allowed to consult the remote server, but we can use a hack similar
750 * to plancat.c's treatment of empty relations: use a minimum size
751 * estimate of 10 pages, and divide by the column-datatype-based width
752 * estimate to get the corresponding number of tuples.
753 */
754 if (baserel->tuples < 0)
755 {
756 baserel->pages = 10;
757 baserel->tuples =
758 (10 * BLCKSZ) / (baserel->reltarget->width +
760 }
761
762 /* Estimate baserel size as best we can with local statistics. */
764
765 /* Fill in basically-bogus cost estimates for use later. */
766 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
767 &fpinfo->rows, &fpinfo->width,
768 &fpinfo->disabled_nodes,
769 &fpinfo->startup_cost, &fpinfo->total_cost);
770 }
771
772 /*
773 * fpinfo->relation_name gets the numeric rangetable index of the foreign
774 * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
775 * human-readable string at that time.)
776 */
777 fpinfo->relation_name = psprintf("%u", baserel->relid);
778
779 /* No outer and inner relations. */
780 fpinfo->make_outerrel_subquery = false;
781 fpinfo->make_innerrel_subquery = false;
782 fpinfo->lower_subquery_rels = NULL;
783 fpinfo->hidden_subquery_rels = NULL;
784 /* Set the relation index. */
785 fpinfo->relation_index = baserel->relid;
786}
787
788/*
789 * get_useful_ecs_for_relation
790 * Determine which EquivalenceClasses might be involved in useful
791 * orderings of this relation.
792 *
793 * This function is in some respects a mirror image of the core function
794 * pathkeys_useful_for_merging: for a regular table, we know what indexes
795 * we have and want to test whether any of them are useful. For a foreign
796 * table, we don't know what indexes are present on the remote side but
797 * want to speculate about which ones we'd like to use if they existed.
798 *
799 * This function returns a list of potentially-useful equivalence classes,
800 * but it does not guarantee that an EquivalenceMember exists which contains
801 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
802 * ft1.x + t1.x = 0, this function will say that the equivalence class
803 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
804 * t1 is local (or on a different server), it will turn out that no useful
805 * ORDER BY clause can be generated. It's not our job to figure that out
806 * here; we're only interested in identifying relevant ECs.
807 */
808static List *
810{
811 List *useful_eclass_list = NIL;
812 ListCell *lc;
813 Relids relids;
814
815 /*
816 * First, consider whether any active EC is potentially useful for a merge
817 * join against this relation.
818 */
819 if (rel->has_eclass_joins)
820 {
821 foreach(lc, root->eq_classes)
822 {
823 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
824
825 if (eclass_useful_for_merging(root, cur_ec, rel))
826 useful_eclass_list = lappend(useful_eclass_list, cur_ec);
827 }
828 }
829
830 /*
831 * Next, consider whether there are any non-EC derivable join clauses that
832 * are merge-joinable. If the joininfo list is empty, we can exit
833 * quickly.
834 */
835 if (rel->joininfo == NIL)
836 return useful_eclass_list;
837
838 /* If this is a child rel, we must use the topmost parent rel to search. */
839 if (IS_OTHER_REL(rel))
840 {
842 relids = rel->top_parent_relids;
843 }
844 else
845 relids = rel->relids;
846
847 /* Check each join clause in turn. */
848 foreach(lc, rel->joininfo)
849 {
850 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
851
852 /* Consider only mergejoinable clauses */
853 if (restrictinfo->mergeopfamilies == NIL)
854 continue;
855
856 /* Make sure we've got canonical ECs. */
857 update_mergeclause_eclasses(root, restrictinfo);
858
859 /*
860 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
861 * that left_ec and right_ec will be initialized, per comments in
862 * distribute_qual_to_rels.
863 *
864 * We want to identify which side of this merge-joinable clause
865 * contains columns from the relation produced by this RelOptInfo. We
866 * test for overlap, not containment, because there could be extra
867 * relations on either side. For example, suppose we've got something
868 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
869 * A.y = D.y. The input rel might be the joinrel between A and B, and
870 * we'll consider the join clause A.y = D.y. relids contains a
871 * relation not involved in the join clause (B) and the equivalence
872 * class for the left-hand side of the clause contains a relation not
873 * involved in the input rel (C). Despite the fact that we have only
874 * overlap and not containment in either direction, A.y is potentially
875 * useful as a sort column.
876 *
877 * Note that it's even possible that relids overlaps neither side of
878 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
879 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
880 * but B overlaps neither side of the clause. In that case, we just skip this
881 * join clause, since it doesn't suggest a useful sort order for this
882 * relation.
883 *
 * The right side is tested first; if relids overlaps both sides, only
 * right_ec is added for this clause.
883 */
884 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
885 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
886 restrictinfo->right_ec);
887 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
888 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
889 restrictinfo->left_ec);
890 }
891
892 return useful_eclass_list;
893}
894
895/*
896 * get_useful_pathkeys_for_relation
897 * Determine which orderings of a relation might be useful.
898 *
899 * Getting data in sorted order can be useful either because the requested
900 * order matches the final output ordering for the overall query we're
901 * planning, or because it enables an efficient merge join. Here, we try
902 * to figure out which pathkeys to consider.
903 */
904static List *
906{
907 List *useful_pathkeys_list = NIL;
908 List *useful_eclass_list;
909 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
910 EquivalenceClass *query_ec = NULL;
911 ListCell *lc;
912
913 /*
914 * Pushing the query_pathkeys to the remote server is always worth
915 * considering, because it might let us avoid a local sort.
916 */
917 fpinfo->qp_is_pushdown_safe = false;
918 if (root->query_pathkeys)
919 {
920 bool query_pathkeys_ok = true;
921
922 foreach(lc, root->query_pathkeys)
923 {
924 PathKey *pathkey = (PathKey *) lfirst(lc);
925
926 /*
927 * The planner and executor don't have any clever strategy for
928 * taking data sorted by a prefix of the query's pathkeys and
929 * getting it to be sorted by all of those pathkeys. We'll just
930 * end up resorting the entire data set. So, unless we can push
931 * down all of the query pathkeys, forget it.
932 */
933 if (!is_foreign_pathkey(root, rel, pathkey))
934 {
935 query_pathkeys_ok = false;
936 break;
937 }
938 }
939
940 if (query_pathkeys_ok)
941 {
942 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
943 fpinfo->qp_is_pushdown_safe = true;
944 }
945 }
946
947 /*
948 * Even if we're not using remote estimates, having the remote side do the
949 * sort generally won't be any worse than doing it locally, and it might
950 * be much better if the remote side can generate data in the right order
951 * without needing a sort at all. However, what we're going to do next is
952 * try to generate pathkeys that seem promising for possible merge joins,
953 * and that's more speculative. A wrong choice might hurt quite a bit, so
954 * bail out if we can't use remote estimates.
955 */
956 if (!fpinfo->use_remote_estimate)
957 return useful_pathkeys_list;
958
959 /* Get the list of interesting EquivalenceClasses. */
960 useful_eclass_list = get_useful_ecs_for_relation(root, rel);
961
962 /* Extract unique EC for query, if any, so we don't consider it again. */
963 if (list_length(root->query_pathkeys) == 1)
964 {
965 PathKey *query_pathkey = linitial(root->query_pathkeys);
966
967 query_ec = query_pathkey->pk_eclass;
968 }
969
970 /*
971 * As a heuristic, the only pathkeys we consider here are those of length
972 * one. It's surely possible to consider more, but since each one we
973 * choose to consider will generate a round-trip to the remote side, we
974 * need to be a bit cautious here. It would sure be nice to have a local
975 * cache of information about remote index definitions...
976 */
977 foreach(lc, useful_eclass_list)
978 {
979 EquivalenceClass *cur_ec = lfirst(lc);
980 PathKey *pathkey;
981
982 /* If redundant with what we did above, skip it. */
983 if (cur_ec == query_ec)
984 continue;
985
986 /* Can't push down the sort if the EC's opfamily is not shippable. */
988 OperatorFamilyRelationId, fpinfo))
989 continue;
990
991 /* If no pushable expression for this rel, skip it. */
992 if (find_em_for_rel(root, cur_ec, rel) == NULL)
993 continue;
994
995 /* Looks like we can generate a pathkey, so let's do it. */
996 pathkey = make_canonical_pathkey(root, cur_ec,
999 false);
1000 useful_pathkeys_list = lappend(useful_pathkeys_list,
1001 list_make1(pathkey));
1002 }
1003
1004 return useful_pathkeys_list;
1005}
1006
1007/*
1008 * postgresGetForeignPaths
1009 * Create possible scan paths for a scan on the foreign table
1010 */
1011static void
1013 RelOptInfo *baserel,
1014 Oid foreigntableid)
1015{
1016 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1017 ForeignPath *path;
1018 List *ppi_list;
1019 ListCell *lc;
1020
1021 /*
1022 * Create simplest ForeignScan path node and add it to baserel. This path
1023 * corresponds to SeqScan path of regular tables (though depending on what
1024 * baserestrict conditions we were able to send to remote, there might
1025 * actually be an indexscan happening there). We already did all the work
1026 * to estimate cost and size of this path.
1027 *
1028 * Although this path uses no join clauses, it could still have required
1029 * parameterization due to LATERAL refs in its tlist.
1030 */
1031 path = create_foreignscan_path(root, baserel,
1032 NULL, /* default pathtarget */
1033 fpinfo->rows,
1034 fpinfo->disabled_nodes,
1035 fpinfo->startup_cost,
1036 fpinfo->total_cost,
1037 NIL, /* no pathkeys */
1038 baserel->lateral_relids,
1039 NULL, /* no extra plan */
1040 NIL, /* no fdw_restrictinfo list */
1041 NIL); /* no fdw_private list */
1042 add_path(baserel, (Path *) path);
1043
1044 /* Add paths with pathkeys */
1045 add_paths_with_pathkeys_for_rel(root, baserel, NULL, NIL);
1046
1047 /*
1048 * If we're not using remote estimates, stop here. We have no way to
1049 * estimate whether any join clauses would be worth sending across, so
1050 * don't bother building parameterized paths.
1051 */
1052 if (!fpinfo->use_remote_estimate)
1053 return;
1054
1055 /*
1056 * Thumb through all join clauses for the rel to identify which outer
1057 * relations could supply one or more safe-to-send-to-remote join clauses.
1058 * We'll build a parameterized path for each such outer relation.
1059 *
1060 * It's convenient to manage this by representing each candidate outer
1061 * relation by the ParamPathInfo node for it. We can then use the
1062 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1063 * interesting join clauses for that rel. This takes care of the
1064 * possibility that there are multiple safe join clauses for such a rel,
1065 * and also ensures that we account for unsafe join clauses that we'll
1066 * still have to enforce locally (since the parameterized-path machinery
1067 * insists that we handle all movable clauses).
1068 */
1069 ppi_list = NIL;
1070 foreach(lc, baserel->joininfo)
1071 {
1072 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1073 Relids required_outer;
1074 ParamPathInfo *param_info;
1075
1076 /* Check if clause can be moved to this rel */
1077 if (!join_clause_is_movable_to(rinfo, baserel))
1078 continue;
1079
1080 /* See if it is safe to send to remote */
1081 if (!is_foreign_expr(root, baserel, rinfo->clause))
1082 continue;
1083
1084 /* Calculate required outer rels for the resulting path */
1085 required_outer = bms_union(rinfo->clause_relids,
1086 baserel->lateral_relids);
1087 /* We do not want the foreign rel itself listed in required_outer */
1088 required_outer = bms_del_member(required_outer, baserel->relid);
1089
1090 /*
1091 * required_outer probably can't be empty here, but if it were, we
1092 * couldn't make a parameterized path.
1093 */
1094 if (bms_is_empty(required_outer))
1095 continue;
1096
1097 /* Get the ParamPathInfo */
1098 param_info = get_baserel_parampathinfo(root, baserel,
1099 required_outer);
1100 Assert(param_info != NULL);
1101
1102 /*
1103 * Add it to list unless we already have it. Testing pointer equality
1104 * is OK since get_baserel_parampathinfo won't make duplicates.
1105 */
1106 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1107 }
1108
1109 /*
1110 * The above scan examined only "generic" join clauses, not those that
1111 * were absorbed into EquivalenceClauses. See if we can make anything out
1112 * of EquivalenceClauses.
1113 */
1114 if (baserel->has_eclass_joins)
1115 {
1116 /*
1117 * We repeatedly scan the eclass list looking for column references
1118 * (or expressions) belonging to the foreign rel. Each time we find
1119 * one, we generate a list of equivalence joinclauses for it, and then
1120 * see if any are safe to send to the remote. Repeat till there are
1121 * no more candidate EC members.
1122 */
1124
1125 arg.already_used = NIL;
1126 for (;;)
1127 {
1128 List *clauses;
1129
1130 /* Make clauses, skipping any that join to lateral_referencers */
1131 arg.current = NULL;
1133 baserel,
1135 &arg,
1136 baserel->lateral_referencers);
1137
1138 /* Done if there are no more expressions in the foreign rel */
1139 if (arg.current == NULL)
1140 {
1141 Assert(clauses == NIL);
1142 break;
1143 }
1144
1145 /* Scan the extracted join clauses */
1146 foreach(lc, clauses)
1147 {
1148 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1149 Relids required_outer;
1150 ParamPathInfo *param_info;
1151
1152 /* Check if clause can be moved to this rel */
1153 if (!join_clause_is_movable_to(rinfo, baserel))
1154 continue;
1155
1156 /* See if it is safe to send to remote */
1157 if (!is_foreign_expr(root, baserel, rinfo->clause))
1158 continue;
1159
1160 /* Calculate required outer rels for the resulting path */
1161 required_outer = bms_union(rinfo->clause_relids,
1162 baserel->lateral_relids);
1163 required_outer = bms_del_member(required_outer, baserel->relid);
1164 if (bms_is_empty(required_outer))
1165 continue;
1166
1167 /* Get the ParamPathInfo */
1168 param_info = get_baserel_parampathinfo(root, baserel,
1169 required_outer);
1170 Assert(param_info != NULL);
1171
1172 /* Add it to list unless we already have it */
1173 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1174 }
1175
1176 /* Try again, now ignoring the expression we found this time */
1177 arg.already_used = lappend(arg.already_used, arg.current);
1178 }
1179 }
1180
1181 /*
1182 * Now build a path for each useful outer relation.
1183 */
1184 foreach(lc, ppi_list)
1185 {
1186 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1187 double rows;
1188 int width;
1189 int disabled_nodes;
1190 Cost startup_cost;
1191 Cost total_cost;
1192
1193 /* Get a cost estimate from the remote */
1195 param_info->ppi_clauses, NIL, NULL,
1196 &rows, &width, &disabled_nodes,
1197 &startup_cost, &total_cost);
1198
1199 /*
1200 * ppi_rows currently won't get looked at by anything, but still we
1201 * may as well ensure that it matches our idea of the rowcount.
1202 */
1203 param_info->ppi_rows = rows;
1204
1205 /* Make the path */
1206 path = create_foreignscan_path(root, baserel,
1207 NULL, /* default pathtarget */
1208 rows,
1209 disabled_nodes,
1210 startup_cost,
1211 total_cost,
1212 NIL, /* no pathkeys */
1213 param_info->ppi_req_outer,
1214 NULL,
1215 NIL, /* no fdw_restrictinfo list */
1216 NIL); /* no fdw_private list */
1217 add_path(baserel, (Path *) path);
1218 }
1219}
1220
1221/*
1222 * postgresGetForeignPlan
1223 * Create ForeignScan plan node which implements selected best path
1224 */
1225static ForeignScan *
1227 RelOptInfo *foreignrel,
1228 Oid foreigntableid,
1229 ForeignPath *best_path,
1230 List *tlist,
1231 List *scan_clauses,
1232 Plan *outer_plan)
1233{
1234 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1235 Index scan_relid;
1236 List *fdw_private;
1237 List *remote_exprs = NIL;
1238 List *local_exprs = NIL;
1239 List *params_list = NIL;
1240 List *fdw_scan_tlist = NIL;
1241 List *fdw_recheck_quals = NIL;
1242 List *retrieved_attrs;
1243 StringInfoData sql;
1244 bool has_final_sort = false;
1245 bool has_limit = false;
1246 ListCell *lc;
1247
1248 /*
1249 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1250 */
1251 if (best_path->fdw_private)
1252 {
1253 has_final_sort = boolVal(list_nth(best_path->fdw_private,
1255 has_limit = boolVal(list_nth(best_path->fdw_private,
1257 }
1258
1259 if (IS_SIMPLE_REL(foreignrel))
1260 {
1261 /*
1262 * For base relations, set scan_relid as the relid of the relation.
1263 */
1264 scan_relid = foreignrel->relid;
1265
1266 /*
1267 * In a base-relation scan, we must apply the given scan_clauses.
1268 *
1269 * Separate the scan_clauses into those that can be executed remotely
1270 * and those that can't. baserestrictinfo clauses that were
1271 * previously determined to be safe or unsafe by classifyConditions
1272 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1273 * else in the scan_clauses list will be a join clause, which we have
1274 * to check for remote-safety.
1275 *
1276 * Note: the join clauses we see here should be the exact same ones
1277 * previously examined by postgresGetForeignPaths. Possibly it'd be
1278 * worth passing forward the classification work done then, rather
1279 * than repeating it here.
1280 *
1281 * This code must match "extract_actual_clauses(scan_clauses, false)"
1282 * except for the additional decision about remote versus local
1283 * execution.
1284 */
1285 foreach(lc, scan_clauses)
1286 {
1288
1289 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1290 if (rinfo->pseudoconstant)
1291 continue;
1292
1293 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1294 remote_exprs = lappend(remote_exprs, rinfo->clause);
1295 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1296 local_exprs = lappend(local_exprs, rinfo->clause);
1297 else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1298 remote_exprs = lappend(remote_exprs, rinfo->clause);
1299 else
1300 local_exprs = lappend(local_exprs, rinfo->clause);
1301 }
1302
1303 /*
1304 * For a base-relation scan, we have to support EPQ recheck, which
1305 * should recheck all the remote quals.
1306 */
1307 fdw_recheck_quals = remote_exprs;
1308 }
1309 else
1310 {
1311 /*
1312 * Join relation or upper relation - set scan_relid to 0.
1313 */
1314 scan_relid = 0;
1315
1316 /*
1317 * For a join rel, baserestrictinfo is NIL and we are not considering
1318 * parameterization right now, so there should be no scan_clauses for
1319 * a joinrel or an upper rel either.
1320 */
1321 Assert(!scan_clauses);
1322
1323 /*
1324 * Instead we get the conditions to apply from the fdw_private
1325 * structure.
1326 */
1327 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1328 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1329
1330 /*
1331 * We leave fdw_recheck_quals empty in this case, since we never need
1332 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1333 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1334 * If we're planning an upperrel (ie, remote grouping or aggregation)
1335 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1336 * allowed, and indeed we *can't* put the remote clauses into
1337 * fdw_recheck_quals because the unaggregated Vars won't be available
1338 * locally.
1339 */
1340
1341 /* Build the list of columns to be fetched from the foreign server. */
1342 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1343
1344 /*
1345 * Ensure that the outer plan produces a tuple whose descriptor
1346 * matches our scan tuple slot. Also, remove the local conditions
1347 * from outer plan's quals, lest they be evaluated twice, once by the
1348 * local plan and once by the scan.
1349 */
1350 if (outer_plan)
1351 {
1352 /*
1353 * Right now, we only consider grouping and aggregation beyond
1354 * joins. Queries involving aggregates or grouping do not require
1355 * EPQ mechanism, hence should not have an outer plan here.
1356 */
1357 Assert(!IS_UPPER_REL(foreignrel));
1358
1359 /*
1360 * First, update the plan's qual list if possible. In some cases
1361 * the quals might be enforced below the topmost plan level, in
1362 * which case we'll fail to remove them; it's not worth working
1363 * harder than this.
1364 */
1365 foreach(lc, local_exprs)
1366 {
1367 Node *qual = lfirst(lc);
1368
1369 outer_plan->qual = list_delete(outer_plan->qual, qual);
1370
1371 /*
1372 * For an inner join the local conditions of foreign scan plan
1373 * can be part of the joinquals as well. (They might also be
1374 * in the mergequals or hashquals, but we can't touch those
1375 * without breaking the plan.)
1376 */
1377 if (IsA(outer_plan, NestLoop) ||
1378 IsA(outer_plan, MergeJoin) ||
1379 IsA(outer_plan, HashJoin))
1380 {
1381 Join *join_plan = (Join *) outer_plan;
1382
1383 if (join_plan->jointype == JOIN_INNER)
1384 join_plan->joinqual = list_delete(join_plan->joinqual,
1385 qual);
1386 }
1387 }
1388
1389 /*
1390 * Now fix the subplan's tlist --- this might result in inserting
1391 * a Result node atop the plan tree.
1392 */
1393 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1394 best_path->path.parallel_safe);
1395 }
1396 }
1397
1398 /*
1399 * Build the query string to be sent for execution, and identify
1400 * expressions to be sent as parameters.
1401 */
1402 initStringInfo(&sql);
1403 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1404 remote_exprs, best_path->path.pathkeys,
1405 has_final_sort, has_limit, false,
1406 &retrieved_attrs, &params_list);
1407
1408 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1409 fpinfo->final_remote_exprs = remote_exprs;
1410
1411 /*
1412 * Build the fdw_private list that will be available to the executor.
1413 * Items in the list must match order in enum FdwScanPrivateIndex.
1414 */
1415 fdw_private = list_make3(makeString(sql.data),
1416 retrieved_attrs,
1417 makeInteger(fpinfo->fetch_size));
1418 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1419 fdw_private = lappend(fdw_private,
1420 makeString(fpinfo->relation_name));
1421
1422 /*
1423 * Create the ForeignScan node for the given relation.
1424 *
1425 * Note that the remote parameter expressions are stored in the fdw_exprs
1426 * field of the finished plan node; we can't keep them in private state
1427 * because then they wouldn't be subject to later planner processing.
1428 */
1429 return make_foreignscan(tlist,
1430 local_exprs,
1431 scan_relid,
1432 params_list,
1433 fdw_private,
1434 fdw_scan_tlist,
1435 fdw_recheck_quals,
1436 outer_plan);
1437}
1438
1439/*
1440 * Construct a tuple descriptor for the scan tuples handled by a foreign join.
1441 */
1442static TupleDesc
1444{
1445 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1446 EState *estate = node->ss.ps.state;
1447 TupleDesc tupdesc;
1448
1449 /*
1450 * The core code has already set up a scan tuple slot based on
1451 * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough,
1452 * but there's one case where it isn't. If we have any whole-row row
1453 * identifier Vars, they may have vartype RECORD, and we need to replace
1454 * that with the associated table's actual composite type. This ensures
1455 * that when we read those ROW() expression values from the remote server,
1456 * we can convert them to a composite type the local server knows.
1457 */
1459 for (int i = 0; i < tupdesc->natts; i++)
1460 {
1461 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
1462 Var *var;
1463 RangeTblEntry *rte;
1464 Oid reltype;
1465
1466 /* Nothing to do if it's not a generic RECORD attribute */
1467 if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1468 continue;
1469
1470 /*
1471 * If we can't identify the referenced table, do nothing. This'll
1472 * likely lead to failure later, but perhaps we can muddle through.
1473 */
1474 var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
1475 i)->expr;
1476 if (!IsA(var, Var) || var->varattno != 0)
1477 continue;
1478 rte = list_nth(estate->es_range_table, var->varno - 1);
1479 if (rte->rtekind != RTE_RELATION)
1480 continue;
1481 reltype = get_rel_type_id(rte->relid);
1482 if (!OidIsValid(reltype))
1483 continue;
1484 att->atttypid = reltype;
1485 /* shouldn't need to change anything else */
1486 }
1487 return tupdesc;
1488}
1489
1490/*
1491 * postgresBeginForeignScan
1492 * Initiate an executor scan of a foreign PostgreSQL table.
1493 */
1494static void
1496{
1497 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1498 EState *estate = node->ss.ps.state;
1499 PgFdwScanState *fsstate;
1500 RangeTblEntry *rte;
1501 Oid userid;
1502 ForeignTable *table;
1504 int rtindex;
1505 int numParams;
1506
1507 /*
1508 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1509 */
1510 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1511 return;
1512
1513 /*
1514 * We'll save private state in node->fdw_state.
1515 */
1516 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1517 node->fdw_state = fsstate;
1518
1519 /*
1520 * Identify which user to do the remote access as. This should match what
1521 * ExecCheckPermissions() does.
1522 */
1523 userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
1524 if (fsplan->scan.scanrelid > 0)
1525 rtindex = fsplan->scan.scanrelid;
1526 else
1527 rtindex = bms_next_member(fsplan->fs_base_relids, -1);
1528 rte = exec_rt_fetch(rtindex, estate);
1529
1530 /* Get info about foreign table. */
1531 table = GetForeignTable(rte->relid);
1532 user = GetUserMapping(userid, table->serverid);
1533
1534 /*
1535 * Get connection to the foreign server. Connection manager will
1536 * establish new connection if necessary.
1537 */
1538 fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
1539
1540 /* Assign a unique ID for my cursor */
1541 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1542 fsstate->cursor_exists = false;
1543
1544 /* Get private info created by planner functions. */
1545 fsstate->query = strVal(list_nth(fsplan->fdw_private,
1547 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1549 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1551
1552 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1553 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1554 "postgres_fdw tuple data",
1556 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1557 "postgres_fdw temporary data",
1559
1560 /*
1561 * Get info we'll need for converting data fetched from the foreign server
1562 * into local representation and error reporting during that process.
1563 */
1564 if (fsplan->scan.scanrelid > 0)
1565 {
1566 fsstate->rel = node->ss.ss_currentRelation;
1567 fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1568 }
1569 else
1570 {
1571 fsstate->rel = NULL;
1572 fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
1573 }
1574
1575 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1576
1577 /*
1578 * Prepare for processing of parameters used in remote query, if any.
1579 */
1580 numParams = list_length(fsplan->fdw_exprs);
1581 fsstate->numParams = numParams;
1582 if (numParams > 0)
1584 fsplan->fdw_exprs,
1585 numParams,
1586 &fsstate->param_flinfo,
1587 &fsstate->param_exprs,
1588 &fsstate->param_values);
1589
1590 /* Set the async-capable flag */
1591 fsstate->async_capable = node->ss.ps.async_capable;
1592}
1593
1594/*
1595 * postgresIterateForeignScan
1596 * Retrieve next row from the result set, or clear tuple slot to indicate
1597 * EOF.
1598 */
1599static TupleTableSlot *
1601{
1602 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1603 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1604
1605 /*
1606 * In sync mode, if this is the first call after Begin or ReScan, we need
1607 * to create the cursor on the remote side. In async mode, we would have
1608 * already created the cursor before we get here, even if this is the
1609 * first call after Begin or ReScan.
1610 */
1611 if (!fsstate->cursor_exists)
1612 create_cursor(node);
1613
1614 /*
1615 * Get some more tuples, if we've run out.
1616 */
1617 if (fsstate->next_tuple >= fsstate->num_tuples)
1618 {
1619 /* In async mode, just clear tuple slot. */
1620 if (fsstate->async_capable)
1621 return ExecClearTuple(slot);
1622 /* No point in another fetch if we already detected EOF, though. */
1623 if (!fsstate->eof_reached)
1624 fetch_more_data(node);
1625 /* If we didn't get any tuples, must be end of data. */
1626 if (fsstate->next_tuple >= fsstate->num_tuples)
1627 return ExecClearTuple(slot);
1628 }
1629
1630 /*
1631 * Return the next tuple.
1632 */
1633 ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1634 slot,
1635 false);
1636
1637 return slot;
1638}
1639
1640/*
1641 * postgresReScanForeignScan
1642 * Restart the scan.
1643 */
1644static void
1646{
1647 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1648 char sql[64];
1649 PGresult *res;
1650
1651 /* If we haven't created the cursor yet, nothing to do. */
1652 if (!fsstate->cursor_exists)
1653 return;
1654
1655 /*
1656 * If the node is async-capable, and an asynchronous fetch for it has
1657 * begun, the asynchronous fetch might not have yet completed. Check if
1658 * the node is async-capable, and an asynchronous fetch for it is still in
1659 * progress; if so, complete the asynchronous fetch before restarting the
1660 * scan.
1661 */
1662 if (fsstate->async_capable &&
1663 fsstate->conn_state->pendingAreq &&
1664 fsstate->conn_state->pendingAreq->requestee == (PlanState *) node)
1665 fetch_more_data(node);
1666
1667 /*
1668 * If any internal parameters affecting this node have changed, we'd
1669 * better destroy and recreate the cursor. Otherwise, if the remote
1670 * server is v14 or older, rewinding it should be good enough; if not,
1671 * rewind is only allowed for scrollable cursors, but we don't have a way
1672 * to check the scrollability of it, so destroy and recreate it in any
1673 * case. If we've only fetched zero or one batch, we needn't even rewind
1674 * the cursor, just rescan what we have.
1675 */
1676 if (node->ss.ps.chgParam != NULL)
1677 {
1678 fsstate->cursor_exists = false;
1679 snprintf(sql, sizeof(sql), "CLOSE c%u",
1680 fsstate->cursor_number);
1681 }
1682 else if (fsstate->fetch_ct_2 > 1)
1683 {
1684 if (PQserverVersion(fsstate->conn) < 150000)
1685 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1686 fsstate->cursor_number);
1687 else
1688 {
1689 fsstate->cursor_exists = false;
1690 snprintf(sql, sizeof(sql), "CLOSE c%u",
1691 fsstate->cursor_number);
1692 }
1693 }
1694 else
1695 {
1696 /* Easy: just rescan what we already have in memory, if anything */
1697 fsstate->next_tuple = 0;
1698 return;
1699 }
1700
1701 /*
1702 * We don't use a PG_TRY block here, so be careful not to throw error
1703 * without releasing the PGresult.
1704 */
1705 res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
1707 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1708 PQclear(res);
1709
1710 /* Now force a fresh FETCH. */
1711 fsstate->tuples = NULL;
1712 fsstate->num_tuples = 0;
1713 fsstate->next_tuple = 0;
1714 fsstate->fetch_ct_2 = 0;
1715 fsstate->eof_reached = false;
1716}
1717
1718/*
1719 * postgresEndForeignScan
1720 * Finish scanning foreign table and dispose objects used for this scan
1721 */
1722static void
1724{
1725 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1726
1727 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1728 if (fsstate == NULL)
1729 return;
1730
1731 /* Close the cursor if open, to prevent accumulation of cursors */
1732 if (fsstate->cursor_exists)
1733 close_cursor(fsstate->conn, fsstate->cursor_number,
1734 fsstate->conn_state);
1735
1736 /* Release remote connection */
1737 ReleaseConnection(fsstate->conn);
1738 fsstate->conn = NULL;
1739
1740 /* MemoryContexts will be deleted automatically. */
1741}
1742
1743/*
1744 * postgresAddForeignUpdateTargets
1745 * Add resjunk column(s) needed for update/delete on a foreign table
1746 */
1747static void
1749 Index rtindex,
1750 RangeTblEntry *target_rte,
1751 Relation target_relation)
1752{
1753 Var *var;
1754
1755 /*
1756 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1757 */
1758
1759 /* Make a Var representing the desired value */
1760 var = makeVar(rtindex,
1762 TIDOID,
1763 -1,
1764 InvalidOid,
1765 0);
1766
1767 /* Register it as a row-identity column needed by this target rel */
1768 add_row_identity_var(root, var, rtindex, "ctid");
1769}
1770
1771/*
1772 * postgresPlanForeignModify
1773 * Plan an insert/update/delete operation on a foreign table
1774 */
1775static List *
1778 Index resultRelation,
1779 int subplan_index)
1780{
1781 CmdType operation = plan->operation;
1782 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1783 Relation rel;
1784 StringInfoData sql;
1785 List *targetAttrs = NIL;
1786 List *withCheckOptionList = NIL;
1787 List *returningList = NIL;
1788 List *retrieved_attrs = NIL;
1789 bool doNothing = false;
1790 int values_end_len = -1;
1791
1792 initStringInfo(&sql);
1793
1794 /*
1795 * Core code already has some lock on each rel being planned, so we can
1796 * use NoLock here.
1797 */
1798 rel = table_open(rte->relid, NoLock);
1799
1800 /*
1801 * In an INSERT, we transmit all columns that are defined in the foreign
1802 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1803 * foreign table, we transmit all columns like INSERT; else we transmit
1804 * only columns that were explicitly targets of the UPDATE, so as to avoid
1805 * unnecessary data transmission. (We can't do that for INSERT since we
1806 * would miss sending default values for columns not listed in the source
1807 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1808 * those triggers might change values for non-target columns, in which
1809 * case we would miss sending changed values for those columns.)
1810 */
1811 if (operation == CMD_INSERT ||
1812 (operation == CMD_UPDATE &&
1813 rel->trigdesc &&
1815 {
1816 TupleDesc tupdesc = RelationGetDescr(rel);
1817 int attnum;
1818
1819 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1820 {
1821 CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
1822
1823 if (!attr->attisdropped)
1824 targetAttrs = lappend_int(targetAttrs, attnum);
1825 }
1826 }
1827 else if (operation == CMD_UPDATE)
1828 {
1829 int col;
1830 RelOptInfo *rel = find_base_rel(root, resultRelation);
1831 Bitmapset *allUpdatedCols = get_rel_all_updated_cols(root, rel);
1832
1833 col = -1;
1834 while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1835 {
1836 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1838
1839 if (attno <= InvalidAttrNumber) /* shouldn't happen */
1840 elog(ERROR, "system-column update is not supported");
1841 targetAttrs = lappend_int(targetAttrs, attno);
1842 }
1843 }
1844
1845 /*
1846 * Extract the relevant WITH CHECK OPTION list if any.
1847 */
1848 if (plan->withCheckOptionLists)
1849 withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1850 subplan_index);
1851
1852 /*
1853 * Extract the relevant RETURNING list if any.
1854 */
1855 if (plan->returningLists)
1856 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1857
1858 /*
1859 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1860 * should have already been rejected in the optimizer, as presently there
1861 * is no way to recognize an arbiter index on a foreign table. Only DO
1862 * NOTHING is supported without an inference specification.
1863 */
1864 if (plan->onConflictAction == ONCONFLICT_NOTHING)
1865 doNothing = true;
1866 else if (plan->onConflictAction != ONCONFLICT_NONE)
1867 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1868 (int) plan->onConflictAction);
1869
1870 /*
1871 * Construct the SQL command string.
1872 */
1873 switch (operation)
1874 {
1875 case CMD_INSERT:
1876 deparseInsertSql(&sql, rte, resultRelation, rel,
1877 targetAttrs, doNothing,
1878 withCheckOptionList, returningList,
1879 &retrieved_attrs, &values_end_len);
1880 break;
1881 case CMD_UPDATE:
1882 deparseUpdateSql(&sql, rte, resultRelation, rel,
1883 targetAttrs,
1884 withCheckOptionList, returningList,
1885 &retrieved_attrs);
1886 break;
1887 case CMD_DELETE:
1888 deparseDeleteSql(&sql, rte, resultRelation, rel,
1889 returningList,
1890 &retrieved_attrs);
1891 break;
1892 default:
1893 elog(ERROR, "unexpected operation: %d", (int) operation);
1894 break;
1895 }
1896
1897 table_close(rel, NoLock);
1898
1899 /*
1900 * Build the fdw_private list that will be available to the executor.
1901 * Items in the list must match enum FdwModifyPrivateIndex, above.
1902 */
1903 return list_make5(makeString(sql.data),
1904 targetAttrs,
1905 makeInteger(values_end_len),
1906 makeBoolean((retrieved_attrs != NIL)),
1907 retrieved_attrs);
1908}
1909
1910/*
1911 * postgresBeginForeignModify
1912 * Begin an insert/update/delete operation on a foreign table
1913 */
1914static void
1916 ResultRelInfo *resultRelInfo,
1917 List *fdw_private,
1918 int subplan_index,
1919 int eflags)
1920{
1921 PgFdwModifyState *fmstate;
1922 char *query;
1923 List *target_attrs;
1924 bool has_returning;
1925 int values_end_len;
1926 List *retrieved_attrs;
1927 RangeTblEntry *rte;
1928
1929 /*
1930 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1931 * stays NULL.
1932 */
1933 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1934 return;
1935
1936 /* Deconstruct fdw_private data. */
1937 query = strVal(list_nth(fdw_private,
1939 target_attrs = (List *) list_nth(fdw_private,
1941 values_end_len = intVal(list_nth(fdw_private,
1943 has_returning = boolVal(list_nth(fdw_private,
1945 retrieved_attrs = (List *) list_nth(fdw_private,
1947
1948 /* Find RTE. */
1949 rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1950 mtstate->ps.state);
1951
1952 /* Construct an execution state. */
1953 fmstate = create_foreign_modify(mtstate->ps.state,
1954 rte,
1955 resultRelInfo,
1956 mtstate->operation,
1957 outerPlanState(mtstate)->plan,
1958 query,
1959 target_attrs,
1960 values_end_len,
1961 has_returning,
1962 retrieved_attrs);
1963
1964 resultRelInfo->ri_FdwState = fmstate;
1965}
1966
1967/*
1968 * postgresExecForeignInsert
1969 * Insert one row into a foreign table
1970 */
1971static TupleTableSlot *
1973 ResultRelInfo *resultRelInfo,
1974 TupleTableSlot *slot,
1975 TupleTableSlot *planSlot)
1976{
1977 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1978 TupleTableSlot **rslot;
1979 int numSlots = 1;
1980
1981 /*
1982 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1983 * postgresBeginForeignInsert())
1984 */
1985 if (fmstate->aux_fmstate)
1986 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1987 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1988 &slot, &planSlot, &numSlots);
1989 /* Revert that change */
1990 if (fmstate->aux_fmstate)
1991 resultRelInfo->ri_FdwState = fmstate;
1992
1993 return rslot ? *rslot : NULL;
1994}
1995
1996/*
1997 * postgresExecForeignBatchInsert
1998 * Insert multiple rows into a foreign table
1999 */
2000static TupleTableSlot **
2002 ResultRelInfo *resultRelInfo,
2003 TupleTableSlot **slots,
2004 TupleTableSlot **planSlots,
2005 int *numSlots)
2006{
2007 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2008 TupleTableSlot **rslot;
2009
2010 /*
2011 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
2012 * postgresBeginForeignInsert())
2013 */
2014 if (fmstate->aux_fmstate)
2015 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
2016 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
2017 slots, planSlots, numSlots);
2018 /* Revert that change */
2019 if (fmstate->aux_fmstate)
2020 resultRelInfo->ri_FdwState = fmstate;
2021
2022 return rslot;
2023}
2024
2025/*
2026 * postgresGetForeignModifyBatchSize
2027 * Determine the maximum number of tuples that can be inserted in bulk
2028 *
2029 * Returns the batch size specified for server or table. When batching is not
2030 * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING
2031 * clause), returns 1.
2032 */
2033static int
2035{
2036 int batch_size;
2037 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2038
2039 /* should be called only once */
2040 Assert(resultRelInfo->ri_BatchSize == 0);
2041
2042 /*
2043 * Should never get called when the insert is being performed on a table
2044 * that is also among the target relations of an UPDATE operation, because
2045 * postgresBeginForeignInsert() currently rejects such insert attempts.
2046 */
2047 Assert(fmstate == NULL || fmstate->aux_fmstate == NULL);
2048
2049 /*
2050 * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
2051 * the option directly in server/table options. Otherwise just use the
2052 * value we determined earlier.
2053 */
2054 if (fmstate)
2055 batch_size = fmstate->batch_size;
2056 else
2057 batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
2058
2059 /*
2060 * Disable batching when we have to use RETURNING, there are any
2061 * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any
2062 * WITH CHECK OPTION constraints from parent views.
2063 *
2064 * When there are any BEFORE ROW INSERT triggers on the table, we can't
2065 * support it, because such triggers might query the table we're inserting
2066 * into and act differently if the tuples that have already been processed
2067 * and prepared for insertion are not there.
2068 */
2069 if (resultRelInfo->ri_projectReturning != NULL ||
2070 resultRelInfo->ri_WithCheckOptions != NIL ||
2071 (resultRelInfo->ri_TrigDesc &&
2072 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2073 resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
2074 return 1;
2075
2076 /*
2077 * If the foreign table has no columns, disable batching as the INSERT
2078 * syntax doesn't allow batching multiple empty rows into a zero-column
2079 * table in a single statement. This is needed for COPY FROM, in which
2080 * case fmstate must be non-NULL.
2081 */
2082 if (fmstate && list_length(fmstate->target_attrs) == 0)
2083 return 1;
2084
2085 /*
2086 * Otherwise use the batch size specified for server/table. The number of
2087 * parameters in a batch is limited to 65535 (uint16), so make sure we
2088 * don't exceed this limit by using the maximum batch_size possible.
2089 */
2090 if (fmstate && fmstate->p_nums > 0)
2091 batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums);
2092
2093 return batch_size;
2094}
2095
2096/*
2097 * postgresExecForeignUpdate
2098 * Update one row in a foreign table
2099 */
2100static TupleTableSlot *
2102 ResultRelInfo *resultRelInfo,
2103 TupleTableSlot *slot,
2104 TupleTableSlot *planSlot)
2105{
2106 TupleTableSlot **rslot;
2107 int numSlots = 1;
2108
2109 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
2110 &slot, &planSlot, &numSlots);
2111
2112 return rslot ? rslot[0] : NULL;
2113}
2114
2115/*
2116 * postgresExecForeignDelete
2117 * Delete one row from a foreign table
2118 */
2119static TupleTableSlot *
2121 ResultRelInfo *resultRelInfo,
2122 TupleTableSlot *slot,
2123 TupleTableSlot *planSlot)
2124{
2125 TupleTableSlot **rslot;
2126 int numSlots = 1;
2127
2128 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
2129 &slot, &planSlot, &numSlots);
2130
2131 return rslot ? rslot[0] : NULL;
2132}
2133
2134/*
2135 * postgresEndForeignModify
2136 * Finish an insert/update/delete operation on a foreign table
2137 */
2138static void
2140 ResultRelInfo *resultRelInfo)
2141{
2142 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2143
2144 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2145 if (fmstate == NULL)
2146 return;
2147
2148 /* Destroy the execution state */
2149 finish_foreign_modify(fmstate);
2150}
2151
2152/*
2153 * postgresBeginForeignInsert
2154 * Begin an insert operation on a foreign table
2155 */
2156static void
2158 ResultRelInfo *resultRelInfo)
2159{
2160 PgFdwModifyState *fmstate;
2162 EState *estate = mtstate->ps.state;
2163 Index resultRelation;
2164 Relation rel = resultRelInfo->ri_RelationDesc;
2165 RangeTblEntry *rte;
2166 TupleDesc tupdesc = RelationGetDescr(rel);
2167 int attnum;
2168 int values_end_len;
2169 StringInfoData sql;
2170 List *targetAttrs = NIL;
2171 List *retrieved_attrs = NIL;
2172 bool doNothing = false;
2173
2174 /*
2175 * If the foreign table we are about to insert routed rows into is also an
2176 * UPDATE subplan result rel that will be updated later, proceeding with
2177 * the INSERT will result in the later UPDATE incorrectly modifying those
2178 * routed rows, so prevent the INSERT --- it would be nice if we could
2179 * handle this case; but for now, throw an error for safety.
2180 */
2181 if (plan && plan->operation == CMD_UPDATE &&
2182 (resultRelInfo->ri_usesFdwDirectModify ||
2183 resultRelInfo->ri_FdwState))
2184 ereport(ERROR,
2185 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2186 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2188
2189 initStringInfo(&sql);
2190
2191 /* We transmit all columns that are defined in the foreign table. */
2192 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2193 {
2194 CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
2195
2196 if (!attr->attisdropped)
2197 targetAttrs = lappend_int(targetAttrs, attnum);
2198 }
2199
2200 /* Check if we add the ON CONFLICT clause to the remote query. */
2201 if (plan)
2202 {
2203 OnConflictAction onConflictAction = plan->onConflictAction;
2204
2205 /* We only support DO NOTHING without an inference specification. */
2206 if (onConflictAction == ONCONFLICT_NOTHING)
2207 doNothing = true;
2208 else if (onConflictAction != ONCONFLICT_NONE)
2209 elog(ERROR, "unexpected ON CONFLICT specification: %d",
2210 (int) onConflictAction);
2211 }
2212
2213 /*
2214 * If the foreign table is a partition that doesn't have a corresponding
2215 * RTE entry, we need to create a new RTE describing the foreign table for
2216 * use by deparseInsertSql and create_foreign_modify() below, after first
2217 * copying the parent's RTE and modifying some fields to describe the
2218 * foreign partition to work on. However, if this is invoked by UPDATE,
2219 * the existing RTE may already correspond to this partition if it is one
2220 * of the UPDATE subplan target rels; in that case, we can just use the
2221 * existing RTE as-is.
2222 */
2223 if (resultRelInfo->ri_RangeTableIndex == 0)
2224 {
2225 ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2226
2227 rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
2228 rte = copyObject(rte);
2229 rte->relid = RelationGetRelid(rel);
2230 rte->relkind = RELKIND_FOREIGN_TABLE;
2231
2232 /*
2233 * For UPDATE, we must use the RT index of the first subplan target
2234 * rel's RTE, because the core code would have built expressions for
2235 * the partition, such as RETURNING, using that RT index as varno of
2236 * Vars contained in those expressions.
2237 */
2238 if (plan && plan->operation == CMD_UPDATE &&
2239 rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2240 resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2241 else
2242 resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2243 }
2244 else
2245 {
2246 resultRelation = resultRelInfo->ri_RangeTableIndex;
2247 rte = exec_rt_fetch(resultRelation, estate);
2248 }
2249
2250 /* Construct the SQL command string. */
2251 deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2252 resultRelInfo->ri_WithCheckOptions,
2253 resultRelInfo->ri_returningList,
2254 &retrieved_attrs, &values_end_len);
2255
2256 /* Construct an execution state. */
2257 fmstate = create_foreign_modify(mtstate->ps.state,
2258 rte,
2259 resultRelInfo,
2260 CMD_INSERT,
2261 NULL,
2262 sql.data,
2263 targetAttrs,
2264 values_end_len,
2265 retrieved_attrs != NIL,
2266 retrieved_attrs);
2267
2268 /*
2269 * If the given resultRelInfo already has PgFdwModifyState set, it means
2270 * the foreign table is an UPDATE subplan result rel; in which case, store
2271 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2272 */
2273 if (resultRelInfo->ri_FdwState)
2274 {
2275 Assert(plan && plan->operation == CMD_UPDATE);
2276 Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2277 ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2278 }
2279 else
2280 resultRelInfo->ri_FdwState = fmstate;
2281}
2282
2283/*
2284 * postgresEndForeignInsert
2285 * Finish an insert operation on a foreign table
2286 */
2287static void
2289 ResultRelInfo *resultRelInfo)
2290{
2291 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2292
2293 Assert(fmstate != NULL);
2294
2295 /*
2296 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2297 * postgresBeginForeignInsert())
2298 */
2299 if (fmstate->aux_fmstate)
2300 fmstate = fmstate->aux_fmstate;
2301
2302 /* Destroy the execution state */
2303 finish_foreign_modify(fmstate);
2304}
2305
2306/*
2307 * postgresIsForeignRelUpdatable
2308 * Determine whether a foreign table supports INSERT, UPDATE and/or
2309 * DELETE.
2310 */
2311static int
2313{
2314 bool updatable;
2315 ForeignTable *table;
2316 ForeignServer *server;
2317 ListCell *lc;
2318
2319 /*
2320 * By default, all postgres_fdw foreign tables are assumed updatable. This
2321 * can be overridden by a per-server setting, which in turn can be
2322 * overridden by a per-table setting.
2323 */
2324 updatable = true;
2325
2326 table = GetForeignTable(RelationGetRelid(rel));
2327 server = GetForeignServer(table->serverid);
2328
2329 foreach(lc, server->options)
2330 {
2331 DefElem *def = (DefElem *) lfirst(lc);
2332
2333 if (strcmp(def->defname, "updatable") == 0)
2334 updatable = defGetBoolean(def);
2335 }
2336 foreach(lc, table->options)
2337 {
2338 DefElem *def = (DefElem *) lfirst(lc);
2339
2340 if (strcmp(def->defname, "updatable") == 0)
2341 updatable = defGetBoolean(def);
2342 }
2343
2344 /*
2345 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2346 */
2347 return updatable ?
2348 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2349}
2350
2351/*
2352 * postgresRecheckForeignScan
2353 * Execute a local join execution plan for a foreign join
2354 */
2355static bool
2357{
2358 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2360 TupleTableSlot *result;
2361
2362 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2363 if (scanrelid > 0)
2364 return true;
2365
2366 Assert(outerPlan != NULL);
2367
2368 /* Execute a local join execution plan */
2369 result = ExecProcNode(outerPlan);
2370 if (TupIsNull(result))
2371 return false;
2372
2373 /* Store result in the given slot */
2374 ExecCopySlot(slot, result);
2375
2376 return true;
2377}
2378
2379/*
2380 * find_modifytable_subplan
2381 * Helper routine for postgresPlanDirectModify to find the
2382 * ModifyTable subplan node that scans the specified RTI.
2383 *
2384 * Returns NULL if the subplan couldn't be identified. That's not a fatal
2385 * error condition, we just abandon trying to do the update directly.
2386 */
2387static ForeignScan *
2390 Index rtindex,
2391 int subplan_index)
2392{
2393 Plan *subplan = outerPlan(plan);
2394
2395 /*
2396 * The cases we support are (1) the desired ForeignScan is the immediate
2397 * child of ModifyTable, or (2) it is the subplan_index'th child of an
2398 * Append node that is the immediate child of ModifyTable. There is no
2399 * point in looking further down, as that would mean that local joins are
2400 * involved, so we can't do the update directly.
2401 *
2402 * There could be a Result atop the Append too, acting to compute the
2403 * UPDATE targetlist values. We ignore that here; the tlist will be
2404 * checked by our caller.
2405 *
2406 * In principle we could examine all the children of the Append, but it's
2407 * currently unlikely that the core planner would generate such a plan
2408 * with the children out-of-order. Moreover, such a search risks costing
2409 * O(N^2) time when there are a lot of children.
2410 */
2411 if (IsA(subplan, Append))
2412 {
2413 Append *appendplan = (Append *) subplan;
2414
2415 if (subplan_index < list_length(appendplan->appendplans))
2416 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2417 }
2418 else if (IsA(subplan, Result) &&
2419 outerPlan(subplan) != NULL &&
2420 IsA(outerPlan(subplan), Append))
2421 {
2422 Append *appendplan = (Append *) outerPlan(subplan);
2423
2424 if (subplan_index < list_length(appendplan->appendplans))
2425 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2426 }
2427
2428 /* Now, have we got a ForeignScan on the desired rel? */
2429 if (IsA(subplan, ForeignScan))
2430 {
2431 ForeignScan *fscan = (ForeignScan *) subplan;
2432
2433 if (bms_is_member(rtindex, fscan->fs_base_relids))
2434 return fscan;
2435 }
2436
2437 return NULL;
2438}
2439
2440/*
2441 * postgresPlanDirectModify
2442 * Consider a direct foreign table modification
2443 *
2444 * Decide whether it is safe to modify a foreign table directly, and if so,
2445 * rewrite subplan accordingly.
2446 */
2447static bool
2450 Index resultRelation,
2451 int subplan_index)
2452{
2453 CmdType operation = plan->operation;
2454 RelOptInfo *foreignrel;
2455 RangeTblEntry *rte;
2456 PgFdwRelationInfo *fpinfo;
2457 Relation rel;
2458 StringInfoData sql;
2459 ForeignScan *fscan;
2460 List *processed_tlist = NIL;
2461 List *targetAttrs = NIL;
2462 List *remote_exprs;
2463 List *params_list = NIL;
2464 List *returningList = NIL;
2465 List *retrieved_attrs = NIL;
2466
2467 /*
2468 * Decide whether it is safe to modify a foreign table directly.
2469 */
2470
2471 /*
2472 * The table modification must be an UPDATE or DELETE.
2473 */
2474 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2475 return false;
2476
2477 /*
2478 * Try to locate the ForeignScan subplan that's scanning resultRelation.
2479 */
2480 fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index);
2481 if (!fscan)
2482 return false;
2483
2484 /*
2485 * It's unsafe to modify a foreign table directly if there are any quals
2486 * that should be evaluated locally.
2487 */
2488 if (fscan->scan.plan.qual != NIL)
2489 return false;
2490
2491 /* Safe to fetch data about the target foreign rel */
2492 if (fscan->scan.scanrelid == 0)
2493 {
2494 foreignrel = find_join_rel(root, fscan->fs_relids);
2495 /* We should have a rel for this foreign join. */
2496 Assert(foreignrel);
2497 }
2498 else
2499 foreignrel = root->simple_rel_array[resultRelation];
2500 rte = root->simple_rte_array[resultRelation];
2501 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2502
2503 /*
2504 * It's unsafe to update a foreign table directly, if any expressions to
2505 * assign to the target columns are unsafe to evaluate remotely.
2506 */
2507 if (operation == CMD_UPDATE)
2508 {
2509 ListCell *lc,
2510 *lc2;
2511
2512 /*
2513 * The expressions of concern are the first N columns of the processed
2514 * targetlist, where N is the length of the rel's update_colnos.
2515 */
2516 get_translated_update_targetlist(root, resultRelation,
2517 &processed_tlist, &targetAttrs);
2518 forboth(lc, processed_tlist, lc2, targetAttrs)
2519 {
2521 AttrNumber attno = lfirst_int(lc2);
2522
2523 /* update's new-value expressions shouldn't be resjunk */
2524 Assert(!tle->resjunk);
2525
2526 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2527 elog(ERROR, "system-column update is not supported");
2528
2529 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2530 return false;
2531 }
2532 }
2533
2534 /*
2535 * Ok, rewrite subplan so as to modify the foreign table directly.
2536 */
2537 initStringInfo(&sql);
2538
2539 /*
2540 * Core code already has some lock on each rel being planned, so we can
2541 * use NoLock here.
2542 */
2543 rel = table_open(rte->relid, NoLock);
2544
2545 /*
2546 * Recall the qual clauses that must be evaluated remotely. (These are
2547 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2548 * doesn't care.)
2549 */
2550 remote_exprs = fpinfo->final_remote_exprs;
2551
2552 /*
2553 * Extract the relevant RETURNING list if any.
2554 */
2555 if (plan->returningLists)
2556 {
2557 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2558
2559 /*
2560 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2561 * we fetch from the foreign server any Vars specified in RETURNING
2562 * that refer not only to the target relation but to non-target
2563 * relations. So we'll deparse them into the RETURNING clause of the
2564 * remote query; use a targetlist consisting of them instead, which
2565 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2566 * node below.
2567 */
2568 if (fscan->scan.scanrelid == 0)
2569 returningList = build_remote_returning(resultRelation, rel,
2570 returningList);
2571 }
2572
2573 /*
2574 * Construct the SQL command string.
2575 */
2576 switch (operation)
2577 {
2578 case CMD_UPDATE:
2579 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2580 foreignrel,
2581 processed_tlist,
2582 targetAttrs,
2583 remote_exprs, &params_list,
2584 returningList, &retrieved_attrs);
2585 break;
2586 case CMD_DELETE:
2587 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2588 foreignrel,
2589 remote_exprs, &params_list,
2590 returningList, &retrieved_attrs);
2591 break;
2592 default:
2593 elog(ERROR, "unexpected operation: %d", (int) operation);
2594 break;
2595 }
2596
2597 /*
2598 * Update the operation and target relation info.
2599 */
2600 fscan->operation = operation;
2601 fscan->resultRelation = resultRelation;
2602
2603 /*
2604 * Update the fdw_exprs list that will be available to the executor.
2605 */
2606 fscan->fdw_exprs = params_list;
2607
2608 /*
2609 * Update the fdw_private list that will be available to the executor.
2610 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2611 */
2612 fscan->fdw_private = list_make4(makeString(sql.data),
2613 makeBoolean((retrieved_attrs != NIL)),
2614 retrieved_attrs,
2615 makeBoolean(plan->canSetTag));
2616
2617 /*
2618 * Update the foreign-join-related fields.
2619 */
2620 if (fscan->scan.scanrelid == 0)
2621 {
2622 /* No need for the outer subplan. */
2623 fscan->scan.plan.lefttree = NULL;
2624
2625 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2626 if (returningList)
2627 rebuild_fdw_scan_tlist(fscan, returningList);
2628 }
2629
2630 /*
2631 * Finally, unset the async-capable flag if it is set, as we currently
2632 * don't support asynchronous execution of direct modifications.
2633 */
2634 if (fscan->scan.plan.async_capable)
2635 fscan->scan.plan.async_capable = false;
2636
2637 table_close(rel, NoLock);
2638 return true;
2639}
2640
2641/*
2642 * postgresBeginDirectModify
2643 * Prepare a direct foreign table modification
2644 */
2645static void
2647{
2648 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2649 EState *estate = node->ss.ps.state;
2650 PgFdwDirectModifyState *dmstate;
2651 Index rtindex;
2652 Oid userid;
2653 ForeignTable *table;
2655 int numParams;
2656
2657 /*
2658 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2659 */
2660 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2661 return;
2662
2663 /*
2664 * We'll save private state in node->fdw_state.
2665 */
2667 node->fdw_state = dmstate;
2668
2669 /*
2670 * Identify which user to do the remote access as. This should match what
2671 * ExecCheckPermissions() does.
2672 */
2673 userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
2674
2675 /* Get info about foreign table. */
2676 rtindex = node->resultRelInfo->ri_RangeTableIndex;
2677 if (fsplan->scan.scanrelid == 0)
2678 dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2679 else
2680 dmstate->rel = node->ss.ss_currentRelation;
2681 table = GetForeignTable(RelationGetRelid(dmstate->rel));
2682 user = GetUserMapping(userid, table->serverid);
2683
2684 /*
2685 * Get connection to the foreign server. Connection manager will
2686 * establish new connection if necessary.
2687 */
2688 dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
2689
2690 /* Update the foreign-join-related fields. */
2691 if (fsplan->scan.scanrelid == 0)
2692 {
2693 /* Save info about foreign table. */
2694 dmstate->resultRel = dmstate->rel;
2695
2696 /*
2697 * Set dmstate->rel to NULL to teach get_returning_data() and
2698 * make_tuple_from_result_row() that columns fetched from the remote
2699 * server are described by fdw_scan_tlist of the foreign-scan plan
2700 * node, not the tuple descriptor for the target relation.
2701 */
2702 dmstate->rel = NULL;
2703 }
2704
2705 /* Initialize state variable */
2706 dmstate->num_tuples = -1; /* -1 means not set yet */
2707
2708 /* Get private info created by planner functions. */
2709 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2711 dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private,
2713 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2715 dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private,
2717
2718 /* Create context for per-tuple temp workspace. */
2719 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2720 "postgres_fdw temporary data",
2722
2723 /* Prepare for input conversion of RETURNING results. */
2724 if (dmstate->has_returning)
2725 {
2726 TupleDesc tupdesc;
2727
2728 if (fsplan->scan.scanrelid == 0)
2729 tupdesc = get_tupdesc_for_join_scan_tuples(node);
2730 else
2731 tupdesc = RelationGetDescr(dmstate->rel);
2732
2733 dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2734
2735 /*
2736 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2737 * initialize a filter to extract an updated/deleted tuple from a scan
2738 * tuple.
2739 */
2740 if (fsplan->scan.scanrelid == 0)
2741 init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2742 }
2743
2744 /*
2745 * Prepare for processing of parameters used in remote query, if any.
2746 */
2747 numParams = list_length(fsplan->fdw_exprs);
2748 dmstate->numParams = numParams;
2749 if (numParams > 0)
2751 fsplan->fdw_exprs,
2752 numParams,
2753 &dmstate->param_flinfo,
2754 &dmstate->param_exprs,
2755 &dmstate->param_values);
2756}
2757
2758/*
2759 * postgresIterateDirectModify
2760 * Execute a direct foreign table modification
2761 */
2762static TupleTableSlot *
2764{
2766 EState *estate = node->ss.ps.state;
2767 ResultRelInfo *resultRelInfo = node->resultRelInfo;
2768
2769 /*
2770 * If this is the first call after Begin, execute the statement.
2771 */
2772 if (dmstate->num_tuples == -1)
2773 execute_dml_stmt(node);
2774
2775 /*
2776 * If the local query doesn't specify RETURNING, just clear tuple slot.
2777 */
2778 if (!resultRelInfo->ri_projectReturning)
2779 {
2780 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2781 Instrumentation *instr = node->ss.ps.instrument;
2782
2783 Assert(!dmstate->has_returning);
2784
2785 /* Increment the command es_processed count if necessary. */
2786 if (dmstate->set_processed)
2787 estate->es_processed += dmstate->num_tuples;
2788
2789 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2790 if (instr)
2791 instr->tuplecount += dmstate->num_tuples;
2792
2793 return ExecClearTuple(slot);
2794 }
2795
2796 /*
2797 * Get the next RETURNING tuple.
2798 */
2799 return get_returning_data(node);
2800}
2801
2802/*
2803 * postgresEndDirectModify
2804 * Finish a direct foreign table modification
2805 */
2806static void
2808{
2810
2811 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2812 if (dmstate == NULL)
2813 return;
2814
2815 /* Release PGresult */
2816 PQclear(dmstate->result);
2817
2818 /* Release remote connection */
2819 ReleaseConnection(dmstate->conn);
2820 dmstate->conn = NULL;
2821
2822 /* MemoryContext will be deleted automatically. */
2823}
2824
2825/*
2826 * postgresExplainForeignScan
2827 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2828 */
2829static void
2831{
2833 List *fdw_private = plan->fdw_private;
2834
2835 /*
2836 * Identify foreign scans that are really joins or upper relations. The
2837 * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2838 * digit string(s), which are RT indexes, with the correct relation names.
2839 * We do that here, not when the plan is created, because we can't know
2840 * what aliases ruleutils.c will assign at plan creation time.
2841 */
2842 if (list_length(fdw_private) > FdwScanPrivateRelations)
2843 {
2844 StringInfo relations;
2845 char *rawrelations;
2846 char *ptr;
2847 int minrti,
2848 rtoffset;
2849
2850 rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2851
2852 /*
2853 * A difficulty with using a string representation of RT indexes is
2854 * that setrefs.c won't update the string when flattening the
2855 * rangetable. To find out what rtoffset was applied, identify the
2856 * minimum RT index appearing in the string and compare it to the
2857 * minimum member of plan->fs_base_relids. (We expect all the relids
2858 * in the join will have been offset by the same amount; the Asserts
2859 * below should catch it if that ever changes.)
2860 */
2861 minrti = INT_MAX;
2862 ptr = rawrelations;
2863 while (*ptr)
2864 {
2865 if (isdigit((unsigned char) *ptr))
2866 {
2867 int rti = strtol(ptr, &ptr, 10);
2868
2869 if (rti < minrti)
2870 minrti = rti;
2871 }
2872 else
2873 ptr++;
2874 }
2875 rtoffset = bms_next_member(plan->fs_base_relids, -1) - minrti;
2876
2877 /* Now we can translate the string */
2878 relations = makeStringInfo();
2879 ptr = rawrelations;
2880 while (*ptr)
2881 {
2882 if (isdigit((unsigned char) *ptr))
2883 {
2884 int rti = strtol(ptr, &ptr, 10);
2885 RangeTblEntry *rte;
2886 char *relname;
2887 char *refname;
2888
2889 rti += rtoffset;
2890 Assert(bms_is_member(rti, plan->fs_base_relids));
2891 rte = rt_fetch(rti, es->rtable);
2892 Assert(rte->rtekind == RTE_RELATION);
2893 /* This logic should agree with explain.c's ExplainTargetRel */
2894 relname = get_rel_name(rte->relid);
2895 if (es->verbose)
2896 {
2897 char *namespace;
2898
2899 namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid));
2900 appendStringInfo(relations, "%s.%s",
2901 quote_identifier(namespace),
2903 }
2904 else
2905 appendStringInfoString(relations,
2907 refname = (char *) list_nth(es->rtable_names, rti - 1);
2908 if (refname == NULL)
2909 refname = rte->eref->aliasname;
2910 if (strcmp(refname, relname) != 0)
2911 appendStringInfo(relations, " %s",
2912 quote_identifier(refname));
2913 }
2914 else
2915 appendStringInfoChar(relations, *ptr++);
2916 }
2917 ExplainPropertyText("Relations", relations->data, es);
2918 }
2919
2920 /*
2921 * Add remote query, when VERBOSE option is specified.
2922 */
2923 if (es->verbose)
2924 {
2925 char *sql;
2926
2927 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2928 ExplainPropertyText("Remote SQL", sql, es);
2929 }
2930}
2931
2932/*
2933 * postgresExplainForeignModify
2934 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2935 */
2936static void
2938 ResultRelInfo *rinfo,
2939 List *fdw_private,
2940 int subplan_index,
2941 ExplainState *es)
2942{
2943 if (es->verbose)
2944 {
2945 char *sql = strVal(list_nth(fdw_private,
2947
2948 ExplainPropertyText("Remote SQL", sql, es);
2949
2950 /*
2951 * For INSERT we should always have batch size >= 1, but UPDATE and
2952 * DELETE don't support batching so don't show the property.
2953 */
2954 if (rinfo->ri_BatchSize > 0)
2955 ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
2956 }
2957}
2958
2959/*
2960 * postgresExplainDirectModify
2961 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2962 * foreign table directly
2963 */
2964static void
2966{
2967 List *fdw_private;
2968 char *sql;
2969
2970 if (es->verbose)
2971 {
2972 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2973 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2974 ExplainPropertyText("Remote SQL", sql, es);
2975 }
2976}
2977
2978/*
2979 * postgresExecForeignTruncate
2980 * Truncate one or more foreign tables
2981 */
2982static void
2984 DropBehavior behavior,
2985 bool restart_seqs)
2986{
2987 Oid serverid = InvalidOid;
2988 UserMapping *user = NULL;
2989 PGconn *conn = NULL;
2990 StringInfoData sql;
2991 ListCell *lc;
2992 bool server_truncatable = true;
2993
2994 /*
2995 * By default, all postgres_fdw foreign tables are assumed truncatable.
2996 * This can be overridden by a per-server setting, which in turn can be
2997 * overridden by a per-table setting.
2998 */
2999 foreach(lc, rels)
3000 {
3001 ForeignServer *server = NULL;
3002 Relation rel = lfirst(lc);
3004 ListCell *cell;
3005 bool truncatable;
3006
3007 /*
3008 * First time through, determine whether the foreign server allows
3009 * truncates. Since all specified foreign tables are assumed to belong
3010 * to the same foreign server, this result can be used for other
3011 * foreign tables.
3012 */
3013 if (!OidIsValid(serverid))
3014 {
3015 serverid = table->serverid;
3016 server = GetForeignServer(serverid);
3017
3018 foreach(cell, server->options)
3019 {
3020 DefElem *defel = (DefElem *) lfirst(cell);
3021
3022 if (strcmp(defel->defname, "truncatable") == 0)
3023 {
3024 server_truncatable = defGetBoolean(defel);
3025 break;
3026 }
3027 }
3028 }
3029
3030 /*
3031 * Confirm that all specified foreign tables belong to the same
3032 * foreign server.
3033 */
3034 Assert(table->serverid == serverid);
3035
3036 /* Determine whether this foreign table allows truncations */
3037 truncatable = server_truncatable;
3038 foreach(cell, table->options)
3039 {
3040 DefElem *defel = (DefElem *) lfirst(cell);
3041
3042 if (strcmp(defel->defname, "truncatable") == 0)
3043 {
3044 truncatable = defGetBoolean(defel);
3045 break;
3046 }
3047 }
3048
3049 if (!truncatable)
3050 ereport(ERROR,
3051 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3052 errmsg("foreign table \"%s\" does not allow truncates",
3054 }
3055 Assert(OidIsValid(serverid));
3056
3057 /*
3058 * Get connection to the foreign server. Connection manager will
3059 * establish new connection if necessary.
3060 */
3061 user = GetUserMapping(GetUserId(), serverid);
3062 conn = GetConnection(user, false, NULL);
3063
3064 /* Construct the TRUNCATE command string */
3065 initStringInfo(&sql);
3066 deparseTruncateSql(&sql, rels, behavior, restart_seqs);
3067
3068 /* Issue the TRUNCATE command to remote server */
3069 do_sql_command(conn, sql.data);
3070
3071 pfree(sql.data);
3072}
3073
3074/*
3075 * estimate_path_cost_size
3076 * Get cost and size estimates for a foreign scan on given foreign relation
3077 * either a base relation or a join between foreign relations or an upper
3078 * relation containing foreign relations.
3079 *
3080 * param_join_conds are the parameterization clauses with outer relations.
3081 * pathkeys specify the expected sort order if any for given path being costed.
3082 * fpextra specifies additional post-scan/join-processing steps such as the
3083 * final sort and the LIMIT restriction.
3084 *
3085 * The function returns the cost and size estimates in p_rows, p_width,
3086 * p_disabled_nodes, p_startup_cost and p_total_cost variables.
3087 */
3088static void
3090 RelOptInfo *foreignrel,
3091 List *param_join_conds,
3092 List *pathkeys,
3093 PgFdwPathExtraData *fpextra,
3094 double *p_rows, int *p_width,
3095 int *p_disabled_nodes,
3096 Cost *p_startup_cost, Cost *p_total_cost)
3097{
3098 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
3099 double rows;
3100 double retrieved_rows;
3101 int width;
3102 int disabled_nodes = 0;
3103 Cost startup_cost;
3104 Cost total_cost;
3105
3106 /* Make sure the core code has set up the relation's reltarget */
3107 Assert(foreignrel->reltarget);
3108
3109 /*
3110 * If the table or the server is configured to use remote estimates,
3111 * connect to the foreign server and execute EXPLAIN to estimate the
3112 * number of rows selected by the restriction+join clauses. Otherwise,
3113 * estimate rows using whatever statistics we have locally, in a way
3114 * similar to ordinary tables.
3115 */
3116 if (fpinfo->use_remote_estimate)
3117 {
3118 List *remote_param_join_conds;
3119 List *local_param_join_conds;
3120 StringInfoData sql;
3121 PGconn *conn;
3122 Selectivity local_sel;
3123 QualCost local_cost;
3124 List *fdw_scan_tlist = NIL;
3125 List *remote_conds;
3126
3127 /* Required only to be passed to deparseSelectStmtForRel */
3128 List *retrieved_attrs;
3129
3130 /*
3131 * param_join_conds might contain both clauses that are safe to send
3132 * across, and clauses that aren't.
3133 */
3134 classifyConditions(root, foreignrel, param_join_conds,
3135 &remote_param_join_conds, &local_param_join_conds);
3136
3137 /* Build the list of columns to be fetched from the foreign server. */
3138 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
3139 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
3140 else
3141 fdw_scan_tlist = NIL;
3142
3143 /*
3144 * The complete list of remote conditions includes everything from
3145 * baserestrictinfo plus any extra join_conds relevant to this
3146 * particular path.
3147 */
3148 remote_conds = list_concat(remote_param_join_conds,
3149 fpinfo->remote_conds);
3150
3151 /*
3152 * Construct EXPLAIN query including the desired SELECT, FROM, and
3153 * WHERE clauses. Params and other-relation Vars are replaced by dummy
3154 * values, so don't request params_list.
3155 */
3156 initStringInfo(&sql);
3157 appendStringInfoString(&sql, "EXPLAIN ");
3158 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
3159 remote_conds, pathkeys,
3160 fpextra ? fpextra->has_final_sort : false,
3161 fpextra ? fpextra->has_limit : false,
3162 false, &retrieved_attrs, NULL);
3163
3164 /* Get the remote estimate */
3165 conn = GetConnection(fpinfo->user, false, NULL);
3166 get_remote_estimate(sql.data, conn, &rows, &width,
3167 &startup_cost, &total_cost);
3169
3170 retrieved_rows = rows;
3171
3172 /* Factor in the selectivity of the locally-checked quals */
3173 local_sel = clauselist_selectivity(root,
3174 local_param_join_conds,
3175 foreignrel->relid,
3176 JOIN_INNER,
3177 NULL);
3178 local_sel *= fpinfo->local_conds_sel;
3179
3180 rows = clamp_row_est(rows * local_sel);
3181
3182 /* Add in the eval cost of the locally-checked quals */
3183 startup_cost += fpinfo->local_conds_cost.startup;
3184 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3185 cost_qual_eval(&local_cost, local_param_join_conds, root);
3186 startup_cost += local_cost.startup;
3187 total_cost += local_cost.per_tuple * retrieved_rows;
3188
3189 /*
3190 * Add in tlist eval cost for each output row. In case of an
3191 * aggregate, some of the tlist expressions such as grouping
3192 * expressions will be evaluated remotely, so adjust the costs.
3193 */
3194 startup_cost += foreignrel->reltarget->cost.startup;
3195 total_cost += foreignrel->reltarget->cost.startup;
3196 total_cost += foreignrel->reltarget->cost.per_tuple * rows;
3197 if (IS_UPPER_REL(foreignrel))
3198 {
3199 QualCost tlist_cost;
3200
3201 cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
3202 startup_cost -= tlist_cost.startup;
3203 total_cost -= tlist_cost.startup;
3204 total_cost -= tlist_cost.per_tuple * rows;
3205 }
3206 }
3207 else
3208 {
3209 Cost run_cost = 0;
3210
3211 /*
3212 * We don't support join conditions in this mode (hence, no
3213 * parameterized paths can be made).
3214 */
3215 Assert(param_join_conds == NIL);
3216
3217 /*
3218 * We will come here again and again with different set of pathkeys or
3219 * additional post-scan/join-processing steps that caller wants to
3220 * cost. We don't need to calculate the cost/size estimates for the
3221 * underlying scan, join, or grouping each time. Instead, use those
3222 * estimates if we have cached them already.
3223 */
3224 if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
3225 {
3226 Assert(fpinfo->retrieved_rows >= 0);
3227
3228 rows = fpinfo->rows;
3229 retrieved_rows = fpinfo->retrieved_rows;
3230 width = fpinfo->width;
3231 startup_cost = fpinfo->rel_startup_cost;
3232 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
3233
3234 /*
3235 * If we estimate the costs of a foreign scan or a foreign join
3236 * with additional post-scan/join-processing steps, the scan or
3237 * join costs obtained from the cache wouldn't yet contain the
3238 * eval costs for the final scan/join target, which would've been
3239 * updated by apply_scanjoin_target_to_paths(); add the eval costs
3240 * now.
3241 */
3242 if (fpextra && !IS_UPPER_REL(foreignrel))
3243 {
3244 /* Shouldn't get here unless we have LIMIT */
3245 Assert(fpextra->has_limit);
3246 Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
3247 foreignrel->reloptkind == RELOPT_JOINREL);
3248 startup_cost += foreignrel->reltarget->cost.startup;
3249 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3250 }
3251 }
3252 else if (IS_JOIN_REL(foreignrel))
3253 {
3254 PgFdwRelationInfo *fpinfo_i;
3255 PgFdwRelationInfo *fpinfo_o;
3256 QualCost join_cost;
3257 QualCost remote_conds_cost;
3258 double nrows;
3259
3260 /* Use rows/width estimates made by the core code. */
3261 rows = foreignrel->rows;
3262 width = foreignrel->reltarget->width;
3263
3264 /* For join we expect inner and outer relations set */
3265 Assert(fpinfo->innerrel && fpinfo->outerrel);
3266
3267 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
3268 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
3269
3270 /* Estimate of number of rows in cross product */
3271 nrows = fpinfo_i->rows * fpinfo_o->rows;
3272
3273 /*
3274 * Back into an estimate of the number of retrieved rows. Just in
3275 * case this is nuts, clamp to at most nrows.
3276 */
3277 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3278 retrieved_rows = Min(retrieved_rows, nrows);
3279
3280 /*
3281 * The cost of foreign join is estimated as cost of generating
3282 * rows for the joining relations + cost for applying quals on the
3283 * rows.
3284 */
3285
3286 /*
3287 * Calculate the cost of clauses pushed down to the foreign server
3288 */
3289 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
3290 /* Calculate the cost of applying join clauses */
3291 cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
3292
3293 /*
3294 * Startup cost includes startup cost of joining relations and the
3295 * startup cost for join and other clauses. We do not include the
3296 * startup cost specific to join strategy (e.g. setting up hash
3297 * tables) since we do not know what strategy the foreign server
3298 * is going to use.
3299 */
3300 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
3301 startup_cost += join_cost.startup;
3302 startup_cost += remote_conds_cost.startup;
3303 startup_cost += fpinfo->local_conds_cost.startup;
3304
3305 /*
3306 * Run time cost includes:
3307 *
3308 * 1. Run time cost (total_cost - startup_cost) of relations being
3309 * joined
3310 *
3311 * 2. Run time cost of applying join clauses on the cross product
3312 * of the joining relations.
3313 *
3314 * 3. Run time cost of applying pushed down other clauses on the
3315 * result of join
3316 *
3317 * 4. Run time cost of applying nonpushable other clauses locally
3318 * on the result fetched from the foreign server.
3319 */
3320 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
3321 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
3322 run_cost += nrows * join_cost.per_tuple;
3323 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
3324 run_cost += nrows * remote_conds_cost.per_tuple;
3325 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3326
3327 /* Add in tlist eval cost for each output row */
3328 startup_cost += foreignrel->reltarget->cost.startup;
3329 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3330 }
3331 else if (IS_UPPER_REL(foreignrel))
3332 {
3333 RelOptInfo *outerrel = fpinfo->outerrel;
3334 PgFdwRelationInfo *ofpinfo;
3335 AggClauseCosts aggcosts;
3336 double input_rows;
3337 int numGroupCols;
3338 double numGroups = 1;
3339
3340 /* The upper relation should have its outer relation set */
3341 Assert(outerrel);
3342 /* and that outer relation should have its reltarget set */
3343 Assert(outerrel->reltarget);
3344
3345 /*
3346 * This cost model is mixture of costing done for sorted and
3347 * hashed aggregates in cost_agg(). We are not sure which
3348 * strategy will be considered at remote side, thus for
3349 * simplicity, we put all startup related costs in startup_cost
3350 * and all finalization and run cost are added in total_cost.
3351 */
3352
3353 ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
3354
3355 /* Get rows from input rel */
3356 input_rows = ofpinfo->rows;
3357
3358 /* Collect statistics about aggregates for estimating costs. */
3359 MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
3360 if (root->parse->hasAggs)
3361 {
3363 }
3364
3365 /* Get number of grouping columns and possible number of groups */
3366 numGroupCols = list_length(root->processed_groupClause);
3367 numGroups = estimate_num_groups(root,
3368 get_sortgrouplist_exprs(root->processed_groupClause,
3369 fpinfo->grouped_tlist),
3370 input_rows, NULL, NULL);
3371
3372 /*
3373 * Get the retrieved_rows and rows estimates. If there are HAVING
3374 * quals, account for their selectivity.
3375 */
3376 if (root->hasHavingQual)
3377 {
3378 /* Factor in the selectivity of the remotely-checked quals */
3379 retrieved_rows =
3380 clamp_row_est(numGroups *
3382 fpinfo->remote_conds,
3383 0,
3384 JOIN_INNER,
3385 NULL));
3386 /* Factor in the selectivity of the locally-checked quals */
3387 rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
3388 }
3389 else
3390 {
3391 rows = retrieved_rows = numGroups;
3392 }
3393
3394 /* Use width estimate made by the core code. */
3395 width = foreignrel->reltarget->width;
3396
3397 /*-----
3398 * Startup cost includes:
3399 * 1. Startup cost for underneath input relation, adjusted for
3400 * tlist replacement by apply_scanjoin_target_to_paths()
3401 * 2. Cost of performing aggregation, per cost_agg()
3402 *-----
3403 */
3404 startup_cost = ofpinfo->rel_startup_cost;
3405 startup_cost += outerrel->reltarget->cost.startup;
3406 startup_cost += aggcosts.transCost.startup;
3407 startup_cost += aggcosts.transCost.per_tuple * input_rows;
3408 startup_cost += aggcosts.finalCost.startup;
3409 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3410
3411 /*-----
3412 * Run time cost includes:
3413 * 1. Run time cost of underneath input relation, adjusted for
3414 * tlist replacement by apply_scanjoin_target_to_paths()
3415 * 2. Run time cost of performing aggregation, per cost_agg()
3416 *-----
3417 */
3418 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3419 run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3420 run_cost += aggcosts.finalCost.per_tuple * numGroups;
3421 run_cost += cpu_tuple_cost * numGroups;
3422
3423 /* Account for the eval cost of HAVING quals, if any */
3424 if (root->hasHavingQual)
3425 {
3426 QualCost remote_cost;
3427
3428 /* Add in the eval cost of the remotely-checked quals */
3429 cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3430 startup_cost += remote_cost.startup;
3431 run_cost += remote_cost.per_tuple * numGroups;
3432 /* Add in the eval cost of the locally-checked quals */
3433 startup_cost += fpinfo->local_conds_cost.startup;
3434 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3435 }
3436
3437 /* Add in tlist eval cost for each output row */
3438 startup_cost += foreignrel->reltarget->cost.startup;
3439 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3440 }
3441 else
3442 {
3443 Cost cpu_per_tuple;
3444
3445 /* Use rows/width estimates made by set_baserel_size_estimates. */
3446 rows = foreignrel->rows;
3447 width = foreignrel->reltarget->width;
3448
3449 /*
3450 * Back into an estimate of the number of retrieved rows. Just in
3451 * case this is nuts, clamp to at most foreignrel->tuples.
3452 */
3453 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3454 retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3455
3456 /*
3457 * Cost as though this were a seqscan, which is pessimistic. We
3458 * effectively imagine the local_conds are being evaluated
3459 * remotely, too.
3460 */
3461 startup_cost = 0;
3462 run_cost = 0;
3463 run_cost += seq_page_cost * foreignrel->pages;
3464
3465 startup_cost += foreignrel->baserestrictcost.startup;
3466 cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3467 run_cost += cpu_per_tuple * foreignrel->tuples;
3468
3469 /* Add in tlist eval cost for each output row */
3470 startup_cost += foreignrel->reltarget->cost.startup;
3471 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3472 }
3473
3474 /*
3475 * Without remote estimates, we have no real way to estimate the cost
3476 * of generating sorted output. It could be free if the query plan
3477 * the remote side would have chosen generates properly-sorted output
3478 * anyway, but in most cases it will cost something. Estimate a value
3479 * high enough that we won't pick the sorted path when the ordering
3480 * isn't locally useful, but low enough that we'll err on the side of
3481 * pushing down the ORDER BY clause when it's useful to do so.
3482 */
3483 if (pathkeys != NIL)
3484 {
3485 if (IS_UPPER_REL(foreignrel))
3486 {
3487 Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3488 fpinfo->stage == UPPERREL_GROUP_AGG);
3490 retrieved_rows, width,
3491 fpextra->limit_tuples,
3492 &disabled_nodes,
3493 &startup_cost, &run_cost);
3494 }
3495 else
3496 {
3497 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3498 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3499 }
3500 }
3501
3502 total_cost = startup_cost + run_cost;
3503
3504 /* Adjust the cost estimates if we have LIMIT */
3505 if (fpextra && fpextra->has_limit)
3506 {
3507 adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3508 fpextra->offset_est, fpextra->count_est);
3509 retrieved_rows = rows;
3510 }
3511 }
3512
3513 /*
3514 * If this includes the final sort step, the given target, which will be
3515 * applied to the resulting path, might have different expressions from
3516 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3517 * eval costs.
3518 */
3519 if (fpextra && fpextra->has_final_sort &&
3520 fpextra->target != foreignrel->reltarget)
3521 {
3522 QualCost oldcost = foreignrel->reltarget->cost;
3523 QualCost newcost = fpextra->target->cost;
3524
3525 startup_cost += newcost.startup - oldcost.startup;
3526 total_cost += newcost.startup - oldcost.startup;
3527 total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3528 }
3529
3530 /*
3531 * Cache the retrieved rows and cost estimates for scans, joins, or
3532 * groupings without any parameterization, pathkeys, or additional
3533 * post-scan/join-processing steps, before adding the costs for
3534 * transferring data from the foreign server. These estimates are useful
3535 * for costing remote joins involving this relation or costing other
3536 * remote operations on this relation such as remote sorts and remote
3537 * LIMIT restrictions, when the costs can not be obtained from the foreign
3538 * server. This function will be called at least once for every foreign
3539 * relation without any parameterization, pathkeys, or additional
3540 * post-scan/join-processing steps.
3541 */
3542 if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3543 {
3544 fpinfo->retrieved_rows = retrieved_rows;
3545 fpinfo->rel_startup_cost = startup_cost;
3546 fpinfo->rel_total_cost = total_cost;
3547 }
3548
3549 /*
3550 * Add some additional cost factors to account for connection overhead
3551 * (fdw_startup_cost), transferring data across the network
3552 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3553 * (cpu_tuple_cost per retrieved row).
3554 */
3555 startup_cost += fpinfo->fdw_startup_cost;
3556 total_cost += fpinfo->fdw_startup_cost;
3557 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3558 total_cost += cpu_tuple_cost * retrieved_rows;
3559
3560 /*
3561 * If we have LIMIT, we should prefer performing the restriction remotely
3562 * rather than locally, as the former avoids extra row fetches from the
3563 * remote that the latter might cause. But since the core code doesn't
3564 * account for such fetches when estimating the costs of the local
3565 * restriction (see create_limit_path()), there would be no difference
3566 * between the costs of the local restriction and the costs of the remote
3567 * restriction estimated above if we don't use remote estimates (except
3568 * for the case where the foreignrel is a grouping relation, the given
3569 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3570 * accounted for in costing the remote restriction). Tweak the costs of
3571 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3572 * one.
3573 */
3574 if (!fpinfo->use_remote_estimate &&
3575 fpextra && fpextra->has_limit &&
3576 fpextra->limit_tuples > 0 &&
3577 fpextra->limit_tuples < fpinfo->rows)
3578 {
3579 Assert(fpinfo->rows > 0);
3580 total_cost -= (total_cost - startup_cost) * 0.05 *
3581 (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3582 }
3583
3584 /* Return results. */
3585 *p_rows = rows;
3586 *p_width = width;
3587 *p_disabled_nodes = disabled_nodes;
3588 *p_startup_cost = startup_cost;
3589 *p_total_cost = total_cost;
3590}
3591
3592/*
3593 * Estimate costs of executing a SQL statement remotely.
3594 * The given "sql" must be an EXPLAIN command.
3595 */
3596static void
3598 double *rows, int *width,
3599 Cost *startup_cost, Cost *total_cost)
3600{
3601 PGresult *volatile res = NULL;
3602
3603 /* PGresult must be released before leaving this function. */
3604 PG_TRY();
3605 {
3606 char *line;
3607 char *p;
3608 int n;
3609
3610 /*
3611 * Execute EXPLAIN remotely.
3612 */
3613 res = pgfdw_exec_query(conn, sql, NULL);
3615 pgfdw_report_error(ERROR, res, conn, false, sql);
3616
3617 /*
3618 * Extract cost numbers for topmost plan node. Note we search for a
3619 * left paren from the end of the line to avoid being confused by
3620 * other uses of parentheses.
3621 */
3622 line = PQgetvalue(res, 0, 0);
3623 p = strrchr(line, '(');
3624 if (p == NULL)
3625 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3626 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3627 startup_cost, total_cost, rows, width);
3628 if (n != 4)
3629 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3630 }
3631 PG_FINALLY();
3632 {
3633 PQclear(res);
3634 }
3635 PG_END_TRY();
3636}
3637
3638/*
3639 * Adjust the cost estimates of a foreign grouping path to include the cost of
3640 * generating properly-sorted output.
3641 */
3642static void
3644 List *pathkeys,
3645 double retrieved_rows,
3646 double width,
3647 double limit_tuples,
3648 int *p_disabled_nodes,
3649 Cost *p_startup_cost,
3650 Cost *p_run_cost)
3651{
3652 /*
3653 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3654 * side is unlikely to generate properly-sorted output, so it would need
3655 * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3656 * if the GROUP BY clause is sort-able but isn't a superset of the given
3657 * pathkeys, adjust the costs with that function. Otherwise, adjust the
3658 * costs by applying the same heuristic as for the scan or join case.
3659 */
3660 if (!grouping_is_sortable(root->processed_groupClause) ||
3661 !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3662 {
3663 Path sort_path; /* dummy for result of cost_sort */
3664
3665 cost_sort(&sort_path,
3666 root,
3667 pathkeys,
3668 0,
3669 *p_startup_cost + *p_run_cost,
3670 retrieved_rows,
3671 width,
3672 0.0,
3673 work_mem,
3674 limit_tuples);
3675
3676 *p_startup_cost = sort_path.startup_cost;
3677 *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3678 }
3679 else
3680 {
3681 /*
3682 * The default extra cost seems too large for foreign-grouping cases;
3683 * add 1/4th of that default.
3684 */
3685 double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3686 - 1.0) * 0.25;
3687
3688 *p_startup_cost *= sort_multiplier;
3689 *p_run_cost *= sort_multiplier;
3690 }
3691}
3692
3693/*
3694 * Detect whether we want to process an EquivalenceClass member.
3695 *
3696 * This is a callback for use by generate_implied_equalities_for_column.
3697 */
3698static bool
3701 void *arg)
3702{
3704 Expr *expr = em->em_expr;
3705
3706 /*
3707 * If we've identified what we're processing in the current scan, we only
3708 * want to match that expression.
3709 */
3710 if (state->current != NULL)
3711 return equal(expr, state->current);
3712
3713 /*
3714 * Otherwise, ignore anything we've already processed.
3715 */
3716 if (list_member(state->already_used, expr))
3717 return false;
3718
3719 /* This is the new target to process. */
3720 state->current = expr;
3721 return true;
3722}
3723
3724/*
3725 * Create cursor for node's query with current parameter values.
3726 */
3727static void
3729{
3730 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3731 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3732 int numParams = fsstate->numParams;
3733 const char **values = fsstate->param_values;
3734 PGconn *conn = fsstate->conn;
3736 PGresult *res;
3737
3738 /* First, process a pending asynchronous request, if any. */
3739 if (fsstate->conn_state->pendingAreq)
3741
3742 /*
3743 * Construct array of query parameter values in text format. We do the
3744 * conversions in the short-lived per-tuple context, so as not to cause a
3745 * memory leak over repeated scans.
3746 */
3747 if (numParams > 0)
3748 {
3749 MemoryContext oldcontext;
3750
3751 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3752
3753 process_query_params(econtext,
3754 fsstate->param_flinfo,
3755 fsstate->param_exprs,
3756 values);
3757
3758 MemoryContextSwitchTo(oldcontext);
3759 }
3760
3761 /* Construct the DECLARE CURSOR command */
3763 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3764 fsstate->cursor_number, fsstate->query);
3765
3766 /*
3767 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3768 * to infer types for all parameters. Since we explicitly cast every
3769 * parameter (see deparse.c), the "inference" is trivial and will produce
3770 * the desired result. This allows us to avoid assuming that the remote
3771 * server has the same OIDs we do for the parameters' types.
3772 */
3773 if (!PQsendQueryParams(conn, buf.data, numParams,
3774 NULL, values, NULL, NULL, 0))
3775 pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3776
3777 /*
3778 * Get the result, and check for success.
3779 *
3780 * We don't use a PG_TRY block here, so be careful not to throw error
3781 * without releasing the PGresult.
3782 */
3785 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3786 PQclear(res);
3787
3788 /* Mark the cursor as created, and show no tuples have been retrieved */
3789 fsstate->cursor_exists = true;
3790 fsstate->tuples = NULL;
3791 fsstate->num_tuples = 0;
3792 fsstate->next_tuple = 0;
3793 fsstate->fetch_ct_2 = 0;
3794 fsstate->eof_reached = false;
3795
3796 /* Clean up */
3797 pfree(buf.data);
3798}
3799
3800/*
3801 * Fetch some more rows from the node's cursor.
3802 */
3803static void
3805{
3806 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3807 PGresult *volatile res = NULL;
3808 MemoryContext oldcontext;
3809
3810 /*
3811 * We'll store the tuples in the batch_cxt. First, flush the previous
3812 * batch.
3813 */
3814 fsstate->tuples = NULL;
3815 MemoryContextReset(fsstate->batch_cxt);
3816 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3817
3818 /* PGresult must be released before leaving this function. */
3819 PG_TRY();
3820 {
3821 PGconn *conn = fsstate->conn;
3822 int numrows;
3823 int i;
3824
3825 if (fsstate->async_capable)
3826 {
3827 Assert(fsstate->conn_state->pendingAreq);
3828
3829 /*
3830 * The query was already sent by an earlier call to
3831 * fetch_more_data_begin. So now we just fetch the result.
3832 */
3834 /* On error, report the original query, not the FETCH. */
3836 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3837
3838 /* Reset per-connection state */
3839 fsstate->conn_state->pendingAreq = NULL;
3840 }
3841 else
3842 {
3843 char sql[64];
3844
3845 /* This is a regular synchronous fetch. */
3846 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3847 fsstate->fetch_size, fsstate->cursor_number);
3848
3849 res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
3850 /* On error, report the original query, not the FETCH. */
3852 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3853 }
3854
3855 /* Convert the data into HeapTuples */
3856 numrows = PQntuples(res);
3857 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3858 fsstate->num_tuples = numrows;
3859 fsstate->next_tuple = 0;
3860
3861 for (i = 0; i < numrows; i++)
3862 {
3863 Assert(IsA(node->ss.ps.plan, ForeignScan));
3864
3865 fsstate->tuples[i] =
3867 fsstate->rel,
3868 fsstate->attinmeta,
3869 fsstate->retrieved_attrs,
3870 node,
3871 fsstate->temp_cxt);
3872 }
3873
3874 /* Update fetch_ct_2 */
3875 if (fsstate->fetch_ct_2 < 2)
3876 fsstate->fetch_ct_2++;
3877
3878 /* Must be EOF if we didn't get as many tuples as we asked for. */
3879 fsstate->eof_reached = (numrows < fsstate->fetch_size);
3880 }
3881 PG_FINALLY();
3882 {
3883 PQclear(res);
3884 }
3885 PG_END_TRY();
3886
3887 MemoryContextSwitchTo(oldcontext);
3888}
3889
3890/*
3891 * Force assorted GUC parameters to settings that ensure that we'll output
3892 * data values in a form that is unambiguous to the remote server.
3893 *
3894 * This is rather expensive and annoying to do once per row, but there's
3895 * little choice if we want to be sure values are transmitted accurately;
3896 * we can't leave the settings in place between rows for fear of affecting
3897 * user-visible computations.
3898 *
3899 * We use the equivalent of a function SET option to allow the settings to
3900 * persist only until the caller calls reset_transmission_modes(). If an
3901 * error is thrown in between, guc.c will take care of undoing the settings.
3902 *
3903 * The return value is the nestlevel that must be passed to
3904 * reset_transmission_modes() to undo things.
3905 */
3906int
3908{
3909 int nestlevel = NewGUCNestLevel();
3910
3911 /*
3912 * The values set here should match what pg_dump does. See also
3913 * configure_remote_session in connection.c.
3914 */
3915 if (DateStyle != USE_ISO_DATES)
3916 (void) set_config_option("datestyle", "ISO",
3918 GUC_ACTION_SAVE, true, 0, false);
3920 (void) set_config_option("intervalstyle", "postgres",
3922 GUC_ACTION_SAVE, true, 0, false);
3923 if (extra_float_digits < 3)
3924 (void) set_config_option("extra_float_digits", "3",
3926 GUC_ACTION_SAVE, true, 0, false);
3927
3928 /*
3929 * In addition force restrictive search_path, in case there are any
3930 * regproc or similar constants to be printed.
3931 */
3932 (void) set_config_option("search_path", "pg_catalog",
3934 GUC_ACTION_SAVE, true, 0, false);
3935
3936 return nestlevel;
3937}
3938
3939/*
3940 * Undo the effects of set_transmission_modes().
3941 */
3942void
3944{
3945 AtEOXact_GUC(true, nestlevel);
3946}
3947
3948/*
3949 * Utility routine to close a cursor.
3950 */
3951static void
3953 PgFdwConnState *conn_state)
3954{
3955 char sql[64];
3956 PGresult *res;
3957
3958 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3959
3960 /*
3961 * We don't use a PG_TRY block here, so be careful not to throw error
3962 * without releasing the PGresult.
3963 */
3964 res = pgfdw_exec_query(conn, sql, conn_state);
3966 pgfdw_report_error(ERROR, res, conn, true, sql);
3967 PQclear(res);
3968}
3969
3970/*
3971 * create_foreign_modify
3972 * Construct an execution state of a foreign insert/update/delete
3973 * operation
3974 */
3975static PgFdwModifyState *
3977 RangeTblEntry *rte,
3978 ResultRelInfo *resultRelInfo,
3979 CmdType operation,
3980 Plan *subplan,
3981 char *query,
3982 List *target_attrs,
3983 int values_end,
3984 bool has_returning,
3985 List *retrieved_attrs)
3986{
3987 PgFdwModifyState *fmstate;
3988 Relation rel = resultRelInfo->ri_RelationDesc;
3989 TupleDesc tupdesc = RelationGetDescr(rel);
3990 Oid userid;
3991 ForeignTable *table;
3993 AttrNumber n_params;
3994 Oid typefnoid;
3995 bool isvarlena;
3996 ListCell *lc;
3997
3998 /* Begin constructing PgFdwModifyState. */
3999 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
4000 fmstate->rel = rel;
4001
4002 /* Identify which user to do the remote access as. */
4003 userid = ExecGetResultRelCheckAsUser(resultRelInfo, estate);
4004
4005 /* Get info about foreign table. */
4006 table = GetForeignTable(RelationGetRelid(rel));
4007 user = GetUserMapping(userid, table->serverid);
4008
4009 /* Open connection; report that we'll create a prepared statement. */
4010 fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
4011 fmstate->p_name = NULL; /* prepared statement not made yet */
4012
4013 /* Set up remote query information. */
4014 fmstate->query = query;
4015 if (operation == CMD_INSERT)
4016 {
4017 fmstate->query = pstrdup(fmstate->query);
4018 fmstate->orig_query = pstrdup(fmstate->query);
4019 }
4020 fmstate->target_attrs = target_attrs;
4021 fmstate->values_end = values_end;
4022 fmstate->has_returning = has_returning;
4023 fmstate->retrieved_attrs = retrieved_attrs;
4024
4025 /* Create context for per-tuple temp workspace. */
4026 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
4027 "postgres_fdw temporary data",
4029
4030 /* Prepare for input conversion of RETURNING results. */
4031 if (fmstate->has_returning)
4032 fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
4033
4034 /* Prepare for output conversion of parameters used in prepared stmt. */
4035 n_params = list_length(fmstate->target_attrs) + 1;
4036 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
4037 fmstate->p_nums = 0;
4038
4039 if (operation == CMD_UPDATE || operation == CMD_DELETE)
4040 {
4041 Assert(subplan != NULL);
4042
4043 /* Find the ctid resjunk column in the subplan's result */
4045 "ctid");
4046 if (!AttributeNumberIsValid(fmstate->ctidAttno))
4047 elog(ERROR, "could not find junk ctid column");
4048
4049 /* First transmittable parameter will be ctid */
4050 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
4051 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4052 fmstate->p_nums++;
4053 }
4054
4055 if (operation == CMD_INSERT || operation == CMD_UPDATE)
4056 {
4057 /* Set up for remaining transmittable parameters */
4058 foreach(lc, fmstate->target_attrs)
4059 {
4060 int attnum = lfirst_int(lc);
4061 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4062
4063 Assert(!attr->attisdropped);
4064
4065 /* Ignore generated columns; they are set to DEFAULT */
4066 if (attr->attgenerated)
4067 continue;
4068 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
4069 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4070 fmstate->p_nums++;
4071 }
4072 }
4073
4074 Assert(fmstate->p_nums <= n_params);
4075
4076 /* Set batch_size from foreign server/table options. */
4077 if (operation == CMD_INSERT)
4078 fmstate->batch_size = get_batch_size_option(rel);
4079
4080 fmstate->num_slots = 1;
4081
4082 /* Initialize auxiliary state */
4083 fmstate->aux_fmstate = NULL;
4084
4085 return fmstate;
4086}
4087
4088/*
4089 * execute_foreign_modify
4090 * Perform foreign-table modification as required, and fetch RETURNING
4091 * result if any. (This is the shared guts of postgresExecForeignInsert,
4092 * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
4093 * postgresExecForeignDelete.)
 *
 * slots[] holds the tuples to send and planSlots[] the corresponding plan
 * output rows; on entry *numSlots is the number of rows in the batch.  Only
 * INSERT rebuilds its remote statement for a different row count, and the
 * RETURNING path asserts that exactly one row was sent.
 *
 * On return, *numSlots is overwritten with the number of rows the remote
 * server reported as affected.  The function returns slots if that count is
 * positive, else NULL.  For UPDATE/DELETE the target row is identified by
 * the ctid resjunk column of planSlots[0].
4094 */
4095static TupleTableSlot **
4097 ResultRelInfo *resultRelInfo,
4098 CmdType operation,
4099 TupleTableSlot **slots,
4100 TupleTableSlot **planSlots,
4101 int *numSlots)
4102{
4103 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
4104 ItemPointer ctid = NULL;
4105 const char **p_values;
4106 PGresult *res;
4107 int n_rows;
4108 StringInfoData sql;
4109
4110 /* The operation should be INSERT, UPDATE, or DELETE */
4111 Assert(operation == CMD_INSERT ||
4112 operation == CMD_UPDATE ||
4113 operation == CMD_DELETE);
4114
4115 /* First, process a pending asynchronous request, if any. */
4116 if (fmstate->conn_state->pendingAreq)
4118
4119 /*
4120 * If the existing query was deparsed and prepared for a different number
4121 * of rows, rebuild it for the proper number.
4122 */
4123 if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
4124 {
4125 /* Destroy the prepared statement created previously */
4126 if (fmstate->p_name)
4127 deallocate_query(fmstate);
4128
/*
 * deallocate_query() also resets p_name to NULL, so the rebuilt query is
 * re-prepared by the !p_name check below.
 */
4129 /* Build INSERT string with numSlots records in its VALUES clause. */
4130 initStringInfo(&sql);
4131 rebuildInsertSql(&sql, fmstate->rel,
4132 fmstate->orig_query, fmstate->target_attrs,
4133 fmstate->values_end, fmstate->p_nums,
4134 *numSlots - 1);
4135 pfree(fmstate->query);
4136 fmstate->query = sql.data;
4137 fmstate->num_slots = *numSlots;
4138 }
4139
4140 /* Set up the prepared statement on the remote server, if we didn't yet */
4141 if (!fmstate->p_name)
4142 prepare_foreign_modify(fmstate);
4143
4144 /*
4145 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
4146 */
4147 if (operation == CMD_UPDATE || operation == CMD_DELETE)
4148 {
4149 Datum datum;
4150 bool isNull;
4151
4152 datum = ExecGetJunkAttribute(planSlots[0],
4153 fmstate->ctidAttno,
4154 &isNull);
4155 /* shouldn't ever get a null result... */
4156 if (isNull)
4157 elog(ERROR, "ctid is NULL");
4158 ctid = (ItemPointer) DatumGetPointer(datum);
4159 }
4160
4161 /* Convert parameters needed by prepared statement to text form */
4162 p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
4163
4164 /*
4165 * Execute the prepared statement.
 *
 * The statement takes p_nums parameters per row, so p_nums * numSlots
 * values are sent.  The lengths and formats arrays are NULL, so every
 * parameter is sent as text, and the result is requested in text (0).
4166 */
4167 if (!PQsendQueryPrepared(fmstate->conn,
4168 fmstate->p_name,
4169 fmstate->p_nums * (*numSlots),
4170 p_values,
4171 NULL,
4172 NULL,
4173 0))
4174 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4175
4176 /*
4177 * Get the result, and check for success.
4178 *
4179 * We don't use a PG_TRY block here, so be careful not to throw error
4180 * without releasing the PGresult.
4181 */
4182 res = pgfdw_get_result(fmstate->conn);
4183 if (PQresultStatus(res) !=
4185 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4186
4187 /* Check number of rows affected, and fetch RETURNING tuple if any */
/*
 * With RETURNING, n_rows is the number of result rows; otherwise it is
 * parsed from the command tag via PQcmdTuples().
 */
4188 if (fmstate->has_returning)
4189 {
4190 Assert(*numSlots == 1);
4191 n_rows = PQntuples(res);
4192 if (n_rows > 0)
4193 store_returning_result(fmstate, slots[0], res);
4194 }
4195 else
4196 n_rows = atoi(PQcmdTuples(res));
4197
4198 /* And clean up */
4199 PQclear(res);
4200
/*
 * p_values and its strings were allocated in temp_cxt by
 * convert_prep_stmt_params(); they are no longer needed.
 */
4201 MemoryContextReset(fmstate->temp_cxt);
4202
4203 *numSlots = n_rows;
4204
4205 /*
4206 * Return NULL if nothing was inserted/updated/deleted on the remote end
4207 */
4208 return (n_rows > 0) ? slots : NULL;
4209}
4210
4211/*
4212 * prepare_foreign_modify
4213 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
 *
 * Prepares fmstate->query on the remote connection.  The statement name is
 * built from GetPrepStmtNumber(fmstate->conn) and pstrdup'd.
 * fmstate->p_name is assigned only after the remote PREPARE has succeeded,
 * so if an error is raised along the way, p_name is left unchanged.
4214 */
4215static void
4217{
4218 char prep_name[NAMEDATALEN];
4219 char *p_name;
4220 PGresult *res;
4221
4222 /*
4223 * The caller would already have processed a pending asynchronous request
4224 * if any, so no need to do it here.
4225 */
4226
4227 /* Construct name we'll use for the prepared statement. */
4228 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
4229 GetPrepStmtNumber(fmstate->conn));
4230 p_name = pstrdup(prep_name);
4231
4232 /*
4233 * We intentionally do not specify parameter types here, but leave the
4234 * remote server to derive them by default. This avoids possible problems
4235 * with the remote server using different type OIDs than we do. All of
4236 * the prepared statements we use in this module are simple enough that
4237 * the remote server will make the right choices.
4238 */
4239 if (!PQsendPrepare(fmstate->conn,
4240 p_name,
4241 fmstate->query,
4242 0,
4243 NULL))
4244 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4245
4246 /*
4247 * Get the result, and check for success.
4248 *
4249 * We don't use a PG_TRY block here, so be careful not to throw error
4250 * without releasing the PGresult.
4251 */
4252 res = pgfdw_get_result(fmstate->conn);
4254 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4255 PQclear(res);
4256
4257 /* This action shows that the prepare has been done. */
4258 fmstate->p_name = p_name;
4259}
4260
4261/*
4262 * convert_prep_stmt_params
4263 * Create array of text strings representing parameter values
4264 *
4265 * tupleid is ctid to send, or NULL if none
4266 * slots is an array of numSlots slots to get remaining parameters
 * from, or NULL if none
 * numSlots is the number of rows; it must be 1 when tupleid is given
4267 *
 * The result holds fmstate->p_nums * numSlots entries, laid out row after
 * row; a NULL entry represents an SQL NULL.  Generated columns listed in
 * target_attrs are skipped because the remote statement sets them to
 * DEFAULT.
 *
4268 * Data is constructed in temp_cxt; caller should reset that after use.
4269 */
4270static const char **
4272 ItemPointer tupleid,
4273 TupleTableSlot **slots,
4274 int numSlots)
4275{
4276 const char **p_values;
4277 int i;
4278 int j;
4279 int pindex = 0;
4280 MemoryContext oldcontext;
4281
4282 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
4283
4284 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
4285
4286 /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
4287 Assert(!(tupleid != NULL && numSlots > 1));
4288
4289 /* 1st parameter should be ctid, if it's in use */
4290 if (tupleid != NULL)
4291 {
4292 Assert(numSlots == 1);
4293 /* don't need set_transmission_modes for TID output */
4294 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
4295 PointerGetDatum(tupleid));
4296 pindex++;
4297 }
4298
4299 /* get following parameters from slots */
4300 if (slots != NULL && fmstate->target_attrs != NIL)
4301 {
4302 TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
4303 int nestlevel;
4304 ListCell *lc;
4305
4306 nestlevel = set_transmission_modes();
4307
4308 for (i = 0; i < numSlots; i++)
4309 {
/*
 * pindex runs across every row of the output array.  j indexes the
 * per-column output functions in p_flinfo, so it restarts for each row;
 * position 0 belongs to ctid when one is sent.
 */
4310 j = (tupleid != NULL) ? 1 : 0;
4311 foreach(lc, fmstate->target_attrs)
4312 {
4313 int attnum = lfirst_int(lc);
4314 CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
4315 Datum value;
4316 bool isnull;
4317
4318 /* Ignore generated columns; they are set to DEFAULT */
4319 if (attr->attgenerated)
4320 continue;
4321 value = slot_getattr(slots[i], attnum, &isnull);
4322 if (isnull)
4323 p_values[pindex] = NULL;
4324 else
4325 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
4326 value);
4327 pindex++;
4328 j++;
4329 }
4330 }
4331
4332 reset_transmission_modes(nestlevel);
4333 }
4334
/* Every parameter position must have been filled exactly once. */
4335 Assert(pindex == fmstate->p_nums * numSlots);
4336
4337 MemoryContextSwitchTo(oldcontext);
4338
4339 return p_values;
4340}
4341
4342/*
4343 * store_returning_result
4344 * Store the result of a RETURNING clause
4345 *
 * Converts row 0 of res into a heap tuple, built in fmstate->temp_cxt, and
 * stores it into slot with ExecForceStoreHeapTuple().  That call allows for
 * a slot type that cannot hold heap tuples directly.
 *
4346 * On error, be sure to release the PGresult on the way out. Callers do not
4347 * have PG_TRY blocks to ensure this happens.
 * On success res is not cleared; the caller remains responsible for it.
4348 */
4349static void
4351 TupleTableSlot *slot, PGresult *res)
4352{
4353 PG_TRY();
4354 {
4355 HeapTuple newtup;
4356
4357 newtup = make_tuple_from_result_row(res, 0,
4358 fmstate->rel,
4359 fmstate->attinmeta,
4360 fmstate->retrieved_attrs,
4361 NULL,
4362 fmstate->temp_cxt);
4363
4364 /*
4365 * The returning slot will not necessarily be suitable to store
4366 * heaptuples directly, so allow for conversion.
4367 */
4368 ExecForceStoreHeapTuple(newtup, slot, true);
4369 }
4370 PG_CATCH();
4371 {
4372 PQclear(res);
4373 PG_RE_THROW();
4374 }
4375 PG_END_TRY();
4376}
4377
4378/*
4379 * finish_foreign_modify
4380 * Release resources for a foreign insert/update/delete operation
 *
 * Deallocates the remote prepared statement, if one was created.  Then it
 * releases the connection and clears fmstate->conn.  fmstate must not be
 * NULL.
4381 */
4382static void
4384{
4385 Assert(fmstate != NULL);
4386
4387 /* If we created a prepared statement, destroy it */
4388 deallocate_query(fmstate);
4389
4390 /* Release remote connection */
4391 ReleaseConnection(fmstate->conn);
4392 fmstate->conn = NULL;
4393}
4394
4395/*
4396 * deallocate_query
4397 * Deallocate a prepared statement for a foreign insert/update/delete
4398 * operation
 *
 * Does nothing if fmstate->p_name is NULL.  Otherwise it sends DEALLOCATE
 * for that statement, then frees p_name and resets it to NULL, so a repeat
 * call is a no-op.
4399 */
4400static void
4402{
/*
 * Large enough for "DEALLOCATE " plus a name of the form "pgsql_fdw_prep_%u"
 * as generated by prepare_foreign_modify().  A longer name would be
 * truncated by snprintf, not overflow the buffer.
 */
4403 char sql[64];
4404 PGresult *res;
4405
4406 /* do nothing if the query is not allocated */
4407 if (!fmstate->p_name)
4408 return;
4409
4410 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
4411
4412 /*
4413 * We don't use a PG_TRY block here, so be careful not to throw error
4414 * without releasing the PGresult.
4415 */
4416 res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
4418 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
4419 PQclear(res);
4420 pfree(fmstate->p_name);
4421 fmstate->p_name = NULL;
4422}
4423
4424/*
4425 * build_remote_returning
4426 * Build a RETURNING targetlist of a remote query for performing an
4427 * UPDATE/DELETE .. RETURNING on a join directly
 *
 * rtindex is the range-table index of the target relation, and rel is that
 * relation.  returningList is the local RETURNING list and must be
 * non-empty.
 *
 * Returns a list of non-resjunk TargetEntrys numbered from 1:
 *   - If returningList holds a whole-row reference to the target, every
 *     non-dropped column of rel comes first.
 *   - Then come the remaining Vars/PlaceHolderVars pulled from
 *     returningList, skipping entries already present.
4428 */
4429static List *
4430build_remote_returning(Index rtindex, Relation rel, List *returningList)
4431{
4432 bool have_wholerow = false;
4433 List *tlist = NIL;
4434 List *vars;
4435 ListCell *lc;
4436
4437 Assert(returningList);
4438
4439 vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
4440
4441 /*
4442 * If there's a whole-row reference to the target relation, then we'll
4443 * need all the columns of the relation.
 * (vars may also hold PlaceHolderVars because of PVC_INCLUDE_PLACEHOLDERS,
 * hence the IsA() tests.)
4444 */
4445 foreach(lc, vars)
4446 {
4447 Var *var = (Var *) lfirst(lc);
4448
4449 if (IsA(var, Var) &&
4450 var->varno == rtindex &&
4452 {
4453 have_wholerow = true;
4454 break;
4455 }
4456 }
4457
4458 if (have_wholerow)
4459 {
4460 TupleDesc tupdesc = RelationGetDescr(rel);
4461 int i;
4462
4463 for (i = 1; i <= tupdesc->natts; i++)
4464 {
4465 Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
4466 Var *var;
4467
4468 /* Ignore dropped attributes. */
4469 if (attr->attisdropped)
4470 continue;
4471
4472 var = makeVar(rtindex,
4473 i,
4474 attr->atttypid,
4475 attr->atttypmod,
4476 attr->attcollation,
4477 0);
4478
4479 tlist = lappend(tlist,
4480 makeTargetEntry((Expr *) var,
4481 list_length(tlist) + 1,
4482 NULL,
4483 false));
4484 }
4485 }
4486
4487 /* Now add any remaining columns to tlist. */
4488 foreach(lc, vars)
4489 {
4490 Var *var = (Var *) lfirst(lc);
4491
4492 /*
4493 * No need for whole-row references to the target relation. We don't
4494 * need system columns other than ctid and oid either, since those are
4495 * set locally.
 *
 * NOTE(review): init_returning_filter() accepts only ctid among system
 * columns, so the mention of oid looks stale.  Confirm against the full
 * condition below.
4496 */
4497 if (IsA(var, Var) &&
4498 var->varno == rtindex &&
4499 var->varattno <= InvalidAttrNumber &&
4501 continue; /* don't need it */
4502
4503 if (tlist_member((Expr *) var, tlist))
4504 continue; /* already got it */
4505
4506 tlist = lappend(tlist,
4507 makeTargetEntry((Expr *) var,
4508 list_length(tlist) + 1,
4509 NULL,
4510 false));
4511 }
4512
4513 list_free(vars);
4514
4515 return tlist;
4516}
4517
4518/*
4519 * rebuild_fdw_scan_tlist
4520 * Build new fdw_scan_tlist of given foreign-scan plan node from given
4521 * tlist
4522 *
4523 * There might be columns that the fdw_scan_tlist of the given foreign-scan
4524 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4525 * have contained resjunk columns such as 'ctid' of the target relation and
4526 * 'wholerow' of non-target relations, but the tlist might not contain them,
4527 * for example. So, adjust the tlist so it contains all the columns specified
4528 * in the fdw_scan_tlist; else setrefs.c will get confused.
 *
 * The given tlist is extended with lappend (it is not copied) and installed
 * as fscan->fdw_scan_tlist.  Each entry of the old list that tlist_member()
 * does not find is appended as a non-resjunk entry with the next resno.
4529 */
4530static void
4532{
4533 List *new_tlist = tlist;
4534 List *old_tlist = fscan->fdw_scan_tlist;
4535 ListCell *lc;
4536
4537 foreach(lc, old_tlist)
4538 {
4539 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4540
4541 if (tlist_member(tle->expr, new_tlist))
4542 continue; /* already got it */
4543
4544 new_tlist = lappend(new_tlist,
4545 makeTargetEntry(tle->expr,
4546 list_length(new_tlist) + 1,
4547 NULL,
4548 false));
4549 }
4550 fscan->fdw_scan_tlist = new_tlist;
4551}
4552
4553/*
4554 * Execute a direct UPDATE/DELETE statement.
 *
 * Evaluates the remote parameters (if any) into dmstate->param_values,
 * sends dmstate->query with PQsendQueryParams(), and leaves the PGresult in
 * dmstate->result; it is not cleared here.
 *
 * dmstate->num_tuples is set to the number of RETURNING rows when
 * has_returning is set, and otherwise to the row count in the command tag.
4555 */
4556static void
4558{
4560 ExprContext *econtext = node->ss.ps.ps_ExprContext;
4561 int numParams = dmstate->numParams;
4562 const char **values = dmstate->param_values;
4563
4564 /* First, process a pending asynchronous request, if any. */
4565 if (dmstate->conn_state->pendingAreq)
4567
4568 /*
4569 * Construct array of query parameter values in text format.
4570 */
4571 if (numParams > 0)
4572 process_query_params(econtext,
4573 dmstate->param_flinfo,
4574 dmstate->param_exprs,
4575 values);
4576
4577 /*
4578 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4579 * to infer types for all parameters. Since we explicitly cast every
4580 * parameter (see deparse.c), the "inference" is trivial and will produce
4581 * the desired result. This allows us to avoid assuming that the remote
4582 * server has the same OIDs we do for the parameters' types.
4583 */
4584 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4585 NULL, values, NULL, NULL, 0))
4586 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4587
4588 /*
4589 * Get the result, and check for success.
4590 *
4591 * We don't use a PG_TRY block here, so be careful not to throw error
4592 * without releasing the PGresult.
4593 */
4594 dmstate->result = pgfdw_get_result(dmstate->conn);
4595 if (PQresultStatus(dmstate->result) !=
4597 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4598 dmstate->query);
4599
4600 /* Get the number of rows affected. */
4601 if (dmstate->has_returning)
4602 dmstate->num_tuples = PQntuples(dmstate->result);
4603 else
4604 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4605}
4606
4607/*
4608 * Get the result of a RETURNING clause.
 *
 * Returns the scan slot filled with row dmstate->next_tuple of
 * dmstate->result.  Once next_tuple reaches num_tuples, it returns the
 * cleared scan slot instead.  estate->es_processed is incremented per row
 * when set_processed is true.
 *
 * When dmstate->rel is NULL, the tuple to expose is extracted with
 * apply_returning_filter().  Whichever slot holds it is installed as the
 * RETURNING projection's scan tuple.  Note that the scan slot itself is what
 * this function returns.
4609 */
4610static TupleTableSlot *
4612{
4614 EState *estate = node->ss.ps.state;
4615 ResultRelInfo *resultRelInfo = node->resultRelInfo;
4616 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4617 TupleTableSlot *resultSlot;
4618
4619 Assert(resultRelInfo->ri_projectReturning);
4620
4621 /* If we didn't get any tuples, must be end of data. */
4622 if (dmstate->next_tuple >= dmstate->num_tuples)
4623 return ExecClearTuple(slot);
4624
4625 /* Increment the command es_processed count if necessary. */
4626 if (dmstate->set_processed)
4627 estate->es_processed += 1;
4628
4629 /*
4630 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4631 * tuple. (has_returning is false when the local query is of the form
4632 * "UPDATE/DELETE .. RETURNING 1" for example.)
4633 */
4634 if (!dmstate->has_returning)
4635 {
4637 resultSlot = slot;
4638 }
4639 else
4640 {
4641 /*
4642 * On error, be sure to release the PGresult on the way out. Callers
4643 * do not have PG_TRY blocks to ensure this happens.
4644 */
4645 PG_TRY();
4646 {
4647 HeapTuple newtup;
4648
4649 newtup = make_tuple_from_result_row(dmstate->result,
4650 dmstate->next_tuple,
4651 dmstate->rel,
4652 dmstate->attinmeta,
4653 dmstate->retrieved_attrs,
4654 node,
4655 dmstate->temp_cxt);
4656 ExecStoreHeapTuple(newtup, slot, false);
4657 }
4658 PG_CATCH();
4659 {
4660 PQclear(dmstate->result);
4661 PG_RE_THROW();
4662 }
4663 PG_END_TRY();
4664
4665 /* Get the updated/deleted tuple. */
4666 if (dmstate->rel)
4667 resultSlot = slot;
4668 else
4669 resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate);
4670 }
4671 dmstate->next_tuple++;
4672
4673 /* Make slot available for evaluation of the local query RETURNING list. */
4674 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4675 resultSlot;
4676
4677 return slot;
4678}
4679
4680/*
4681 * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
 *
 * fdw_scan_tlist must consist of plain Vars, and rtindex identifies the
 * target relation whose columns are mapped.  This fills
 *   - dmstate->attnoMap,
 *   - dmstate->ctidAttno,
 *   - dmstate->oidAttno (always left at 0 here),
 *   - dmstate->hasSystemCols,
 * which apply_returning_filter() reads.
4682 */
4683static void
4685 List *fdw_scan_tlist,
4686 Index rtindex)
4687{
4688 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4689 ListCell *lc;
4690 int i;
4691
4692 /*
4693 * Calculate the mapping between the fdw_scan_tlist's entries and the
4694 * result tuple's attributes.
4695 *
4696 * The "map" is an array of indexes of the result tuple's attributes in
4697 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4698 * tuple. We store zero for any attributes that don't have the
4699 * corresponding entries in that list, marking that a NULL is needed in
4700 * the result tuple.
4701 *
4702 * Also get the index of the entry for ctid, if any. (oidAttno is only
 * reset to zero; no oid entry is recognized.)
4703 */
4704 dmstate->attnoMap = (AttrNumber *)
4705 palloc0(resultTupType->natts * sizeof(AttrNumber));
4706
4707 dmstate->ctidAttno = dmstate->oidAttno = 0;
4708
/* i is the 1-based position of the current fdw_scan_tlist entry. */
4709 i = 1;
4710 dmstate->hasSystemCols = false;
4711 foreach(lc, fdw_scan_tlist)
4712 {
4713 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4714 Var *var = (Var *) tle->expr;
4715
4716 Assert(IsA(var, Var));
4717
4718 /*
4719 * If the Var is a column of the target relation to be retrieved from
4720 * the foreign server, get the index of the entry.
4721 */
4722 if (var->varno == rtindex &&
4724 {
4725 int attrno = var->varattno;
4726
4727 if (attrno < 0)
4728 {
4729 /*
4730 * We don't retrieve system columns other than ctid.
4731 */
4732 if (attrno == SelfItemPointerAttributeNumber)
4733 dmstate->ctidAttno = i;
4734 else
4735 Assert(false);
4736 dmstate->hasSystemCols = true;
4737 }
4738 else
4739 {
4740 /*
4741 * We don't retrieve whole-row references to the target
4742 * relation either.
4743 */
4744 Assert(attrno > 0);
4745
4746 dmstate->attnoMap[attrno - 1] = i;
4747 }
4748 }
4749 i++;
4750 }
4751}
4752
4753/*
4754 * Extract and return an updated/deleted tuple from a scan tuple.
 *
 * The result is built in resultRelInfo's returning slot.  Column i takes
 * the scan tuple's value at 1-based position dmstate->attnoMap[i], or NULL
 * when that map entry is 0.  If the filter recorded system columns, the
 * tuple is materialized as a heap tuple and its t_self is set from the scan
 * tuple's ctid.
 *
 * NOTE(review): Datum values are assigned directly (no deep copy), so any
 * by-reference values still point at data owned by the scan slot.  Confirm
 * that the scan slot outlives every use of the returned slot.
4755 */
4756static TupleTableSlot *
4758 ResultRelInfo *resultRelInfo,
4759 TupleTableSlot *slot,
4760 EState *estate)
4761{
4762 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4763 TupleTableSlot *resultSlot;
4764 Datum *values;
4765 bool *isnull;
4766 Datum *old_values;
4767 bool *old_isnull;
4768 int i;
4769
4770 /*
4771 * Use the return tuple slot as a place to store the result tuple.
4772 */
4773 resultSlot = ExecGetReturningSlot(estate, resultRelInfo);
4774
4775 /*
4776 * Extract all the values of the scan tuple.
4777 */
4778 slot_getallattrs(slot);
4779 old_values = slot->tts_values;
4780 old_isnull = slot->tts_isnull;
4781
4782 /*
4783 * Prepare to build the result tuple.
4784 */
4785 ExecClearTuple(resultSlot);
4786 values = resultSlot->tts_values;
4787 isnull = resultSlot->tts_isnull;
4788
4789 /*
4790 * Transpose data into proper fields of the result tuple.
4791 */
4792 for (i = 0; i < resultTupType->natts; i++)
4793 {
4794 int j = dmstate->attnoMap[i];
4795
/* j is a 1-based scan-tuple position; 0 means "no source column". */
4796 if (j == 0)
4797 {
4798 values[i] = (Datum) 0;
4799 isnull[i] = true;
4800 }
4801 else
4802 {
4803 values[i] = old_values[j - 1];
4804 isnull[i] = old_isnull[j - 1];
4805 }
4806 }
4807
4808 /*
4809 * Build the virtual tuple.
4810 */
4811 ExecStoreVirtualTuple(resultSlot);
4812
4813 /*
4814 * If we have any system columns to return, materialize a heap tuple in
4815 * the slot from column values set above and install system columns in
4816 * that tuple.
4817 */
4818 if (dmstate->hasSystemCols)
4819 {
4820 HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4821
4822 /* ctid */
4823 if (dmstate->ctidAttno)
4824 {
4825 ItemPointer ctid = NULL;
4826
4827 ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4828 resultTup->t_self = *ctid;
4829 }
4830
4831 /*
4832 * And remaining columns
4833 *
4834 * Note: since we currently don't allow the target relation to appear
4835 * on the nullable side of an outer join, any system columns wouldn't
4836 * go to NULL.
4837 *
4838 * Note: no need to care about tableoid here because it will be
4839 * initialized in ExecProcessReturning().
4840 */
4844 }
4845
4846 /*
4847 * And return the result tuple.
4848 */
4849 return resultSlot;
4850}
4851
4852/*
4853 * Prepare for processing of parameters used in remote query.
 *
 * Outputs:
 *   *param_flinfo  - one output function per expression in fdw_exprs,
 *                    looked up by exprType()
 *   *param_exprs   - fdw_exprs initialized with ExecInitExprList()
 *                    against node
 *   *param_values  - a zeroed buffer of numParams entries for the text
 *                    form of the parameters
 *
 * numParams must be positive.  NOTE(review): fdw_exprs is assumed to have
 * exactly numParams entries.  This is not checked here: the flinfo loop
 * writes one entry per list element.
4854 */
4855static void
4857 List *fdw_exprs,
4858 int numParams,
4859 FmgrInfo **param_flinfo,
4860 List **param_exprs,
4861 const char ***param_values)
4862{
4863 int i;
4864 ListCell *lc;
4865
4866 Assert(numParams > 0);
4867
4868 /* Prepare for output conversion of parameters used in remote query. */
4869 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4870
4871 i = 0;
4872 foreach(lc, fdw_exprs)
4873 {
4874 Node *param_expr = (Node *) lfirst(lc);
4875 Oid typefnoid;
4876 bool isvarlena;
4877
4878 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4879 fmgr_info(typefnoid, &(*param_flinfo)[i]);
4880 i++;
4881 }
4882
4883 /*
4884 * Prepare remote-parameter expressions for evaluation. (Note: in
4885 * practice, we expect that all these expressions will be just Params, so
4886 * we could possibly do something more efficient than using the full
4887 * expression-eval machinery for this. But probably there would be little
4888 * benefit, and it'd require postgres_fdw to know more than is desirable
4889 * about Param evaluation.)
4890 */
4891 *param_exprs = ExecInitExprList(fdw_exprs, node);
4892
4893 /* Allocate buffer for text form of query parameters. */
4894 *param_values = (const char **) palloc0(numParams * sizeof(char *));
4895}
4896
4897/*
4898 * Construct array of query parameter values in text format.
 *
 * Evaluates each expression in param_exprs within econtext.  param_values[i]
 * receives the text form produced by param_flinfo[i], or NULL for an SQL
 * NULL.  Output functions run between set_transmission_modes() and
 * reset_transmission_modes().
4899 */
4900static void
4902 FmgrInfo *param_flinfo,
4903 List *param_exprs,
4904 const char **param_values)
4905{
4906 int nestlevel;
4907 int i;
4908 ListCell *lc;
4909
4910 nestlevel = set_transmission_modes();
4911
4912 i = 0;
4913 foreach(lc, param_exprs)
4914 {
4915 ExprState *expr_state = (ExprState *) lfirst(lc);
4916 Datum expr_value;
4917 bool isNull;
4918
4919 /* Evaluate the parameter expression */
4920 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4921
4922 /*
4923 * Get string representation of each parameter value by invoking
4924 * type-specific output function, unless the value is null.
4925 */
4926 if (isNull)
4927 param_values[i] = NULL;
4928 else
4929 param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4930
4931 i++;
4932 }
4933
4934 reset_transmission_modes(nestlevel);
4935}
4936
4937/*
4938 * postgresAnalyzeForeignTable
4939 * Test whether analyzing this foreign table is supported
 *
 * Also sets *totalpages by running the query built by
 * deparseAnalyzeSizeSql() on the remote server, using the table owner's
 * user mapping.  An ERROR is raised unless the result has exactly one row
 * and one column.  Whenever this function returns, it returns true.
4940 */
4941static bool
4944 BlockNumber *totalpages)
4945{
4946 ForeignTable *table;
4948 PGconn *conn;
4949 StringInfoData sql;
/*
 * volatile because res is assigned inside PG_TRY and read in PG_FINALLY.
 * The error path returns via longjmp, after which non-volatile locals
 * changed since the setjmp have indeterminate values.
 */
4950 PGresult *volatile res = NULL;
4951
4952 /* Return the row-analysis function pointer */
4954
4955 /*
4956 * Now we have to get the number of pages. It's annoying that the ANALYZE
4957 * API requires us to return that now, because it forces some duplication
4958 * of effort between this routine and postgresAcquireSampleRowsFunc. But
4959 * it's probably not worth redefining that API at this point.
4960 */
4961
4962 /*
4963 * Get the connection to use. We do the remote access as the table's
4964 * owner, even if the ANALYZE was started by some other user.
4965 */
4966 table = GetForeignTable(RelationGetRelid(relation));
4967 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4968 conn = GetConnection(user, false, NULL);
4969
4970 /*
4971 * Construct command to get page count for relation.
4972 */
4973 initStringInfo(&sql);
4974 deparseAnalyzeSizeSql(&sql, relation);
4975
4976 /* In what follows, do not risk leaking any PGresults. */
4977 PG_TRY();
4978 {
4979 res = pgfdw_exec_query(conn, sql.data, NULL);
4981 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4982
4983 if (PQntuples(res) != 1 || PQnfields(res) != 1)
4984 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4985 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4986 }
4987 PG_FINALLY();
4988 {
4989 PQclear(res);
4990 }
4991 PG_END_TRY();
4992
4994
4995 return true;
4996}
4997
4998/*
4999 * postgresGetAnalyzeInfoForForeignTable
5000 * Count tuples in foreign table (just get pg_class.reltuples).
5001 *
5002 * can_tablesample determines if the remote relation supports acquiring the
5003 * sample using TABLESAMPLE.
 *
 * Runs the query built by deparseAnalyzeInfoSql(), using the table owner's
 * user mapping, and expects one row with two columns.  Returns the first
 * column parsed with strtod().  *can_tablesample is set from the relkind in
 * the second column.
5004 */
5005static double
5006postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
5007{
5008 ForeignTable *table;
5010 PGconn *conn;
5011 StringInfoData sql;
/*
 * These are volatile because they are assigned inside PG_TRY and read after
 * it.  The error path returns via longjmp.
 */
5012 PGresult *volatile res = NULL;
5013 volatile double reltuples = -1;
5014 volatile char relkind = 0;
5015
5016 /* assume the remote relation does not support TABLESAMPLE */
5017 *can_tablesample = false;
5018
5019 /*
5020 * Get the connection to use. We do the remote access as the table's
5021 * owner, even if the ANALYZE was started by some other user.
5022 */
5023 table = GetForeignTable(RelationGetRelid(relation));
5024 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5025 conn = GetConnection(user, false, NULL);
5026
5027 /*
5028 * Construct command to get reltuples and relkind of the remote relation.
5029 */
5030 initStringInfo(&sql);
5031 deparseAnalyzeInfoSql(&sql, relation);
5032
5033 /* In what follows, do not risk leaking any PGresults. */
5034 PG_TRY();
5035 {
5036 res = pgfdw_exec_query(conn, sql.data, NULL);
5038 pgfdw_report_error(ERROR, res, conn, false, sql.data);
5039
5040 if (PQntuples(res) != 1 || PQnfields(res) != 2)
5041 elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
5042 reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
5043 relkind = *(PQgetvalue(res, 0, 1));
5044 }
5045 PG_FINALLY();
5046 {
5047 if (res)
5048 PQclear(res);
5049 }
5050 PG_END_TRY();
5051
5053
5054 /* TABLESAMPLE is supported only for regular tables, matviews and partitioned tables */
5055 *can_tablesample = (relkind == RELKIND_RELATION ||
5056 relkind == RELKIND_MATVIEW ||
5057 relkind == RELKIND_PARTITIONED_TABLE);
5058
5059 return reltuples;
5060}
5061
5062/*
5063 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
5064 *
5065 * Selected rows are returned in the caller-allocated array rows[],
5066 * which must have at least targrows entries.
5067 * The actual number of rows selected is returned as the function result.
5068 * We also count the total number of rows in the table and return it into
5069 * *totalrows. Note that *totaldeadrows is always set to 0.
5070 *
5071 * Note that the returned list of rows is not always in order by physical
5072 * position in the table. Therefore, correlation estimates derived later
5073 * may be meaningless, but it's OK because we don't use the estimates
5074 * currently (the planner only pays attention to correlation for indexscans).
5075 */
5076static int
5078 HeapTuple *rows, int targrows,
5079 double *totalrows,
5080 double *totaldeadrows)
5081{
5082 PgFdwAnalyzeState astate;
5083 ForeignTable *table;
5084 ForeignServer *server;
5086 PGconn *conn;
5088 PgFdwSamplingMethod method = ANALYZE_SAMPLE_AUTO; /* auto is default */
5089 double sample_frac = -1.0;
5090 double reltuples;
5091 unsigned int cursor_number;
5092 StringInfoData sql;
5093 PGresult *volatile res = NULL;
5094 ListCell *lc;
5095
5096 /* Initialize workspace state */
5097 astate.rel = relation;
5099
5100 astate.rows = rows;
5101 astate.targrows = targrows;
5102 astate.numrows = 0;
5103 astate.samplerows = 0;
5104 astate.rowstoskip = -1; /* -1 means not set yet */
5105 reservoir_init_selection_state(&astate.rstate, targrows);
5106
5107 /* Remember ANALYZE context, and create a per-tuple temp context */
5110 "postgres_fdw temporary data",
5112
5113 /*
5114 * Get the connection to use. We do the remote access as the table's
5115 * owner, even if the ANALYZE was started by some other user.
5116 */
5117 table = GetForeignTable(RelationGetRelid(relation));
5118 server = GetForeignServer(table->serverid);
5119 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5120 conn = GetConnection(user, false, NULL);
5121
5122 /* We'll need server version, so fetch it now. */
5124
5125 /*
5126 * What sampling method should we use?
5127 */
5128 foreach(lc, server->options)
5129 {
5130 DefElem *def = (DefElem *) lfirst(lc);
5131
5132 if (strcmp(def->defname, "analyze_sampling") == 0)
5133 {
5134 char *value = defGetString(def);
5135
5136 if (strcmp(value, "off") == 0)
5137 method = ANALYZE_SAMPLE_OFF;
5138 else if (strcmp(value, "auto") == 0)
5139 method = ANALYZE_SAMPLE_AUTO;
5140 else if (strcmp(value, "random") == 0)
5141 method = ANALYZE_SAMPLE_RANDOM;
5142 else if (strc