57 #define DEFAULT_FDW_STARTUP_COST 100.0
60 #define DEFAULT_FDW_TUPLE_COST 0.01
63 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
350 Index resultRelation,
384 Index resultRelation,
429 List *param_join_conds,
432 double *p_rows,
int *p_width,
433 Cost *p_startup_cost,
Cost *p_total_cost);
442 double retrieved_rows,
445 Cost *p_startup_cost,
463 List *retrieved_attrs);
480 List *returningList);
485 List *fdw_scan_tlist,
496 const char ***param_values);
500 const char **param_values);
504 double *totaldeadrows);
514 List *retrieved_attrs,
634 baserel->fdw_private = (
void *) fpinfo;
809 List *useful_eclass_list =
NIL;
824 useful_eclass_list =
lappend(useful_eclass_list, cur_ec);
834 return useful_eclass_list;
851 if (restrictinfo->mergeopfamilies ==
NIL)
882 if (
bms_overlap(relids, restrictinfo->right_ec->ec_relids))
884 restrictinfo->right_ec);
885 else if (
bms_overlap(relids, restrictinfo->left_ec->ec_relids))
887 restrictinfo->left_ec);
890 return useful_eclass_list;
905 List *useful_pathkeys_list =
NIL;
906 List *useful_eclass_list;
918 bool query_pathkeys_ok =
true;
933 query_pathkeys_ok =
false;
938 if (query_pathkeys_ok)
955 return useful_pathkeys_list;
965 query_ec = query_pathkey->pk_eclass;
975 foreach(lc, useful_eclass_list)
981 if (cur_ec == query_ec)
986 OperatorFamilyRelationId, fpinfo))
998 useful_pathkeys_list =
lappend(useful_pathkeys_list,
1002 return useful_pathkeys_list;
1081 required_outer =
bms_union(rinfo->clause_relids,
1096 Assert(param_info != NULL);
1135 if (
arg.current == NULL)
1142 foreach(lc, clauses)
1157 required_outer =
bms_union(rinfo->clause_relids,
1166 Assert(param_info != NULL);
1180 foreach(lc, ppi_list)
1192 &startup_cost, &total_cost);
1234 List *fdw_recheck_quals =
NIL;
1235 List *retrieved_attrs;
1237 bool has_final_sort =
false;
1238 bool has_limit =
false;
1257 scan_relid = foreignrel->
relid;
1278 foreach(lc, scan_clauses)
1283 if (rinfo->pseudoconstant)
1300 fdw_recheck_quals = remote_exprs;
1358 foreach(lc, local_exprs)
1374 Join *join_plan = (
Join *) outer_plan;
1398 has_final_sort, has_limit,
false,
1399 &retrieved_attrs, ¶ms_list);
1412 fdw_private =
lappend(fdw_private,
1452 for (
int i = 0;
i < tupdesc->natts;
i++)
1460 if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1477 att->atttypid = reltype;
1535 fsstate->cursor_exists =
false;
1547 "postgres_fdw tuple data",
1550 "postgres_fdw temporary data",
1564 fsstate->rel = NULL;
1574 fsstate->numParams = numParams;
1579 &fsstate->param_flinfo,
1580 &fsstate->param_exprs,
1581 &fsstate->param_values);
1669 snprintf(sql,
sizeof(sql),
"CLOSE c%u",
1674 snprintf(sql,
sizeof(sql),
"MOVE BACKWARD ALL IN c%u",
1711 if (fsstate == NULL)
1721 fsstate->
conn = NULL;
1761 Index resultRelation,
1769 List *withCheckOptionList =
NIL;
1772 bool doNothing =
false;
1773 int values_end_len = -1;
1806 if (!attr->attisdropped)
1823 elog(
ERROR,
"system-column update is not supported");
1850 elog(
ERROR,
"unexpected ON CONFLICT specification: %d",
1860 targetAttrs, doNothing,
1861 withCheckOptionList, returningList,
1862 &retrieved_attrs, &values_end_len);
1867 withCheckOptionList, returningList,
1876 elog(
ERROR,
"unexpected operation: %d", (
int) operation);
1909 List *retrieved_attrs;
1971 &slot, &planSlot, &numSlots);
1976 return rslot ? *rslot : NULL;
2000 slots, planSlots, numSlots);
2074 if (fmstate && fmstate->
p_nums > 0)
2094 &slot, &planSlot, &numSlots);
2096 return rslot ? rslot[0] : NULL;
2113 &slot, &planSlot, &numSlots);
2115 return rslot ? rslot[0] : NULL;
2129 if (fmstate == NULL)
2147 Index resultRelation;
2156 bool doNothing =
false;
2169 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2170 errmsg(
"cannot route tuples into foreign table to be updated \"%s\"",
2180 if (!attr->attisdropped)
2193 elog(
ERROR,
"unexpected ON CONFLICT specification: %d",
2194 (
int) onConflictAction);
2214 rte->
relkind = RELKIND_FOREIGN_TABLE;
2238 &retrieved_attrs, &values_end_len);
2249 retrieved_attrs !=
NIL,
2317 if (strcmp(def->
defname,
"updatable") == 0)
2324 if (strcmp(def->
defname,
"updatable") == 0)
2434 Index resultRelation,
2472 if (fscan->
scan.plan.qual !=
NIL)
2483 foreignrel = root->simple_rel_array[resultRelation];
2484 rte = root->simple_rte_array[resultRelation];
2501 &processed_tlist, &targetAttrs);
2502 forboth(lc, processed_tlist, lc2, targetAttrs)
2511 elog(
ERROR,
"system-column update is not supported");
2567 remote_exprs, ¶ms_list,
2568 returningList, &retrieved_attrs);
2573 remote_exprs, ¶ms_list,
2574 returningList, &retrieved_attrs);
2577 elog(
ERROR,
"unexpected operation: %d", (
int) operation);
2607 fscan->
scan.plan.lefttree = NULL;
2618 if (fscan->
scan.plan.async_capable)
2619 fscan->
scan.plan.async_capable =
false;
2678 dmstate->resultRel = dmstate->rel;
2686 dmstate->rel = NULL;
2690 dmstate->num_tuples = -1;
2704 "postgres_fdw temporary data",
2708 if (dmstate->has_returning)
2732 dmstate->numParams = numParams;
2737 &dmstate->param_flinfo,
2738 &dmstate->param_exprs,
2739 &dmstate->param_values);
2762 if (!resultRelInfo->ri_projectReturning)
2796 if (dmstate == NULL)
2804 dmstate->
conn = NULL;
2849 if (isdigit((
unsigned char) *ptr))
2851 int rti = strtol(ptr, &ptr, 10);
2866 if (isdigit((
unsigned char) *ptr))
2868 int rti = strtol(ptr, &ptr, 10);
2892 if (refname == NULL)
2894 if (strcmp(refname,
relname) != 0)
2976 bool server_truncatable =
true;
3002 foreach(cell, server->
options)
3006 if (strcmp(defel->
defname,
"truncatable") == 0)
3021 truncatable = server_truncatable;
3026 if (strcmp(defel->
defname,
"truncatable") == 0)
3035 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3036 errmsg(
"foreign table \"%s\" does not allow truncates",
3075 List *param_join_conds,
3078 double *p_rows,
int *p_width,
3079 Cost *p_startup_cost,
Cost *p_total_cost)
3083 double retrieved_rows;
3100 List *remote_param_join_conds;
3101 List *local_param_join_conds;
3110 List *retrieved_attrs;
3117 &remote_param_join_conds, &local_param_join_conds);
3123 fdw_scan_tlist =
NIL;
3130 remote_conds =
list_concat(remote_param_join_conds,
3141 remote_conds, pathkeys,
3144 false, &retrieved_attrs, NULL);
3149 &startup_cost, &total_cost);
3152 retrieved_rows = rows;
3156 local_param_join_conds,
3168 startup_cost += local_cost.
startup;
3169 total_cost += local_cost.
per_tuple * retrieved_rows;
3184 startup_cost -= tlist_cost.
startup;
3185 total_cost -= tlist_cost.
startup;
3186 total_cost -= tlist_cost.
per_tuple * rows;
3210 rows = fpinfo->
rows;
3212 width = fpinfo->
width;
3243 rows = foreignrel->
rows;
3253 nrows = fpinfo_i->
rows * fpinfo_o->
rows;
3260 retrieved_rows =
Min(retrieved_rows, nrows);
3283 startup_cost += join_cost.
startup;
3284 startup_cost += remote_conds_cost.
startup;
3304 run_cost += nrows * join_cost.
per_tuple;
3306 run_cost += nrows * remote_conds_cost.
per_tuple;
3320 double numGroups = 1;
3338 input_rows = ofpinfo->
rows;
3342 if (root->
parse->hasAggs)
3352 input_rows, NULL, NULL);
3373 rows = retrieved_rows = numGroups;
3412 startup_cost += remote_cost.
startup;
3413 run_cost += remote_cost.
per_tuple * numGroups;
3428 rows = foreignrel->
rows;
3436 retrieved_rows =
Min(retrieved_rows, foreignrel->
tuples);
3449 run_cost += cpu_per_tuple * foreignrel->
tuples;
3465 if (pathkeys !=
NIL)
3472 retrieved_rows, width,
3474 &startup_cost, &run_cost);
3483 total_cost = startup_cost + run_cost;
3490 retrieved_rows = rows;
3523 if (pathkeys ==
NIL && param_join_conds ==
NIL && fpextra == NULL)
3561 total_cost -= (total_cost - startup_cost) * 0.05 *
3568 *p_startup_cost = startup_cost;
3569 *p_total_cost = total_cost;
3578 double *rows,
int *width,
3579 Cost *startup_cost,
Cost *total_cost)
3603 p = strrchr(line,
'(');
3605 elog(
ERROR,
"could not interpret EXPLAIN output: \"%s\"", line);
3606 n = sscanf(p,
"(cost=%lf..%lf rows=%lf width=%d)",
3607 startup_cost, total_cost, rows, width);
3609 elog(
ERROR,
"could not interpret EXPLAIN output: \"%s\"", line);
3625 double retrieved_rows,
3627 double limit_tuples,
3628 Cost *p_startup_cost,
3647 *p_startup_cost + *p_run_cost,
3666 *p_startup_cost *= sort_multiplier;
3667 *p_run_cost *= sort_multiplier;
3688 if (
state->current != NULL)
3698 state->current = expr;
3752 NULL,
values, NULL, NULL, 0))
3824 snprintf(sql,
sizeof(sql),
"FETCH %d FROM c%u",
3839 for (
i = 0;
i < numrows;
i++)
3963 List *retrieved_attrs)
3992 fmstate->
query = query;
4005 "postgres_fdw temporary data",
4025 elog(
ERROR,
"could not find junk ctid column");
4041 Assert(!attr->attisdropped);
4044 if (attr->attgenerated)
4083 const char **p_values;
4114 fmstate->
query = sql.data;
4147 fmstate->
p_nums * (*numSlots),
4186 return (n_rows > 0) ? slots : NULL;
4206 snprintf(prep_name,
sizeof(prep_name),
"pgsql_fdw_prep_%u",
4236 fmstate->
p_name = p_name;
4248 static const char **
4254 const char **p_values;
4262 p_values = (
const char **)
palloc(
sizeof(
char *) * fmstate->
p_nums * numSlots);
4265 Assert(!(tupleid != NULL && numSlots > 1));
4268 if (tupleid != NULL)
4286 for (
i = 0;
i < numSlots;
i++)
4288 j = (tupleid != NULL) ? 1 : 0;
4297 if (attr->attgenerated)
4301 p_values[pindex] = NULL;
4370 fmstate->
conn = NULL;
4410 bool have_wholerow =
false;
4428 var->
varno == rtindex &&
4431 have_wholerow =
true;
4441 for (
i = 1;
i <= tupdesc->
natts;
i++)
4447 if (attr->attisdropped)
4476 var->
varno == rtindex &&
4511 List *new_tlist = tlist;
4515 foreach(lc, old_tlist)
4522 new_tlist =
lappend(new_tlist,
4563 NULL,
values, NULL, NULL, 0))
4597 Assert(resultRelInfo->ri_projectReturning);
4605 estate->es_processed += 1;
4652 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4663 List *fdw_scan_tlist,
4689 foreach(lc, fdw_scan_tlist)
4700 if (var->
varno == rtindex &&
4770 for (
i = 0;
i < resultTupType->
natts;
i++)
4782 isnull[
i] = old_isnull[
j - 1];
4806 resultTup->
t_self = *ctid;
4839 const char ***param_values)
4850 foreach(lc, fdw_exprs)
4872 *param_values = (
const char **)
palloc0(numParams *
sizeof(
char *));
4882 const char **param_values)
4891 foreach(lc, param_exprs)
4898 expr_value =
ExecEvalExpr(expr_state, econtext, &isNull);
4905 param_values[
i] = NULL;
4962 elog(
ERROR,
"unexpected result from deparseAnalyzeSizeSql query");
4991 volatile double reltuples = -1;
4992 volatile char relkind = 0;
4995 *can_tablesample =
false;
5019 elog(
ERROR,
"unexpected result from deparseAnalyzeTuplesSql query");
5033 *can_tablesample = (relkind == RELKIND_RELATION ||
5034 relkind == RELKIND_MATVIEW ||
5035 relkind == RELKIND_PARTITIONED_TABLE);
5058 double *totaldeadrows)
5067 double sample_frac = -1.0;
5075 astate.
rel = relation;
5088 "postgres_fdw temporary data",
5110 if (strcmp(def->
defname,
"analyze_sampling") == 0)
5114 if (strcmp(
value,
"off") == 0)
5116 else if (strcmp(
value,
"auto") == 0)
5118 else if (strcmp(
value,
"random") == 0)
5120 else if (strcmp(
value,
"system") == 0)
5122 else if (strcmp(
value,
"bernoulli") == 0)
5133 if (strcmp(def->
defname,
"analyze_sampling") == 0)
5137 if (strcmp(
value,
"off") == 0)
5139 else if (strcmp(
value,
"auto") == 0)
5141 else if (strcmp(
value,
"random") == 0)
5143 else if (strcmp(
value,
"system") == 0)
5145 else if (strcmp(
value,
"bernoulli") == 0)
5160 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5161 errmsg(
"remote server does not support TABLESAMPLE feature")));
5170 bool can_tablesample;
5187 if ((reltuples <= 0) || (targrows >= reltuples))
5215 sample_frac = targrows / reltuples;
5222 Assert(sample_frac >= 0.0 && sample_frac <= 1.0);
5269 if (strcmp(def->
defname,
"fetch_size") == 0)
5279 if (strcmp(def->
defname,
"fetch_size") == 0)
5287 snprintf(fetch_sql,
sizeof(fetch_sql),
"FETCH %d FROM c%u",
5313 for (
i = 0;
i < numrows;
i++)
5337 *totaldeadrows = 0.0;
5347 *totalrows = reltuples;
5353 (
errmsg(
"\"%s\": table contains %.0f rows, %d rows in sample",
5355 *totalrows, astate.
numrows)));
5379 if (astate->
numrows < targrows)
5398 Assert(pos >= 0 && pos < targrows);
5436 bool import_collate =
true;
5437 bool import_default =
false;
5438 bool import_generated =
true;
5439 bool import_not_null =
true;
5450 foreach(lc,
stmt->options)
5454 if (strcmp(def->
defname,
"import_collate") == 0)
5456 else if (strcmp(def->
defname,
"import_default") == 0)
5458 else if (strcmp(def->
defname,
"import_generated") == 0)
5460 else if (strcmp(def->
defname,
"import_not_null") == 0)
5464 (
errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
5478 import_collate =
false;
5496 (
errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
5497 errmsg(
"schema \"%s\" is not present on foreign server \"%s\"",
5526 " format_type(atttypid, atttypmod), "
5528 " pg_get_expr(adbin, adrelid), ");