/*
 * Default startup cost for a foreign-data-wrapper operation (100.0).
 * NOTE(review): the use site is outside this view; presumably this is the
 * fallback when no explicit startup-cost option is configured -- confirm.
 */
56 #define DEFAULT_FDW_STARTUP_COST 100.0
/*
 * Default per-tuple cost for a foreign-data-wrapper operation (0.01).
 * NOTE(review): presumably the fallback when no explicit per-tuple cost
 * option is configured -- confirm against the use site.
 */
59 #define DEFAULT_FDW_TUPLE_COST 0.01
/*
 * Default sort multiplier (1.2).
 * NOTE(review): presumably the factor by which estimated costs are scaled
 * when the remote side must sort, i.e. a 20% surcharge -- confirm against
 * the code that reads this macro, which is not visible here.
 */
62 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
349 Index resultRelation,
383 Index resultRelation,
428 List *param_join_conds,
431 double *p_rows,
int *p_width,
432 Cost *p_startup_cost,
Cost *p_total_cost);
441 double retrieved_rows,
444 Cost *p_startup_cost,
462 List *retrieved_attrs);
479 List *returningList);
484 List *fdw_scan_tlist,
495 const char ***param_values);
499 const char **param_values);
503 double *totaldeadrows);
513 List *retrieved_attrs,
808 List *useful_eclass_list =
NIL;
823 useful_eclass_list =
lappend(useful_eclass_list, cur_ec);
833 return useful_eclass_list;
889 return useful_eclass_list;
904 List *useful_pathkeys_list =
NIL;
905 List *useful_eclass_list;
917 bool query_pathkeys_ok =
true;
932 query_pathkeys_ok =
false;
937 if (query_pathkeys_ok)
954 return useful_pathkeys_list;
974 foreach(lc, useful_eclass_list)
980 if (cur_ec == query_ec)
985 OperatorFamilyRelationId, fpinfo))
997 useful_pathkeys_list =
lappend(useful_pathkeys_list,
1001 return useful_pathkeys_list;
1095 Assert(param_info != NULL);
1134 if (
arg.current == NULL)
1141 foreach(lc, clauses)
1165 Assert(param_info != NULL);
1179 foreach(lc, ppi_list)
1191 &startup_cost, &total_cost);
1233 List *fdw_recheck_quals =
NIL;
1234 List *retrieved_attrs;
1236 bool has_final_sort =
false;
1237 bool has_limit =
false;
1256 scan_relid = foreignrel->
relid;
1277 foreach(lc, scan_clauses)
1299 fdw_recheck_quals = remote_exprs;
1359 foreach(lc, local_exprs)
1375 Join *join_plan = (
Join *) outer_plan;
1399 has_final_sort, has_limit,
false,
1400 &retrieved_attrs, ¶ms_list);
1413 fdw_private =
lappend(fdw_private,
1453 for (
int i = 0;
i < tupdesc->natts;
i++)
1461 if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1478 att->atttypid = reltype;
1524 userid = rte->checkAsUser ? rte->checkAsUser :
GetUserId();
1538 fsstate->cursor_exists =
false;
1550 "postgres_fdw tuple data",
1553 "postgres_fdw temporary data",
1567 fsstate->rel = NULL;
1577 fsstate->numParams = numParams;
1582 &fsstate->param_flinfo,
1583 &fsstate->param_exprs,
1584 &fsstate->param_values);
1672 snprintf(sql,
sizeof(sql),
"CLOSE c%u",
1677 snprintf(sql,
sizeof(sql),
"MOVE BACKWARD ALL IN c%u",
1714 if (fsstate == NULL)
1724 fsstate->
conn = NULL;
1764 Index resultRelation,
1772 List *withCheckOptionList =
NIL;
1775 bool doNothing =
false;
1776 int values_end_len = -1;
1809 if (!attr->attisdropped)
1825 elog(
ERROR,
"system-column update is not supported");
1852 elog(
ERROR,
"unexpected ON CONFLICT specification: %d",
1862 targetAttrs, doNothing,
1863 withCheckOptionList, returningList,
1864 &retrieved_attrs, &values_end_len);
1869 withCheckOptionList, returningList,
1878 elog(
ERROR,
"unexpected operation: %d", (
int) operation);
1911 List *retrieved_attrs;
1973 &slot, &planSlot, &numSlots);
1978 return rslot ? *rslot : NULL;
2002 slots, planSlots, numSlots);
2065 if (fmstate && fmstate->
p_nums > 0)
2085 &slot, &planSlot, &numSlots);
2087 return rslot ? rslot[0] : NULL;
2104 &slot, &planSlot, &numSlots);
2106 return rslot ? rslot[0] : NULL;
2120 if (fmstate == NULL)
2138 Index resultRelation;
2147 bool doNothing =
false;
2160 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2161 errmsg(
"cannot route tuples into foreign table to be updated \"%s\"",
2171 if (!attr->attisdropped)
2184 elog(
ERROR,
"unexpected ON CONFLICT specification: %d",
2185 (
int) onConflictAction);
2205 rte->
relkind = RELKIND_FOREIGN_TABLE;
2229 &retrieved_attrs, &values_end_len);
2240 retrieved_attrs !=
NIL,
2308 if (strcmp(def->
defname,
"updatable") == 0)
2315 if (strcmp(def->
defname,
"updatable") == 0)
2425 Index resultRelation,
2492 &processed_tlist, &targetAttrs);
2493 forboth(lc, processed_tlist, lc2, targetAttrs)
2502 elog(
ERROR,
"system-column update is not supported");
2558 remote_exprs, ¶ms_list,
2559 returningList, &retrieved_attrs);
2564 remote_exprs, ¶ms_list,
2565 returningList, &retrieved_attrs);
2568 elog(
ERROR,
"unexpected operation: %d", (
int) operation);
2651 userid = rte->checkAsUser ? rte->checkAsUser :
GetUserId();
2671 dmstate->resultRel = dmstate->rel;
2679 dmstate->rel = NULL;
2683 dmstate->num_tuples = -1;
2697 "postgres_fdw temporary data",
2701 if (dmstate->has_returning)
2725 dmstate->numParams = numParams;
2730 &dmstate->param_flinfo,
2731 &dmstate->param_exprs,
2732 &dmstate->param_values);
2755 if (!resultRelInfo->ri_projectReturning)
2789 if (dmstate == NULL)
2797 dmstate->
conn = NULL;
2842 if (isdigit((
unsigned char) *ptr))
2844 int rti = strtol(ptr, &ptr, 10);
2859 if (isdigit((
unsigned char) *ptr))
2861 int rti = strtol(ptr, &ptr, 10);
2885 if (refname == NULL)
2887 if (strcmp(refname,
relname) != 0)
2969 bool server_truncatable =
true;
2995 foreach(cell, server->
options)
2999 if (strcmp(defel->
defname,
"truncatable") == 0)
3014 truncatable = server_truncatable;
3019 if (strcmp(defel->
defname,
"truncatable") == 0)
3028 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3029 errmsg(
"foreign table \"%s\" does not allow truncates",
3068 List *param_join_conds,
3071 double *p_rows,
int *p_width,
3072 Cost *p_startup_cost,
Cost *p_total_cost)
3076 double retrieved_rows;
3093 List *remote_param_join_conds;
3094 List *local_param_join_conds;
3103 List *retrieved_attrs;
3110 &remote_param_join_conds, &local_param_join_conds);
3116 fdw_scan_tlist =
NIL;
3123 remote_conds =
list_concat(remote_param_join_conds,
3134 remote_conds, pathkeys,
3137 false, &retrieved_attrs, NULL);
3142 &startup_cost, &total_cost);
3145 retrieved_rows = rows;
3149 local_param_join_conds,
3161 startup_cost += local_cost.
startup;
3162 total_cost += local_cost.
per_tuple * retrieved_rows;
3177 startup_cost -= tlist_cost.
startup;
3178 total_cost -= tlist_cost.
startup;
3179 total_cost -= tlist_cost.
per_tuple * rows;
3203 rows = fpinfo->
rows;
3205 width = fpinfo->
width;
3236 rows = foreignrel->
rows;
3246 nrows = fpinfo_i->
rows * fpinfo_o->
rows;
3253 retrieved_rows =
Min(retrieved_rows, nrows);
3276 startup_cost += join_cost.
startup;
3277 startup_cost += remote_conds_cost.
startup;
3297 run_cost += nrows * join_cost.
per_tuple;
3299 run_cost += nrows * remote_conds_cost.
per_tuple;
3313 double numGroups = 1;
3331 input_rows = ofpinfo->
rows;
3345 input_rows, NULL, NULL);
3366 rows = retrieved_rows = numGroups;
3405 startup_cost += remote_cost.
startup;
3406 run_cost += remote_cost.
per_tuple * numGroups;
3421 rows = foreignrel->
rows;
3429 retrieved_rows =
Min(retrieved_rows, foreignrel->
tuples);
3442 run_cost += cpu_per_tuple * foreignrel->
tuples;
3458 if (pathkeys !=
NIL)
3465 retrieved_rows, width,
3467 &startup_cost, &run_cost);
3476 total_cost = startup_cost + run_cost;
3483 retrieved_rows = rows;
3516 if (pathkeys ==
NIL && param_join_conds ==
NIL && fpextra == NULL)
3554 total_cost -= (total_cost - startup_cost) * 0.05 *
3561 *p_startup_cost = startup_cost;
3562 *p_total_cost = total_cost;
3571 double *rows,
int *width,
3572 Cost *startup_cost,
Cost *total_cost)
3596 p = strrchr(line,
'(');
3598 elog(
ERROR,
"could not interpret EXPLAIN output: \"%s\"", line);
3599 n = sscanf(p,
"(cost=%lf..%lf rows=%lf width=%d)",
3600 startup_cost, total_cost, rows, width);
3602 elog(
ERROR,
"could not interpret EXPLAIN output: \"%s\"", line);
3618 double retrieved_rows,
3620 double limit_tuples,
3621 Cost *p_startup_cost,
3640 *p_startup_cost + *p_run_cost,
3659 *p_startup_cost *= sort_multiplier;
3660 *p_run_cost *= sort_multiplier;
3681 if (
state->current != NULL)
3691 state->current = expr;
3745 NULL,
values, NULL, NULL, 0))
3817 snprintf(sql,
sizeof(sql),
"FETCH %d FROM c%u",
3832 for (
i = 0;
i < numrows;
i++)
3948 List *retrieved_attrs)
3980 fmstate->
query = query;
3993 "postgres_fdw temporary data",
4013 elog(
ERROR,
"could not find junk ctid column");
4029 Assert(!attr->attisdropped);
4032 if (attr->attgenerated)
4071 const char **p_values;
4102 fmstate->
query = sql.data;
4135 fmstate->
p_nums * (*numSlots),
4174 return (n_rows > 0) ? slots : NULL;
4194 snprintf(prep_name,
sizeof(prep_name),
"pgsql_fdw_prep_%u",
4224 fmstate->
p_name = p_name;
4236 static const char **
4242 const char **p_values;
4250 p_values = (
const char **)
palloc(
sizeof(
char *) * fmstate->
p_nums * numSlots);
4253 Assert(!(tupleid != NULL && numSlots > 1));
4256 if (tupleid != NULL)
4274 for (
i = 0;
i < numSlots;
i++)
4276 j = (tupleid != NULL) ? 1 : 0;
4285 if (attr->attgenerated)
4289 p_values[pindex] = NULL;
4358 fmstate->
conn = NULL;
4398 bool have_wholerow =
false;
4416 var->
varno == rtindex &&
4419 have_wholerow =
true;
4429 for (
i = 1;
i <= tupdesc->
natts;
i++)
4435 if (attr->attisdropped)
4464 var->
varno == rtindex &&
4499 List *new_tlist = tlist;
4503 foreach(lc, old_tlist)
4510 new_tlist =
lappend(new_tlist,
4551 NULL,
values, NULL, NULL, 0))
4585 Assert(resultRelInfo->ri_projectReturning);
4593 estate->es_processed += 1;
4640 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4651 List *fdw_scan_tlist,
4677 foreach(lc, fdw_scan_tlist)
4688 if (var->
varno == rtindex &&
4758 for (
i = 0;
i < resultTupType->
natts;
i++)
4770 isnull[
i] = old_isnull[
j - 1];
4794 resultTup->
t_self = *ctid;
4827 const char ***param_values)
4838 foreach(lc, fdw_exprs)
4860 *param_values = (
const char **)
palloc0(numParams *
sizeof(
char *));
4870 const char **param_values)
4879 foreach(lc, param_exprs)
4886 expr_value =
ExecEvalExpr(expr_state, econtext, &isNull);
4893 param_values[
i] = NULL;
4950 elog(
ERROR,
"unexpected result from deparseAnalyzeSizeSql query");
4984 double *totaldeadrows)
4996 astate.
rel = relation;
5009 "postgres_fdw temporary data",
5051 if (strcmp(def->
defname,
"fetch_size") == 0)
5061 if (strcmp(def->
defname,
"fetch_size") == 0)
5069 snprintf(fetch_sql,
sizeof(fetch_sql),
"FETCH %d FROM c%u",
5095 for (
i = 0;
i < numrows;
i++)
5119 *totaldeadrows = 0.0;
5128 (
errmsg(
"\"%s\": table contains %.0f rows, %d rows in sample",
5154 if (astate->
numrows < targrows)
5173 Assert(pos >= 0 && pos < targrows);
5211 bool import_collate =
true;
5212 bool import_default =
false;
5213 bool import_generated =
true;
5214 bool import_not_null =
true;
5229 if (strcmp(def->
defname,
"import_collate") == 0)
5231 else if (strcmp(def->
defname,
"import_default") == 0)
5233 else if (strcmp(def->
defname,
"import_generated") == 0)
5235 else if (strcmp(def->
defname,
"import_not_null") == 0)
5239 (
errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
5253 import_collate =
false;
5271 (
errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
5272 errmsg(
"schema \"%s\" is not present on foreign server \"%s\"",
5301 " format_type(atttypid, atttypmod), "
5303 " pg_get_expr(adbin, adrelid), ");
5316 " collnsp.nspname ");
5323 " JOIN pg_namespace n ON "
5324 " relnamespace = n.oid "
5325 " LEFT JOIN pg_attribute a ON "
5326 " attrelid = c.oid AND attnum > 0 "
5327 " AND NOT attisdropped "
5328 " LEFT JOIN pg_attrdef ad ON "
5329 " adrelid = c.oid AND adnum = attnum ");
5333 " LEFT JOIN pg_collation coll ON "
5334 " coll.oid = attcollation "
5335 " LEFT JOIN pg_namespace collnsp ON "
5336 " collnsp.oid = collnamespace ");
5339 "WHERE c.relkind IN ("
5345 " AND n.nspname = ");
5357 bool first_item =
true;
5389 for (
i = 0;
i < numrows;)
5392 bool first_item =
true;
5407 char *collnamespace;
5445 if (import_collate && collname != NULL && collnamespace != NULL)
5451 if (import_default && attdefault != NULL &&
5452 (!attgenerated || !attgenerated[0]))
5456 if (import_generated && attgenerated != NULL &&
5457 attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
5459 Assert(attdefault != NULL);
5461 " GENERATED ALWAYS AS (%s) STORED",
5469 while (++
i < numrows &&