PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
postgres_fdw.c File Reference
#include "postgres.h"
#include "postgres_fdw.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/cost.h"
#include "optimizer/clauses.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/sampling.h"
#include "utils/selfuncs.h"
Include dependency graph for postgres_fdw.c:

Go to the source code of this file.

Data Structures

struct  PgFdwScanState
 
struct  PgFdwModifyState
 
struct  PgFdwDirectModifyState
 
struct  PgFdwAnalyzeState
 
struct  ConversionLocation
 
struct  ec_member_foreign_arg
 

Macros

#define DEFAULT_FDW_STARTUP_COST   100.0
 
#define DEFAULT_FDW_TUPLE_COST   0.01
 
#define DEFAULT_FDW_SORT_MULTIPLIER   1.2
 

Typedefs

typedef struct PgFdwScanState PgFdwScanState
 
typedef struct PgFdwModifyState PgFdwModifyState
 
typedef struct PgFdwDirectModifyState PgFdwDirectModifyState
 
typedef struct PgFdwAnalyzeState PgFdwAnalyzeState
 
typedef struct ConversionLocation ConversionLocation
 

Enumerations

enum  FdwScanPrivateIndex {
  FdwScanPrivateSelectSql, FdwScanPrivateRemoteConds, FdwScanPrivateRetrievedAttrs, FdwScanPrivateFetchSize,
  FdwScanPrivateRelations
}
 
enum  FdwModifyPrivateIndex { FdwModifyPrivateUpdateSql, FdwModifyPrivateTargetAttnums, FdwModifyPrivateHasReturning, FdwModifyPrivateRetrievedAttrs }
 
enum  FdwDirectModifyPrivateIndex { FdwDirectModifyPrivateUpdateSql, FdwDirectModifyPrivateHasReturning, FdwDirectModifyPrivateRetrievedAttrs, FdwDirectModifyPrivateSetProcessed }
 

Functions

 PG_FUNCTION_INFO_V1 (postgres_fdw_handler)
 
static void postgresGetForeignRelSize (PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
 
static void postgresGetForeignPaths (PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
 
static ForeignScan * postgresGetForeignPlan (PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan)
 
static void postgresBeginForeignScan (ForeignScanState *node, int eflags)
 
static TupleTableSlot * postgresIterateForeignScan (ForeignScanState *node)
 
static void postgresReScanForeignScan (ForeignScanState *node)
 
static void postgresEndForeignScan (ForeignScanState *node)
 
static void postgresAddForeignUpdateTargets (Query *parsetree, RangeTblEntry *target_rte, Relation target_relation)
 
static List * postgresPlanForeignModify (PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
 
static void postgresBeginForeignModify (ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, List *fdw_private, int subplan_index, int eflags)
 
static TupleTableSlot * postgresExecForeignInsert (EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
 
static TupleTableSlot * postgresExecForeignUpdate (EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
 
static TupleTableSlot * postgresExecForeignDelete (EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
 
static void postgresEndForeignModify (EState *estate, ResultRelInfo *resultRelInfo)
 
static int postgresIsForeignRelUpdatable (Relation rel)
 
static bool postgresPlanDirectModify (PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
 
static void postgresBeginDirectModify (ForeignScanState *node, int eflags)
 
static TupleTableSlot * postgresIterateDirectModify (ForeignScanState *node)
 
static void postgresEndDirectModify (ForeignScanState *node)
 
static void postgresExplainForeignScan (ForeignScanState *node, ExplainState *es)
 
static void postgresExplainForeignModify (ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es)
 
static void postgresExplainDirectModify (ForeignScanState *node, ExplainState *es)
 
static bool postgresAnalyzeForeignTable (Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages)
 
static List * postgresImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid)
 
static void postgresGetForeignJoinPaths (PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra)
 
static bool postgresRecheckForeignScan (ForeignScanState *node, TupleTableSlot *slot)
 
static void postgresGetForeignUpperPaths (PlannerInfo *root, UpperRelationKind stage, RelOptInfo *input_rel, RelOptInfo *output_rel)
 
static void estimate_path_cost_size (PlannerInfo *root, RelOptInfo *baserel, List *join_conds, List *pathkeys, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost)
 
static void get_remote_estimate (const char *sql, PGconn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost)
 
static bool ec_member_matches_foreign (PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg)
 
static void create_cursor (ForeignScanState *node)
 
static void fetch_more_data (ForeignScanState *node)
 
static void close_cursor (PGconn *conn, unsigned int cursor_number)
 
static void prepare_foreign_modify (PgFdwModifyState *fmstate)
 
static const char ** convert_prep_stmt_params (PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot)
 
static void store_returning_result (PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
 
static void execute_dml_stmt (ForeignScanState *node)
 
static TupleTableSlot * get_returning_data (ForeignScanState *node)
 
static void prepare_query_params (PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values)
 
static void process_query_params (ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values)
 
static int postgresAcquireSampleRowsFunc (Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows)
 
static void analyze_row_processor (PGresult *res, int row, PgFdwAnalyzeState *astate)
 
static HeapTuple make_tuple_from_result_row (PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context)
 
static void conversion_error_callback (void *arg)
 
static bool foreign_join_ok (PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra)
 
static bool foreign_grouping_ok (PlannerInfo *root, RelOptInfo *grouped_rel)
 
static List * get_useful_pathkeys_for_relation (PlannerInfo *root, RelOptInfo *rel)
 
static List * get_useful_ecs_for_relation (PlannerInfo *root, RelOptInfo *rel)
 
static void add_paths_with_pathkeys_for_rel (PlannerInfo *root, RelOptInfo *rel, Path *epq_path)
 
static void add_foreign_grouping_paths (PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel)
 
Datum postgres_fdw_handler (PG_FUNCTION_ARGS)
 
int set_transmission_modes (void)
 
void reset_transmission_modes (int nestlevel)
 
Expr * find_em_expr_for_rel (EquivalenceClass *ec, RelOptInfo *rel)
 

Variables

 PG_MODULE_MAGIC
 

Macro Definition Documentation

#define DEFAULT_FDW_SORT_MULTIPLIER   1.2

Definition at line 53 of file postgres_fdw.c.

Referenced by estimate_path_cost_size().

#define DEFAULT_FDW_STARTUP_COST   100.0

Definition at line 47 of file postgres_fdw.c.

Referenced by postgresGetForeignRelSize().

#define DEFAULT_FDW_TUPLE_COST   0.01

Definition at line 50 of file postgres_fdw.c.

Referenced by postgresGetForeignRelSize().

Typedef Documentation

Enumeration Type Documentation

Enumerator
FdwDirectModifyPrivateUpdateSql 
FdwDirectModifyPrivateHasReturning 
FdwDirectModifyPrivateRetrievedAttrs 
FdwDirectModifyPrivateSetProcessed 

Definition at line 111 of file postgres_fdw.c.

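112 {
113  /* SQL statement to execute remotely (as a String node) */
114  FdwDirectModifyPrivateUpdateSql,
115  /* has-returning flag (as an integer Value node) */
116  FdwDirectModifyPrivateHasReturning,
117  /* Integer list of attribute numbers retrieved by RETURNING */
118  FdwDirectModifyPrivateRetrievedAttrs,
119  /* set-processed flag (as an integer Value node) */
120  FdwDirectModifyPrivateSetProcessed
121 };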
Enumerator
FdwModifyPrivateUpdateSql 
FdwModifyPrivateTargetAttnums 
FdwModifyPrivateHasReturning 
FdwModifyPrivateRetrievedAttrs 

Definition at line 90 of file postgres_fdw.c.

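91 {
92  /* SQL statement to execute remotely (as a String node) */
93  FdwModifyPrivateUpdateSql,
94  /* Integer list of target attribute numbers for INSERT/UPDATE */
95  FdwModifyPrivateTargetAttnums,
96  /* has-returning flag (as an integer Value node) */
97  FdwModifyPrivateHasReturning,
98  /* Integer list of attribute numbers retrieved by RETURNING */
99  FdwModifyPrivateRetrievedAttrs
100 };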
Enumerator
FdwScanPrivateSelectSql 
FdwScanPrivateRemoteConds 
FdwScanPrivateRetrievedAttrs 
FdwScanPrivateFetchSize 
FdwScanPrivateRelations 

Definition at line 62 of file postgres_fdw.c.

63 {
64  /* SQL statement to execute remotely (as a String node) */
65  FdwScanPrivateSelectSql,
66  /* List of restriction clauses that can be executed remotely */
67  FdwScanPrivateRemoteConds,
68  /* Integer list of attribute numbers retrieved by the SELECT */
69  FdwScanPrivateRetrievedAttrs,
70  /* Integer representing the desired fetch_size */
71  FdwScanPrivateFetchSize,
72 
73  /*
74  * String describing join i.e. names of relations being joined and types
75  * of join, added when the scan is join
76  */
77  FdwScanPrivateRelations
78 };

Function Documentation

static void add_foreign_grouping_paths ( PlannerInfo *  root,
RelOptInfo *  input_rel,
RelOptInfo *  grouped_rel 
)
static

Definition at line 4693 of file postgres_fdw.c.

References add_path(), create_foreignscan_path(), estimate_path_cost_size(), RelOptInfo::fdw_private, foreign_grouping_ok(), Query::groupClause, Query::groupingSets, Query::hasAggs, PlannerInfo::hasHavingQual, NIL, NULL, PgFdwRelationInfo::outerrel, parse(), PlannerInfo::parse, PgFdwRelationInfo::rows, PgFdwRelationInfo::server, PgFdwRelationInfo::shippable_extensions, PgFdwRelationInfo::startup_cost, PgFdwRelationInfo::table, PgFdwRelationInfo::total_cost, PlannerInfo::upper_targets, UPPERREL_GROUP_AGG, PgFdwRelationInfo::user, and PgFdwRelationInfo::width.

Referenced by postgresGetForeignUpperPaths().

4695 {
4696  Query *parse = root->parse;
4697  PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
4698  PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
4699  ForeignPath *grouppath;
4700  PathTarget *grouping_target;
4701  double rows;
4702  int width;
4703  Cost startup_cost;
4704  Cost total_cost;
4705 
4706  /* Nothing to be done, if there is no grouping or aggregation required. */
4707  if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
4708  !root->hasHavingQual)
4709  return;
4710 
4711  grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
4712 
4713  /* save the input_rel as outerrel in fpinfo */
4714  fpinfo->outerrel = input_rel;
4715 
4716  /*
4717  * Copy foreign table, foreign server, user mapping, shippable extensions
4718  * etc. details from the input relation's fpinfo.
4719  */
4720  fpinfo->table = ifpinfo->table;
4721  fpinfo->server = ifpinfo->server;
4722  fpinfo->user = ifpinfo->user;
4723  fpinfo->shippable_extensions = ifpinfo->shippable_extensions;
4724 
4725  /* Assess if it is safe to push down aggregation and grouping. */
4726  if (!foreign_grouping_ok(root, grouped_rel))
4727  return;
4728 
4729  /* Estimate the cost of push down */
4730  estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
4731  &width, &startup_cost, &total_cost);
4732 
4733  /* Now update this information in the fpinfo */
4734  fpinfo->rows = rows;
4735  fpinfo->width = width;
4736  fpinfo->startup_cost = startup_cost;
4737  fpinfo->total_cost = total_cost;
4738 
4739  /* Create and add foreign path to the grouping relation. */
4740  grouppath = create_foreignscan_path(root,
4741  grouped_rel,
4742  grouping_target,
4743  rows,
4744  startup_cost,
4745  total_cost,
4746  NIL, /* no pathkeys */
4747  NULL, /* no required_outer */
4748  NULL,
4749  NIL); /* no fdw_private */
4750 
4751  /* Add generated path into grouped_rel by add_path(). */
4752  add_path(grouped_rel, (Path *) grouppath);
4753 }
#define NIL
Definition: pg_list.h:69
Query * parse
Definition: relation.h:152
void add_path(RelOptInfo *parent_rel, Path *new_path)
Definition: pathnode.c:412
bool hasAggs
Definition: parsenodes.h:116
ForeignServer * server
Definition: postgres_fdw.h:78
List * groupingSets
Definition: parsenodes.h:139
RelOptInfo * outerrel
Definition: postgres_fdw.h:91
ForeignPath * create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel, PathTarget *target, double rows, Cost startup_cost, Cost total_cost, List *pathkeys, Relids required_outer, Path *fdw_outerpath, List *fdw_private)
Definition: pathnode.c:1844
UserMapping * user
Definition: postgres_fdw.h:79
static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel)
void * fdw_private
Definition: relation.h:541
#define NULL
Definition: c.h:226
static void estimate_path_cost_size(PlannerInfo *root, RelOptInfo *baserel, List *join_conds, List *pathkeys, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost)
ForeignTable * table
Definition: postgres_fdw.h:77
List * groupClause
Definition: parsenodes.h:137
bool hasHavingQual
Definition: relation.h:297
List * shippable_extensions
Definition: postgres_fdw.h:74
Definition: relation.h:888
double Cost
Definition: nodes.h:632
static struct subre * parse(struct vars *, int, int, struct state *, struct state *)
Definition: regcomp.c:651
struct PathTarget * upper_targets[UPPERREL_FINAL+1]
Definition: relation.h:270
static void add_paths_with_pathkeys_for_rel ( PlannerInfo *  root,
RelOptInfo *  rel,
Path *  epq_path 
)
static

Definition at line 4278 of file postgres_fdw.c.

References add_path(), create_foreignscan_path(), estimate_path_cost_size(), get_useful_pathkeys_for_relation(), lfirst, NIL, and NULL.

Referenced by postgresGetForeignJoinPaths(), and postgresGetForeignPaths().

4280 {
4281  List *useful_pathkeys_list = NIL; /* List of all pathkeys */
4282  ListCell *lc;
4283 
4284  useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
4285 
4286  /* Create one path for each set of pathkeys we found above. */
4287  foreach(lc, useful_pathkeys_list)
4288  {
4289  double rows;
4290  int width;
4291  Cost startup_cost;
4292  Cost total_cost;
4293  List *useful_pathkeys = lfirst(lc);
4294 
4295  estimate_path_cost_size(root, rel, NIL, useful_pathkeys,
4296  &rows, &width, &startup_cost, &total_cost);
4297 
4298  add_path(rel, (Path *)
4299  create_foreignscan_path(root, rel,
4300  NULL,
4301  rows,
4302  startup_cost,
4303  total_cost,
4304  useful_pathkeys,
4305  NULL,
4306  epq_path,
4307  NIL));
4308  }
4309 }
#define NIL
Definition: pg_list.h:69
static List * get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:787
void add_path(RelOptInfo *parent_rel, Path *new_path)
Definition: pathnode.c:412
ForeignPath * create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel, PathTarget *target, double rows, Cost startup_cost, Cost total_cost, List *pathkeys, Relids required_outer, Path *fdw_outerpath, List *fdw_private)
Definition: pathnode.c:1844
#define NULL
Definition: c.h:226
static void estimate_path_cost_size(PlannerInfo *root, RelOptInfo *baserel, List *join_conds, List *pathkeys, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost)
#define lfirst(lc)
Definition: pg_list.h:106
Definition: pg_list.h:45
Definition: relation.h:888
double Cost
Definition: nodes.h:632
static void analyze_row_processor ( PGresult *  res,
int  row,
PgFdwAnalyzeState *  astate 
)
static

Definition at line 3701 of file postgres_fdw.c.

References PgFdwAnalyzeState::anl_cxt, Assert, PgFdwAnalyzeState::attinmeta, heap_freetuple(), make_tuple_from_result_row(), MemoryContextSwitchTo(), NULL, PgFdwAnalyzeState::numrows, ReservoirStateData::randstate, PgFdwAnalyzeState::rel, reservoir_get_next_S(), PgFdwAnalyzeState::retrieved_attrs, PgFdwAnalyzeState::rows, PgFdwAnalyzeState::rowstoskip, PgFdwAnalyzeState::rstate, sampler_random_fract(), PgFdwAnalyzeState::samplerows, PgFdwAnalyzeState::targrows, and PgFdwAnalyzeState::temp_cxt.

Referenced by postgresAcquireSampleRowsFunc().

3702 {
3703  int targrows = astate->targrows;
3704  int pos; /* array index to store tuple in */
3705  MemoryContext oldcontext;
3706 
3707  /* Always increment sample row counter. */
3708  astate->samplerows += 1;
3709 
3710  /*
3711  * Determine the slot where this sample row should be stored. Set pos to
3712  * negative value to indicate the row should be skipped.
3713  */
3714  if (astate->numrows < targrows)
3715  {
3716  /* First targrows rows are always included into the sample */
3717  pos = astate->numrows++;
3718  }
3719  else
3720  {
3721  /*
3722  * Now we start replacing tuples in the sample until we reach the end
3723  * of the relation. Same algorithm as in acquire_sample_rows in
3724  * analyze.c; see Jeff Vitter's paper.
3725  */
3726  if (astate->rowstoskip < 0)
3727  astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
3728 
3729  if (astate->rowstoskip <= 0)
3730  {
3731  /* Choose a random reservoir element to replace. */
3732  pos = (int) (targrows * sampler_random_fract(astate->rstate.randstate));
3733  Assert(pos >= 0 && pos < targrows);
3734  heap_freetuple(astate->rows[pos]);
3735  }
3736  else
3737  {
3738  /* Skip this tuple. */
3739  pos = -1;
3740  }
3741 
3742  astate->rowstoskip -= 1;
3743  }
3744 
3745  if (pos >= 0)
3746  {
3747  /*
3748  * Create sample tuple from current result row, and store it in the
3749  * position determined above. The tuple has to be created in anl_cxt.
3750  */
3751  oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
3752 
3753  astate->rows[pos] = make_tuple_from_result_row(res, row,
3754  astate->rel,
3755  astate->attinmeta,
3756  astate->retrieved_attrs,
3757  NULL,
3758  astate->temp_cxt);
3759 
3760  MemoryContextSwitchTo(oldcontext);
3761  }
3762 }
static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context)
HeapTuple * rows
Definition: postgres_fdw.c:229
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
double sampler_random_fract(SamplerRandomState randstate)
Definition: sampling.c:238
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
ReservoirStateData rstate
Definition: postgres_fdw.c:236
AttInMetadata * attinmeta
Definition: postgres_fdw.c:225
MemoryContext temp_cxt
Definition: postgres_fdw.c:240
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
MemoryContext anl_cxt
Definition: postgres_fdw.c:239
SamplerRandomState randstate
Definition: sampling.h:50
double reservoir_get_next_S(ReservoirState rs, double t, int n)
Definition: sampling.c:142
static void close_cursor ( PGconn *  conn,
unsigned int  cursor_number 
)
static

Definition at line 3105 of file postgres_fdw.c.

References ERROR, pgfdw_exec_query(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), and snprintf().

Referenced by postgresAcquireSampleRowsFunc(), and postgresEndForeignScan().

3106 {
3107  char sql[64];
3108  PGresult *res;
3109 
3110  snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3111 
3112  /*
3113  * We don't use a PG_TRY block here, so be careful not to throw error
3114  * without releasing the PGresult.
3115  */
3116  res = pgfdw_exec_query(conn, sql);
3117  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3118  pgfdw_report_error(ERROR, res, conn, true, sql);
3119  PQclear(res);
3120 }
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
static unsigned int cursor_number
Definition: connection.c:60
void PQclear(PGresult *res)
Definition: fe-exec.c:650
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:460
static void conversion_error_callback ( void *  arg)
static

Definition at line 4934 of file postgres_fdw.c.

References tupleDesc::attrs, castNode, ConversionLocation::cur_attno, errcontext, EState::es_range_table, TargetEntry::expr, ForeignScan::fdw_scan_tlist, ConversionLocation::fsstate, get_rel_name(), get_relid_attribute_name(), IsA, list_nth(), NameStr, tupleDesc::natts, NULL, ObjectIdAttributeNumber, PlanState::plan, ScanState::ps, ConversionLocation::rel, RelationGetDescr, RelationGetRelationName, RangeTblEntry::relid, rt_fetch, SelfItemPointerAttributeNumber, ForeignScanState::ss, PlanState::state, Var::varattno, and Var::varno.

Referenced by make_tuple_from_result_row().

4935 {
4936  const char *attname = NULL;
4937  const char *relname = NULL;
4938  bool is_wholerow = false;
4939  ConversionLocation *errpos = (ConversionLocation *) arg;
4940 
4941  if (errpos->rel)
4942  {
4943  /* error occurred in a scan against a foreign table */
4944  TupleDesc tupdesc = RelationGetDescr(errpos->rel);
4945 
4946  if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
4947  attname = NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname);
4948  else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
4949  attname = "ctid";
4950  else if (errpos->cur_attno == ObjectIdAttributeNumber)
4951  attname = "oid";
4952 
4953  relname = RelationGetRelationName(errpos->rel);
4954  }
4955  else
4956  {
4957  /* error occurred in a scan against a foreign join */
4958  ForeignScanState *fsstate = errpos->fsstate;
4959  ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
4960  EState *estate = fsstate->ss.ps.state;
4961  TargetEntry *tle;
4962 
4963  tle = castNode(TargetEntry, list_nth(fsplan->fdw_scan_tlist,
4964  errpos->cur_attno - 1));
4965 
4966  /*
4967  * Target list can have Vars and expressions. For Vars, we can get
4968  * its relation, however for expressions we can't. Thus for
4969  * expressions, just show generic context message.
4970  */
4971  if (IsA(tle->expr, Var))
4972  {
4973  RangeTblEntry *rte;
4974  Var *var = (Var *) tle->expr;
4975 
4976  rte = rt_fetch(var->varno, estate->es_range_table);
4977 
4978  if (var->varattno == 0)
4979  is_wholerow = true;
4980  else
4981  attname = get_relid_attribute_name(rte->relid, var->varattno);
4982 
4983  relname = get_rel_name(rte->relid);
4984  }
4985  else
4986  errcontext("processing expression at position %d in select list",
4987  errpos->cur_attno);
4988  }
4989 
4990  if (relname)
4991  {
4992  if (is_wholerow)
4993  errcontext("whole-row reference to foreign table \"%s\"", relname);
4994  else if (attname)
4995  errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
4996  }
4997 }
ScanState ss
Definition: execnodes.h:1632
#define IsA(nodeptr, _type_)
Definition: nodes.h:559
#define RelationGetDescr(relation)
Definition: rel.h:425
#define ObjectIdAttributeNumber
Definition: sysattr.h:22
#define castNode(_type_, nodeptr)
Definition: nodes.h:577
Form_pg_attribute * attrs
Definition: tupdesc.h:74
AttrNumber varattno
Definition: primnodes.h:146
List * fdw_scan_tlist
Definition: plannodes.h:558
EState * state
Definition: execnodes.h:1049
List * es_range_table
Definition: execnodes.h:372
Definition: primnodes.h:141
int natts
Definition: tupdesc.h:73
PlanState ps
Definition: execnodes.h:1288
ForeignScanState * fsstate
Definition: postgres_fdw.c:257
void * list_nth(const List *list, int n)
Definition: list.c:410
#define RelationGetRelationName(relation)
Definition: rel.h:433
#define rt_fetch(rangetable_index, rangetable)
Definition: parsetree.h:31
Index varno
Definition: primnodes.h:144
char * get_relid_attribute_name(Oid relid, AttrNumber attnum)
Definition: lsyscache.c:801
Plan * plan
Definition: execnodes.h:1047
#define NULL
Definition: c.h:226
Expr * expr
Definition: primnodes.h:1330
#define errcontext
Definition: elog.h:164
#define NameStr(name)
Definition: c.h:495
void * arg
#define SelfItemPointerAttributeNumber
Definition: sysattr.h:21
AttrNumber cur_attno
Definition: postgres_fdw.c:249
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1694
static const char ** convert_prep_stmt_params ( PgFdwModifyState *  fmstate,
ItemPointer  tupleid,
TupleTableSlot *  slot 
)
static

Definition at line 3177 of file postgres_fdw.c.

References Assert, lfirst_int, MemoryContextSwitchTo(), NIL, NULL, OutputFunctionCall(), PgFdwModifyState::p_flinfo, PgFdwModifyState::p_nums, palloc(), PointerGetDatum, reset_transmission_modes(), set_transmission_modes(), slot_getattr(), PgFdwModifyState::target_attrs, PgFdwModifyState::temp_cxt, and value.

Referenced by postgresExecForeignDelete(), postgresExecForeignInsert(), and postgresExecForeignUpdate().

3180 {
3181  const char **p_values;
3182  int pindex = 0;
3183  MemoryContext oldcontext;
3184 
3185  oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
3186 
3187  p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
3188 
3189  /* 1st parameter should be ctid, if it's in use */
3190  if (tupleid != NULL)
3191  {
3192  /* don't need set_transmission_modes for TID output */
3193  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3194  PointerGetDatum(tupleid));
3195  pindex++;
3196  }
3197 
3198  /* get following parameters from slot */
3199  if (slot != NULL && fmstate->target_attrs != NIL)
3200  {
3201  int nestlevel;
3202  ListCell *lc;
3203 
3204  nestlevel = set_transmission_modes();
3205 
3206  foreach(lc, fmstate->target_attrs)
3207  {
3208  int attnum = lfirst_int(lc);
3209  Datum value;
3210  bool isnull;
3211 
3212  value = slot_getattr(slot, attnum, &isnull);
3213  if (isnull)
3214  p_values[pindex] = NULL;
3215  else
3216  p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
3217  value);
3218  pindex++;
3219  }
3220 
3221  reset_transmission_modes(nestlevel);
3222  }
3223 
3224  Assert(pindex == fmstate->p_nums);
3225 
3226  MemoryContextSwitchTo(oldcontext);
3227 
3228  return p_values;
3229 }
#define NIL
Definition: pg_list.h:69
static struct @76 value
#define PointerGetDatum(X)
Definition: postgres.h:564
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int set_transmission_modes(void)
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1926
#define lfirst_int(lc)
Definition: pg_list.h:107
FmgrInfo * p_flinfo
Definition: postgres_fdw.c:183
uintptr_t Datum
Definition: postgres.h:374
MemoryContext temp_cxt
Definition: postgres_fdw.c:186
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
void reset_transmission_modes(int nestlevel)
void * palloc(Size size)
Definition: mcxt.c:891
Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: heaptuple.c:1143
static void create_cursor ( ForeignScanState *  node)
static

Definition at line 2909 of file postgres_fdw.c.

References appendStringInfo(), buf, conn, PgFdwScanState::conn, PgFdwScanState::cursor_exists, PgFdwScanState::cursor_number, PgFdwScanState::eof_reached, ERROR, ForeignScanState::fdw_state, PgFdwScanState::fetch_ct_2, initStringInfo(), MemoryContextSwitchTo(), PgFdwScanState::next_tuple, NULL, PgFdwScanState::num_tuples, PgFdwScanState::numParams, PgFdwScanState::param_exprs, PgFdwScanState::param_flinfo, PgFdwScanState::param_values, pfree(), pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), PQsendQueryParams(), process_query_params(), ScanState::ps, PlanState::ps_ExprContext, PgFdwScanState::query, ForeignScanState::ss, PgFdwScanState::tuples, and values.

Referenced by postgresIterateForeignScan().

2910 {
2911  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2912  ExprContext *econtext = node->ss.ps.ps_ExprContext;
2913  int numParams = fsstate->numParams;
2914  const char **values = fsstate->param_values;
2915  PGconn *conn = fsstate->conn;
2916  StringInfoData buf;
2917  PGresult *res;
2918 
2919  /*
2920  * Construct array of query parameter values in text format. We do the
2921  * conversions in the short-lived per-tuple context, so as not to cause a
2922  * memory leak over repeated scans.
2923  */
2924  if (numParams > 0)
2925  {
2926  MemoryContext oldcontext;
2927 
2928  oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2929 
2930  process_query_params(econtext,
2931  fsstate->param_flinfo,
2932  fsstate->param_exprs,
2933  values);
2934 
2935  MemoryContextSwitchTo(oldcontext);
2936  }
2937 
2938  /* Construct the DECLARE CURSOR command */
2939  initStringInfo(&buf);
2940  appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
2941  fsstate->cursor_number, fsstate->query);
2942 
2943  /*
2944  * Notice that we pass NULL for paramTypes, thus forcing the remote server
2945  * to infer types for all parameters. Since we explicitly cast every
2946  * parameter (see deparse.c), the "inference" is trivial and will produce
2947  * the desired result. This allows us to avoid assuming that the remote
2948  * server has the same OIDs we do for the parameters' types.
2949  */
2950  if (!PQsendQueryParams(conn, buf.data, numParams,
2951  NULL, values, NULL, NULL, 0))
2952  pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
2953 
2954  /*
2955  * Get the result, and check for success.
2956  *
2957  * We don't use a PG_TRY block here, so be careful not to throw error
2958  * without releasing the PGresult.
2959  */
2960  res = pgfdw_get_result(conn, buf.data);
2961  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2962  pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
2963  PQclear(res);
2964 
2965  /* Mark the cursor as created, and show no tuples have been retrieved */
2966  fsstate->cursor_exists = true;
2967  fsstate->tuples = NULL;
2968  fsstate->num_tuples = 0;
2969  fsstate->next_tuple = 0;
2970  fsstate->fetch_ct_2 = 0;
2971  fsstate->eof_reached = false;
2972 
2973  /* Clean up */
2974  pfree(buf.data);
2975 }
ScanState ss
Definition: execnodes.h:1632
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1183
List * param_exprs
Definition: postgres_fdw.c:143
ExprContext * ps_ExprContext
Definition: execnodes.h:1078
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values)
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
unsigned int cursor_number
Definition: postgres_fdw.c:139
PlanState ps
Definition: execnodes.h:1288
void pfree(void *pointer)
Definition: mcxt.c:992
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
#define ERROR
Definition: elog.h:43
const char ** param_values
Definition: postgres_fdw.c:144
PGconn * conn
Definition: streamutil.c:45
static char * buf
Definition: pg_test_fsync.c:65
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
FmgrInfo * param_flinfo
Definition: postgres_fdw.c:142
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
void PQclear(PGresult *res)
Definition: fe-exec.c:650
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:484
#define NULL
Definition: c.h:226
HeapTuple * tuples
Definition: postgres_fdw.c:147
static Datum values[MAXATTR]
Definition: bootstrap.c:162
static bool ec_member_matches_foreign ( PlannerInfo *  root,
RelOptInfo *  rel,
EquivalenceClass *  ec,
EquivalenceMember *  em,
void *  arg 
)
static

Definition at line 2880 of file postgres_fdw.c.

References ec_member_foreign_arg::already_used, ec_member_foreign_arg::current, EquivalenceMember::em_expr, equal(), list_member(), and NULL.

Referenced by postgresGetForeignPaths().

/*
 * ec_member_matches_foreign: decide whether equivalence member `em` is the
 * expression to use for the scan currently being processed.
 *
 * Returns true if em->em_expr equals state->current, or, when no current
 * expression has been chosen yet, if em->em_expr is not in
 * state->already_used (in which case it becomes state->current).
 *
 * NOTE(review): `state` is not declared in this listing (source line 2884 is
 * absent).  The References list shows it has the `current` and
 * `already_used` fields of struct ec_member_foreign_arg; presumably it is
 * `arg` cast to that struct -- confirm against the full source.
 */
2883 {
2885  Expr *expr = em->em_expr;
2886 
2887  /*
2888  * If we've identified what we're processing in the current scan, we only
2889  * want to match that expression.
2890  */
2891  if (state->current != NULL)
2892  return equal(expr, state->current);
2893 
2894  /*
2895  * Otherwise, ignore anything we've already processed.
2896  */
2897  if (list_member(state->already_used, expr))
2898  return false;
2899 
2900  /* This is the new target to process; remember it for later calls. */
2901  state->current = expr;
2902  return true;
2903 }
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2870
bool list_member(const List *list, const void *datum)
Definition: list.c:444
#define NULL
Definition: c.h:226
Definition: regguts.h:298
void * arg
static void estimate_path_cost_size ( PlannerInfo *  root,
RelOptInfo *  foreignrel,
List *  param_join_conds,
List *  pathkeys,
double *  p_rows,
int *  p_width,
Cost *  p_startup_cost,
Cost *  p_total_cost 
)
static

Definition at line 2486 of file postgres_fdw.c.

References AGGSPLIT_SIMPLE, appendStringInfoString(), Assert, RelOptInfo::baserestrictcost, build_tlist_to_deparse(), clamp_row_est(), classifyConditions(), clauselist_selectivity(), conn, PathTarget::cost, cost_qual_eval(), cpu_operator_cost, cpu_tuple_cost, StringInfoData::data, DEFAULT_FDW_SORT_MULTIPLIER, deparseSelectStmtForRel(), estimate_num_groups(), RelOptInfo::fdw_private, PgFdwRelationInfo::fdw_startup_cost, PgFdwRelationInfo::fdw_tuple_cost, AggClauseCosts::finalCost, get_agg_clause_costs(), get_remote_estimate(), get_sortgrouplist_exprs(), GetConnection(), Query::groupClause, PgFdwRelationInfo::grouped_tlist, Query::hasAggs, Query::havingQual, initStringInfo(), PgFdwRelationInfo::innerrel, JOIN_INNER, PgFdwRelationInfo::joinclause_sel, PgFdwRelationInfo::joinclauses, list_concat(), list_copy(), list_length(), PgFdwRelationInfo::local_conds_cost, PgFdwRelationInfo::local_conds_sel, MemSet, Min, NIL, NULL, PgFdwRelationInfo::outerrel, RelOptInfo::pages, PlannerInfo::parse, QualCost::per_tuple, PgFdwRelationInfo::rel_startup_cost, PgFdwRelationInfo::rel_total_cost, ReleaseConnection(), RelOptInfo::relid, RELOPT_JOINREL, RELOPT_UPPER_REL, RelOptInfo::reloptkind, RelOptInfo::reltarget, PgFdwRelationInfo::remote_conds, PgFdwRelationInfo::rows, RelOptInfo::rows, seq_page_cost, QualCost::startup, AggClauseCosts::transCost, RelOptInfo::tuples, PlannerInfo::upper_targets, UPPERREL_GROUP_AGG, PgFdwRelationInfo::use_remote_estimate, PgFdwRelationInfo::user, PgFdwRelationInfo::width, and PathTarget::width.

Referenced by add_foreign_grouping_paths(), add_paths_with_pathkeys_for_rel(), postgresGetForeignJoinPaths(), postgresGetForeignPaths(), and postgresGetForeignRelSize().

/*
 * estimate_path_cost_size: estimate the row count, row width, startup cost
 * and total cost of scanning foreignrel, and hand them back through p_rows,
 * p_width, p_startup_cost and p_total_cost.
 *
 * When fpinfo->use_remote_estimate is set, the numbers come from running
 * EXPLAIN on the remote server and are then adjusted for the quals that must
 * be checked locally.  Otherwise they are derived locally, with separate
 * cost models for join rels, upper rels and everything else.
 *
 * Costs computed for the pathkey-less, unparameterized case are cached in
 * fpinfo->rel_startup_cost / rel_total_cost before the transfer overhead is
 * added.
 *
 * NOTE(review): this listing begins at the opening brace; the C signature
 * (source lines 2486-2491) and source line 2718 are absent from it.
 */
2492 {
2493  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2494  double rows;
2495  double retrieved_rows;
2496  int width;
2497  Cost startup_cost;
2498  Cost total_cost;
2499  Cost cpu_per_tuple;
2500 
2501  /*
2502  * If the table or the server is configured to use remote estimates,
2503  * connect to the foreign server and execute EXPLAIN to estimate the
2504  * number of rows selected by the restriction+join clauses. Otherwise,
2505  * estimate rows using whatever statistics we have locally, in a way
2506  * similar to ordinary tables.
2507  */
2508  if (fpinfo->use_remote_estimate)
2509  {
2510  List *remote_param_join_conds;
2511  List *local_param_join_conds;
2512  StringInfoData sql;
2513  PGconn *conn;
2514  Selectivity local_sel;
2515  QualCost local_cost;
2516  List *fdw_scan_tlist = NIL;
2517  List *remote_conds;
2518 
2519  /* Required only to be passed to deparseSelectStmtForRel */
2520  List *retrieved_attrs;
2521 
2522  /*
2523  * param_join_conds might contain both clauses that are safe to send
2524  * across, and clauses that aren't.
2525  */
2526  classifyConditions(root, foreignrel, param_join_conds,
2527  &remote_param_join_conds, &local_param_join_conds);
2528 
2529  /* Build the list of columns to be fetched from the foreign server. */
2530  if (foreignrel->reloptkind == RELOPT_JOINREL ||
2531  foreignrel->reloptkind == RELOPT_UPPER_REL)
2532  fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
2533  else
2534  fdw_scan_tlist = NIL;
2535 
2536  /*
2537  * The complete list of remote conditions includes everything from
2538  * baserestrictinfo plus any extra join_conds relevant to this
2539  * particular path.  The parameterized list is copied first, so
2540  * list_concat() appends to the copy rather than to
2541  * remote_param_join_conds itself.
2542  */
2541  remote_conds = list_concat(list_copy(remote_param_join_conds),
2542  fpinfo->remote_conds);
2543 
2544  /*
2545  * Construct EXPLAIN query including the desired SELECT, FROM, and
2546  * WHERE clauses. Params and other-relation Vars are replaced by dummy
2547  * values, so don't request params_list.
2548  */
2549  initStringInfo(&sql);
2550  appendStringInfoString(&sql, "EXPLAIN ");
2551  deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
2552  remote_conds, pathkeys, &retrieved_attrs,
2553  NULL);
2554 
2555  /* Get the remote estimate */
2556  conn = GetConnection(fpinfo->user, false);
2557  get_remote_estimate(sql.data, conn, &rows, &width,
2558  &startup_cost, &total_cost);
2559  ReleaseConnection(conn);
2560 
 /* Remember the remote row estimate before local quals reduce it. */
2561  retrieved_rows = rows;
2562 
2563  /* Factor in the selectivity of the locally-checked quals */
2564  local_sel = clauselist_selectivity(root,
2565  local_param_join_conds,
2566  foreignrel->relid,
2567  JOIN_INNER,
2568  NULL);
2569  local_sel *= fpinfo->local_conds_sel;
2570 
2571  rows = clamp_row_est(rows * local_sel);
2572 
2573  /* Add in the eval cost of the locally-checked quals */
2574  startup_cost += fpinfo->local_conds_cost.startup;
2575  total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2576  cost_qual_eval(&local_cost, local_param_join_conds, root);
2577  startup_cost += local_cost.startup;
2578  total_cost += local_cost.per_tuple * retrieved_rows;
2579  }
2580  else
2581  {
2582  Cost run_cost = 0;
2583 
2584  /*
2585  * We don't support join conditions in this mode (hence, no
2586  * parameterized paths can be made).
2587  */
2588  Assert(param_join_conds == NIL);
2589 
2590  /*
2591  * Use rows/width estimates made by set_baserel_size_estimates() for
2592  * base foreign relations and set_joinrel_size_estimates() for joins
2593  * between foreign relations.
2594  */
2595  rows = foreignrel->rows;
2596  width = foreignrel->reltarget->width;
2597 
2598  /* Back into an estimate of the number of retrieved rows. */
2599  retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
2600 
2601  /*
2602  * We will come here again and again with different sets of pathkeys
2603  * that the caller wants to cost. We don't need to calculate the cost
2604  * of the bare scan each time. Instead, use the costs if we have cached
2605  * them already.
2606  */
2607  if (fpinfo->rel_startup_cost > 0 && fpinfo->rel_total_cost > 0)
2608  {
2609  startup_cost = fpinfo->rel_startup_cost;
2610  run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
2611  }
2612  else if (foreignrel->reloptkind == RELOPT_JOINREL)
2613  {
2614  PgFdwRelationInfo *fpinfo_i;
2615  PgFdwRelationInfo *fpinfo_o;
2616  QualCost join_cost;
2617  QualCost remote_conds_cost;
2618  double nrows;
2619 
2620  /* For a join, we expect the inner and outer relations to be set */
2621  Assert(fpinfo->innerrel && fpinfo->outerrel);
2622 
2623  fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
2624  fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2625 
2626  /* Estimate of number of rows in cross product */
2627  nrows = fpinfo_i->rows * fpinfo_o->rows;
2628  /* Clamp retrieved rows estimate to at most size of cross product */
2629  retrieved_rows = Min(retrieved_rows, nrows);
2630 
2631  /*
2632  * The cost of foreign join is estimated as cost of generating
2633  * rows for the joining relations + cost for applying quals on the
2634  * rows.
2635  */
2636 
2637  /*
2638  * Calculate the cost of clauses pushed down to the foreign server
2639  */
2640  cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
2641  /* Calculate the cost of applying join clauses */
2642  cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
2643 
2644  /*
2645  * Startup cost includes startup cost of joining relations and the
2646  * startup cost for join and other clauses. We do not include the
2647  * startup cost specific to join strategy (e.g. setting up hash
2648  * tables) since we do not know what strategy the foreign server
2649  * is going to use.
2650  */
2651  startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
2652  startup_cost += join_cost.startup;
2653  startup_cost += remote_conds_cost.startup;
2654  startup_cost += fpinfo->local_conds_cost.startup;
2655 
2656  /*
2657  * Run time cost includes:
2658  *
2659  * 1. Run time cost (total_cost - startup_cost) of relations being
2660  * joined
2661  *
2662  * 2. Run time cost of applying join clauses on the cross product
2663  * of the joining relations.
2664  *
2665  * 3. Run time cost of applying pushed down other clauses on the
2666  * result of join
2667  *
2668  * 4. Run time cost of applying nonpushable other clauses locally
2669  * on the result fetched from the foreign server.
2670  */
2671  run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
2672  run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
2673  run_cost += nrows * join_cost.per_tuple;
 /* From here on nrows is the join result size, not the cross product. */
2674  nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
2675  run_cost += nrows * remote_conds_cost.per_tuple;
2676  run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
2677  }
2678  else if (foreignrel->reloptkind == RELOPT_UPPER_REL)
2679  {
2680  PgFdwRelationInfo *ofpinfo;
2681  PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG];
2682  AggClauseCosts aggcosts;
2683  double input_rows;
2684  int numGroupCols;
2685  double numGroups = 1;
2686 
2687  /*
2688  * This cost model is a mixture of the costing done for sorted and
2689  * hashed aggregates in cost_agg(). We are not sure which
2690  * strategy will be considered at remote side, thus for
2691  * simplicity, we put all startup related costs in startup_cost
2692  * and all finalization and run cost are added in total_cost.
2693  *
2694  * Also, core does not care about costing HAVING expressions and
2695  * adding that to the costs. So similarly, here too we are not
2696  * considering remote and local conditions for costing.
2697  */
2698 
2699  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
2700 
2701  /* Get rows and width from input rel */
2702  input_rows = ofpinfo->rows;
2703  width = ofpinfo->width;
2704 
2705  /* Collect statistics about aggregates for estimating costs. */
2706  MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
2707  if (root->parse->hasAggs)
2708  {
2709  get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
2710  AGGSPLIT_SIMPLE, &aggcosts);
2711  get_agg_clause_costs(root, (Node *) root->parse->havingQual,
2712  AGGSPLIT_SIMPLE, &aggcosts);
2713  }
2714 
2715  /*
2715  * Get number of grouping columns and possible number of groups.
2715  *
2715  * NOTE(review): source line 2718 is missing from this listing;
2715  * presumably it opens the get_sortgrouplist_exprs() call named in
2715  * the References list, whose closing parenthesis appears on line
2715  * 2719 -- confirm against the full source.
2715  */
2716  numGroupCols = list_length(root->parse->groupClause);
2717  numGroups = estimate_num_groups(root,
2719  fpinfo->grouped_tlist),
2720  input_rows, NULL);
2721 
2722  /*
2723  * Number of rows expected from foreign server will be the same as
2724  * the number of groups.
2725  */
2726  rows = retrieved_rows = numGroups;
2727 
2728  /*-----
2729  * Startup cost includes:
2730  * 1. Startup cost for underneath input relation
2731  * 2. Cost of performing aggregation, per cost_agg()
2732  * 3. Startup cost for PathTarget eval
2733  *-----
2734  */
2735  startup_cost = ofpinfo->rel_startup_cost;
2736  startup_cost += aggcosts.transCost.startup;
2737  startup_cost += aggcosts.transCost.per_tuple * input_rows;
2738  startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
2739  startup_cost += ptarget->cost.startup;
2740 
2741  /*-----
2742  * Run time cost includes:
2743  * 1. Run time cost of underneath input relation
2744  * 2. Run time cost of performing aggregation, per cost_agg()
2745  * 3. PathTarget eval cost for each output row
2746  *-----
2747  */
2748  run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
2749  run_cost += aggcosts.finalCost * numGroups;
2750  run_cost += cpu_tuple_cost * numGroups;
2751  run_cost += ptarget->cost.per_tuple * numGroups;
2752  }
2753  else
2754  {
2755  /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
2756  retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
2757 
2758  /*
2759  * Cost as though this were a seqscan, which is pessimistic. We
2760  * effectively imagine the local_conds are being evaluated
2761  * remotely, too.
2762  */
2763  startup_cost = 0;
2764  run_cost = 0;
2765  run_cost += seq_page_cost * foreignrel->pages;
2766 
2767  startup_cost += foreignrel->baserestrictcost.startup;
2768  cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
2769  run_cost += cpu_per_tuple * foreignrel->tuples;
2770  }
2771 
2772  /*
2773  * Without remote estimates, we have no real way to estimate the cost
2774  * of generating sorted output. It could be free if the query plan
2775  * the remote side would have chosen generates properly-sorted output
2776  * anyway, but in most cases it will cost something. Estimate a value
2777  * high enough that we won't pick the sorted path when the ordering
2778  * isn't locally useful, but low enough that we'll err on the side of
2779  * pushing down the ORDER BY clause when it's useful to do so.
2780  */
2781  if (pathkeys != NIL)
2782  {
2783  startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2784  run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
2785  }
2786 
2787  total_cost = startup_cost + run_cost;
2788  }
2789 
2790  /*
2791  * Cache the costs for scans without any pathkeys or parameterization
2792  * before adding the costs for transferring data from the foreign server.
2793  * These costs are useful for costing the join between this relation and
2794  * another foreign relation or to calculate the costs of paths with
2795  * pathkeys for this relation, when the costs can not be obtained from the
2796  * foreign server. This function will be called at least once for every
2797  * foreign relation without pathkeys and parameterization.
2798  */
2799  if (pathkeys == NIL && param_join_conds == NIL)
2800  {
2801  fpinfo->rel_startup_cost = startup_cost;
2802  fpinfo->rel_total_cost = total_cost;
2803  }
2804 
2805  /*
2806  * Add some additional cost factors to account for connection overhead
2807  * (fdw_startup_cost), transferring data across the network
2808  * (fdw_tuple_cost per retrieved row), and local manipulation of the data
2809  * (cpu_tuple_cost per retrieved row).
2810  */
2811  startup_cost += fpinfo->fdw_startup_cost;
2812  total_cost += fpinfo->fdw_startup_cost;
2813  total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
2814  total_cost += cpu_tuple_cost * retrieved_rows;
2815 
2816  /* Return results. */
2817  *p_rows = rows;
2818  *p_width = width;
2819  *p_startup_cost = startup_cost;
2820  *p_total_cost = total_cost;
2821 }
#define NIL
Definition: pg_list.h:69
double estimate_num_groups(PlannerInfo *root, List *groupExprs, double input_rows, List **pgset)
Definition: selfuncs.c:3272
Query * parse
Definition: relation.h:152
void get_agg_clause_costs(PlannerInfo *root, Node *clause, AggSplit aggsplit, AggClauseCosts *costs)
Definition: clauses.c:466
bool hasAggs
Definition: parsenodes.h:116
void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List *tlist, List *remote_conds, List *pathkeys, List **retrieved_attrs, List **params_list)
Definition: deparse.c:902
#define Min(x, y)
Definition: c.h:802
void classifyConditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, List **remote_conds, List **local_conds)
Definition: deparse.c:186
List * list_copy(const List *oldlist)
Definition: list.c:1160
Definition: nodes.h:508
#define MemSet(start, val, len)
Definition: c.h:853
List * list_concat(List *list1, List *list2)
Definition: list.c:321
double Selectivity
Definition: nodes.h:631
QualCost transCost
Definition: relation.h:62
RelOptInfo * outerrel
Definition: postgres_fdw.h:91
Cost startup
Definition: relation.h:45
void ReleaseConnection(PGconn *conn)
Definition: connection.c:412
Cost per_tuple
Definition: relation.h:46
void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root)
Definition: costsize.c:3199
PGconn * conn
Definition: streamutil.c:45
Selectivity local_conds_sel
Definition: postgres_fdw.h:56
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:189
Selectivity joinclause_sel
Definition: postgres_fdw.h:59
double cpu_operator_cost
Definition: costsize.c:108
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
UserMapping * user
Definition: postgres_fdw.h:79
Cost finalCost
Definition: relation.h:63
void * fdw_private
Definition: relation.h:541
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:97
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define DEFAULT_FDW_SORT_MULTIPLIER
Definition: postgres_fdw.c:53
QualCost cost
Definition: relation.h:826
static int list_length(const List *l)
Definition: pg_list.h:89
List * get_sortgrouplist_exprs(List *sgClauses, List *targetList)
Definition: tlist.c:395
double cpu_tuple_cost
Definition: costsize.c:106
static void get_remote_estimate(const char *sql, PGconn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost)
RelOptInfo * innerrel
Definition: postgres_fdw.h:92
List * build_tlist_to_deparse(RelOptInfo *foreignrel)
Definition: deparse.c:854
List * groupClause
Definition: parsenodes.h:137
Selectivity clauselist_selectivity(PlannerInfo *root, List *clauses, int varRelid, JoinType jointype, SpecialJoinInfo *sjinfo)
Definition: clausesel.c:92
Node * havingQual
Definition: parsenodes.h:141
double clamp_row_est(double nrows)
Definition: costsize.c:172
double seq_page_cost
Definition: costsize.c:104
Definition: pg_list.h:45
double Cost
Definition: nodes.h:632
QualCost local_conds_cost
Definition: postgres_fdw.h:55
struct PathTarget * upper_targets[UPPERREL_FINAL+1]
Definition: relation.h:270
static void execute_dml_stmt ( ForeignScanState *  node)
static

Definition at line 3268 of file postgres_fdw.c.

References PgFdwDirectModifyState::conn, ERROR, ForeignScanState::fdw_state, PgFdwDirectModifyState::has_returning, NULL, PgFdwDirectModifyState::num_tuples, PgFdwDirectModifyState::numParams, PgFdwDirectModifyState::param_exprs, PgFdwDirectModifyState::param_flinfo, PgFdwDirectModifyState::param_values, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQcmdTuples(), PQntuples(), PQresultStatus(), PQsendQueryParams(), process_query_params(), ScanState::ps, PlanState::ps_ExprContext, PgFdwDirectModifyState::query, PgFdwDirectModifyState::result, ForeignScanState::ss, and values.

Referenced by postgresIterateDirectModify().

/*
 * execute_dml_stmt: send dmstate->query to the remote server together with
 * its parameter values, wait for the result, and store the result in
 * dmstate->result and the affected-row count in dmstate->num_tuples.
 *
 * Any failure is reported through pgfdw_report_error() at ERROR level.
 *
 * NOTE(review): this listing omits source lines 3270 and 3303, so neither
 * the declaration of `dmstate` nor the right-hand side of the result-status
 * comparison is visible here.  The References list names PGRES_TUPLES_OK and
 * PGRES_COMMAND_OK, which are presumably the expected statuses -- confirm
 * against the full source.
 */
3269 {
3271  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3272  int numParams = dmstate->numParams;
3273  const char **values = dmstate->param_values;
3274 
3275  /*
3276  * Construct array of query parameter values in text format.
3277  */
3278  if (numParams > 0)
3279  process_query_params(econtext,
3280  dmstate->param_flinfo,
3281  dmstate->param_exprs,
3282  values);
3283 
3284  /*
3285  * Notice that we pass NULL for paramTypes, thus forcing the remote server
3286  * to infer types for all parameters. Since we explicitly cast every
3287  * parameter (see deparse.c), the "inference" is trivial and will produce
3288  * the desired result. This allows us to avoid assuming that the remote
3289  * server has the same OIDs we do for the parameters' types.
3290  */
3291  if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3292  NULL, values, NULL, NULL, 0))
3293  pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3294 
3295  /*
3296  * Get the result, and check for success.
3297  *
3298  * We don't use a PG_TRY block here, so be careful not to throw an error
3299  * without releasing the PGresult.
3300  */
3301  dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
3302  if (PQresultStatus(dmstate->result) !=
3304  pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
3305  dmstate->query);
3306 
3307  /*
3307  * Get the number of rows affected: the number of returned tuples when
3307  * there is a RETURNING result, otherwise the count reported by
3307  * PQcmdTuples().
3307  *
3307  * NOTE(review): atoi() gives no error indication if the string is empty
3307  * or not numeric; presumably the remote always supplies a count for
3307  * these statements -- confirm.
3307  */
3308  if (dmstate->has_returning)
3309  dmstate->num_tuples = PQntuples(dmstate->result);
3310  else
3311  dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
3312 }
ScanState ss
Definition: execnodes.h:1632
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1183
const char ** param_values
Definition: postgres_fdw.c:208
char * PQcmdTuples(PGresult *res)
Definition: fe-exec.c:3014
ExprContext * ps_ExprContext
Definition: execnodes.h:1078
static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, const char **param_values)
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
PlanState ps
Definition: execnodes.h:1288
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:484
#define NULL
Definition: c.h:226
static Datum values[MAXATTR]
Definition: bootstrap.c:162
static void fetch_more_data ( ForeignScanState *  node)
static

Definition at line 2981 of file postgres_fdw.c.

References Assert, PgFdwScanState::attinmeta, PgFdwScanState::batch_cxt, conn, PgFdwScanState::conn, PgFdwScanState::cursor_number, PgFdwScanState::eof_reached, ERROR, ForeignScanState::fdw_state, PgFdwScanState::fetch_ct_2, PgFdwScanState::fetch_size, i, IsA, make_tuple_from_result_row(), MemoryContextReset(), MemoryContextSwitchTo(), PgFdwScanState::next_tuple, NULL, PgFdwScanState::num_tuples, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_exec_query(), pgfdw_report_error(), PGRES_TUPLES_OK, PlanState::plan, PQclear(), PQntuples(), PQresultStatus(), ScanState::ps, PgFdwScanState::query, PgFdwScanState::rel, PgFdwScanState::retrieved_attrs, snprintf(), ForeignScanState::ss, PgFdwScanState::temp_cxt, and PgFdwScanState::tuples.

Referenced by postgresIterateForeignScan().

/*
 * fetch_more_data: fetch the next batch of rows from the remote cursor and
 * store them as HeapTuples in fsstate->tuples.
 *
 * The previous batch is discarded by resetting fsstate->batch_cxt, and the
 * new tuple array is allocated while that context is current.  On return,
 * num_tuples, next_tuple, fetch_ct_2 and eof_reached describe the new batch.
 *
 * NOTE(review): source line 3022 is missing from this listing; presumably
 * it opens the make_tuple_from_result_row() call named in the References
 * list, whose remaining arguments appear on lines 3023-3027 -- confirm
 * against the full source.
 */
2982 {
2983  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
2984  PGresult *volatile res = NULL;
2985  MemoryContext oldcontext;
2986 
2987  /*
2988  * We'll store the tuples in the batch_cxt. First, flush the previous
2989  * batch.
2990  */
2991  fsstate->tuples = NULL;
2992  MemoryContextReset(fsstate->batch_cxt);
2993  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
2994 
2995  /* PGresult must be released before leaving this function. */
2996  PG_TRY();
2997  {
2998  PGconn *conn = fsstate->conn;
2999  char sql[64];
3000  int numrows;
3001  int i;
3002 
 /* Ask for up to fetch_size rows from this scan's cursor. */
3003  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3004  fsstate->fetch_size, fsstate->cursor_number);
3005 
3006  res = pgfdw_exec_query(conn, sql);
3007  /* On error, report the original query, not the FETCH. */
3008  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3009  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3010 
3011  /* Convert the data into HeapTuples */
3012  numrows = PQntuples(res);
3013  fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3014  fsstate->num_tuples = numrows;
3015  fsstate->next_tuple = 0;
3016 
3017  for (i = 0; i < numrows; i++)
3018  {
3019  Assert(IsA(node->ss.ps.plan, ForeignScan));
3020 
3021  fsstate->tuples[i] =
3023  fsstate->rel,
3024  fsstate->attinmeta,
3025  fsstate->retrieved_attrs,
3026  node,
3027  fsstate->temp_cxt);
3028  }
3029 
3030  /* Update fetch_ct_2; the counter stops increasing once it reaches 2. */
3031  if (fsstate->fetch_ct_2 < 2)
3032  fsstate->fetch_ct_2++;
3033 
3034  /* Must be EOF if we didn't get as many tuples as we asked for. */
3035  fsstate->eof_reached = (numrows < fsstate->fetch_size);
3036 
 /* Clear res so the PG_CATCH block cannot free it a second time. */
3037  PQclear(res);
3038  res = NULL;
3039  }
3040  PG_CATCH();
3041  {
3042  if (res)
3043  PQclear(res);
3044  PG_RE_THROW();
3045  }
3046  PG_END_TRY();
3047 
3048  MemoryContextSwitchTo(oldcontext);
3049 }
ScanState ss
Definition: execnodes.h:1632
#define IsA(nodeptr, _type_)
Definition: nodes.h:559
static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
List * retrieved_attrs
Definition: postgres_fdw.c:135
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
unsigned int cursor_number
Definition: postgres_fdw.c:139
PlanState ps
Definition: execnodes.h:1288
#define ERROR
Definition: elog.h:43
PGconn * conn
Definition: streamutil.c:45
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
AttInMetadata * attinmeta
Definition: postgres_fdw.c:131
void * palloc0(Size size)
Definition: mcxt.c:920
MemoryContext temp_cxt
Definition: postgres_fdw.c:157
Plan * plan
Definition: execnodes.h:1047
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
HeapTuple * tuples
Definition: postgres_fdw.c:147
#define PG_RE_THROW()
Definition: elog.h:314
int i
#define PG_TRY()
Definition: elog.h:284
MemoryContext batch_cxt
Definition: postgres_fdw.c:156
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:460
#define PG_END_TRY()
Definition: elog.h:300
Expr* find_em_expr_for_rel ( EquivalenceClass *  ec,
RelOptInfo *  rel 
)

Definition at line 5004 of file postgres_fdw.c.

References bms_is_subset(), EquivalenceClass::ec_members, EquivalenceMember::em_expr, EquivalenceMember::em_relids, lfirst, NULL, and RelOptInfo::relids.

Referenced by appendOrderByClause(), and get_useful_pathkeys_for_relation().

/*
 * find_em_expr_for_rel: return the expression of the first member of
 * equivalence class `ec` whose em_relids are a subset of rel->relids, or
 * NULL if no member qualifies.
 */
5005 {
5006  ListCell *lc_em;
5007 
5008  foreach(lc_em, ec->ec_members)
5009  {
5010  EquivalenceMember *em = lfirst(lc_em);
5011 
5012  if (bms_is_subset(em->em_relids, rel->relids))
5013  {
5014  /*
5015  * If there is more than one equivalence member whose Vars are
5016  * taken entirely from this relation, we'll be content to choose
5017  * any one of those.
5018  */
5019  return em->em_expr;
5020  }
5021  }
5022 
5023  /* We didn't find any suitable equivalence class expression */
5024  return NULL;
5025 }
bool bms_is_subset(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:307
Relids relids
Definition: relation.h:490
Relids em_relids
Definition: relation.h:763
#define NULL
Definition: c.h:226
#define lfirst(lc)
Definition: pg_list.h:106
List * ec_members
Definition: relation.h:714
static bool foreign_grouping_ok ( PlannerInfo *  root,
RelOptInfo *  grouped_rel 
)
static

Definition at line 4449 of file postgres_fdw.c.

References add_to_flat_tlist(), appendStringInfo(), apply_pathtarget_labeling_to_tlist(), copy_pathtarget(), StringInfoData::data, PathTarget::exprs, RelOptInfo::fdw_private, PgFdwRelationInfo::fdw_startup_cost, PgFdwRelationInfo::fdw_tuple_cost, PgFdwRelationInfo::fetch_size, get_pathtarget_sortgroupref, get_sortgroupref_clause_noerr(), Query::groupClause, PgFdwRelationInfo::grouped_tlist, Query::groupingSets, PlannerInfo::hasHavingQual, Query::havingQual, i, is_foreign_expr(), IsA, lappend(), lfirst, list_make1, PgFdwRelationInfo::local_conds, makeStringInfo(), NIL, PgFdwRelationInfo::outerrel, PlannerInfo::parse, pull_var_clause(), PgFdwRelationInfo::pushdown_safe, PVC_INCLUDE_AGGREGATES, PgFdwRelationInfo::rel_startup_cost, PgFdwRelationInfo::rel_total_cost, PgFdwRelationInfo::relation_name, PgFdwRelationInfo::remote_conds, PathTarget::sortgrouprefs, PlannerInfo::upper_targets, UPPERREL_GROUP_AGG, and PgFdwRelationInfo::use_remote_estimate.

Referenced by add_foreign_grouping_paths().

4450 {
4451  Query *query = root->parse;
4452  PathTarget *grouping_target;
4453  PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
4454  PgFdwRelationInfo *ofpinfo;
4455  List *aggvars;
4456  ListCell *lc;
4457  int i;
4458  List *tlist = NIL;
4459 
4460  /* Grouping Sets are not pushable */
4461  if (query->groupingSets)
4462  return false;
4463 
4464  /* Get the fpinfo of the underlying scan relation. */
4465  ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
4466 
4467  /*
4468  * If underneath input relation has any local conditions, those conditions
4469  * are required to be applied before performing aggregation. Hence the
4470  * aggregate cannot be pushed down.
4471  */
4472  if (ofpinfo->local_conds)
4473  return false;
4474 
4475  /*
4476  * The targetlist expected from this node and the targetlist pushed down
4477  * to the foreign server may be different. The latter requires
4478  * sortgrouprefs to be set to push down GROUP BY clause, but should not
4479  * have those arising from ORDER BY clause. These sortgrouprefs may be
4480  * different from those in the plan's targetlist. Use a copy of path
4481  * target to record the new sortgrouprefs.
4482  */
4483  grouping_target = copy_pathtarget(root->upper_targets[UPPERREL_GROUP_AGG]);
4484 
4485  /*
4486  * Evaluate grouping targets and check whether they are safe to push down
4487  * to the foreign side. All GROUP BY expressions will be part of the
4488  * grouping target and thus there is no need to evaluate it separately.
4489  * While doing so, add required expressions into target list which can
4490  * then be used to pass to foreign server.
4491  */
4492  i = 0;
4493  foreach(lc, grouping_target->exprs)
4494  {
4495  Expr *expr = (Expr *) lfirst(lc);
4496  Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
4497  ListCell *l;
4498 
4499  /* Check whether this expression is part of GROUP BY clause */
4500  if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
4501  {
4502  /*
4503  * If any of the GROUP BY expression is not shippable we can not
4504  * push down aggregation to the foreign server.
4505  */
4506  if (!is_foreign_expr(root, grouped_rel, expr))
4507  return false;
4508 
4509  /* Pushable, add to tlist */
4510  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4511  }
4512  else
4513  {
4514  /* Check entire expression whether it is pushable or not */
4515  if (is_foreign_expr(root, grouped_rel, expr))
4516  {
4517  /* Pushable, add to tlist */
4518  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4519  }
4520  else
4521  {
4522  /*
4523  * If we have sortgroupref set, then it means that we have an
4524  * ORDER BY entry pointing to this expression. Since we are
4525  * not pushing ORDER BY with GROUP BY, clear it.
4526  */
4527  if (sgref)
4528  grouping_target->sortgrouprefs[i] = 0;
4529 
4530  /* Not matched exactly, pull the var with aggregates then */
4531  aggvars = pull_var_clause((Node *) expr,
4533 
4534  if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
4535  return false;
4536 
4537  /*
4538  * Add aggregates, if any, into the targetlist. Plain var
4539  * nodes should be either same as some GROUP BY expression or
4540  * part of some GROUP BY expression. In later case, the query
4541  * cannot refer plain var nodes without the surrounding
4542  * expression. In both the cases, they are already part of
4543  * the targetlist and thus no need to add them again. In fact
4544  * adding pulled plain var nodes in SELECT clause will cause
4545  * an error on the foreign server if they are not same as some
4546  * GROUP BY expression.
4547  */
4548  foreach(l, aggvars)
4549  {
4550  Expr *expr = (Expr *) lfirst(l);
4551 
4552  if (IsA(expr, Aggref))
4553  tlist = add_to_flat_tlist(tlist, list_make1(expr));
4554  }
4555  }
4556  }
4557 
4558  i++;
4559  }
4560 
4561  /*
4562  * Classify the pushable and non-pushable having clauses and save them in
4563  * remote_conds and local_conds of the grouped rel's fpinfo.
4564  */
4565  if (root->hasHavingQual && query->havingQual)
4566  {
4567  ListCell *lc;
4568 
4569  foreach(lc, (List *) query->havingQual)
4570  {
4571  Expr *expr = (Expr *) lfirst(lc);
4572 
4573  if (!is_foreign_expr(root, grouped_rel, expr))
4574  fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
4575  else
4576  fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
4577  }
4578  }
4579 
4580  /*
4581  * If there are any local conditions, pull Vars and aggregates from it and
4582  * check whether they are safe to pushdown or not.
4583  */
4584  if (fpinfo->local_conds)
4585  {
4586  ListCell *lc;
4587  List *aggvars = pull_var_clause((Node *) fpinfo->local_conds,
4589 
4590  foreach(lc, aggvars)
4591  {
4592  Expr *expr = (Expr *) lfirst(lc);
4593 
4594  /*
4595  * If aggregates within local conditions are not safe to push
4596  * down, then we cannot push down the query. Vars are already
4597  * part of GROUP BY clause which are checked above, so no need to
4598  * access them again here.
4599  */
4600  if (IsA(expr, Aggref))
4601  {
4602  if (!is_foreign_expr(root, grouped_rel, expr))
4603  return false;
4604 
4605  tlist = add_to_flat_tlist(tlist, aggvars);
4606  }
4607  }
4608  }
4609 
4610  /* Transfer any sortgroupref data to the replacement tlist */
4611  apply_pathtarget_labeling_to_tlist(tlist, grouping_target);
4612 
4613  /* Store generated targetlist */
4614  fpinfo->grouped_tlist = tlist;
4615 
4616  /* Safe to pushdown */
4617  fpinfo->pushdown_safe = true;
4618 
4619  /*
4620  * If user is willing to estimate cost for a scan using EXPLAIN, he
4621  * intends to estimate scans on that relation more accurately. Then, it
4622  * makes sense to estimate the cost of the grouping on that relation more
4623  * accurately using EXPLAIN.
4624  */
4625  fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate;
4626 
4627  /* Copy startup and tuple cost as is from underneath input rel's fpinfo */
4628  fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost;
4629  fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost;
4630 
4631  /*
4632  * Set cached relation costs to some negative value, so that we can detect
4633  * when they are set to some sensible costs, during one (usually the
4634  * first) of the calls to estimate_path_cost_size().
4635  */
4636  fpinfo->rel_startup_cost = -1;
4637  fpinfo->rel_total_cost = -1;
4638 
4639  /* Set fetch size same as that of underneath input rel's fpinfo */
4640  fpinfo->fetch_size = ofpinfo->fetch_size;
4641 
4642  /*
4643  * Set the string describing this grouped relation to be used in EXPLAIN
4644  * output of corresponding ForeignScan.
4645  */
4646  fpinfo->relation_name = makeStringInfo();
4647  appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
4648  ofpinfo->relation_name->data);
4649 
4650  return true;
4651 }
#define NIL
Definition: pg_list.h:69
PathTarget * copy_pathtarget(PathTarget *src)
Definition: tlist.c:629
#define IsA(nodeptr, _type_)
Definition: nodes.h:559
Query * parse
Definition: relation.h:152
StringInfo makeStringInfo(void)
Definition: stringinfo.c:29
List * groupingSets
Definition: parsenodes.h:139
Definition: nodes.h:508
List * pull_var_clause(Node *node, int flags)
Definition: var.c:535
RelOptInfo * outerrel
Definition: postgres_fdw.h:91
#define PVC_INCLUDE_AGGREGATES
Definition: var.h:20
#define list_make1(x1)
Definition: pg_list.h:133
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
Index * sortgrouprefs
Definition: relation.h:825
#define get_pathtarget_sortgroupref(target, colno)
Definition: relation.h:831
List * lappend(List *list, void *datum)
Definition: list.c:128
List * exprs
Definition: relation.h:824
void apply_pathtarget_labeling_to_tlist(List *tlist, PathTarget *target)
Definition: tlist.c:736
SortGroupClause * get_sortgroupref_clause_noerr(Index sortref, List *clauses)
Definition: tlist.c:446
unsigned int Index
Definition: c.h:362
StringInfo relation_name
Definition: postgres_fdw.h:88
void * fdw_private
Definition: relation.h:541
List * add_to_flat_tlist(List *tlist, List *exprs)
Definition: tlist.c:135
#define lfirst(lc)
Definition: pg_list.h:106
List * groupClause
Definition: parsenodes.h:137
bool is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
Definition: deparse.c:212
int i
bool hasHavingQual
Definition: relation.h:297
Node * havingQual
Definition: parsenodes.h:141
Definition: pg_list.h:45
struct PathTarget * upper_targets[UPPERREL_FINAL+1]
Definition: relation.h:270
static bool foreign_join_ok ( PlannerInfo *  root,
RelOptInfo *  joinrel,
JoinType  jointype,
RelOptInfo *  outerrel,
RelOptInfo *  innerrel,
JoinPathExtraData *  extra 
)
static

Definition at line 4038 of file postgres_fdw.c.

References appendStringInfo(), Assert, bms_is_subset(), bms_nonempty_difference(), StringInfoData::data, elog, ERROR, extract_actual_clauses(), extract_actual_join_clauses(), RelOptInfo::fdw_private, PgFdwRelationInfo::fdw_startup_cost, PgFdwRelationInfo::fdw_tuple_cost, PgFdwRelationInfo::fetch_size, get_jointype_name(), PgFdwRelationInfo::innerrel, is_foreign_expr(), IS_OUTER_JOIN, JOIN_FULL, JOIN_INNER, JOIN_LEFT, JOIN_RIGHT, PgFdwRelationInfo::joinclauses, PgFdwRelationInfo::jointype, lappend(), lfirst, list_concat(), list_copy(), PgFdwRelationInfo::local_conds, makeStringInfo(), NIL, NULL, PgFdwRelationInfo::outerrel, PlaceHolderInfo::ph_eval_at, PlannerInfo::placeholder_list, PgFdwRelationInfo::pushdown_safe, PgFdwRelationInfo::rel_startup_cost, PgFdwRelationInfo::rel_total_cost, PgFdwRelationInfo::relation_name, RelOptInfo::relids, PgFdwRelationInfo::remote_conds, JoinPathExtraData::restrictlist, PgFdwRelationInfo::server, PgFdwRelationInfo::use_remote_estimate, and PgFdwRelationInfo::user.

Referenced by postgresGetForeignJoinPaths().

4041 {
4042  PgFdwRelationInfo *fpinfo;
4043  PgFdwRelationInfo *fpinfo_o;
4044  PgFdwRelationInfo *fpinfo_i;
4045  ListCell *lc;
4046  List *joinclauses;
4047  List *otherclauses;
4048 
4049  /*
4050  * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
4051  * Constructing queries representing SEMI and ANTI joins is hard, hence
4052  * not considered right now.
4053  */
4054  if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
4055  jointype != JOIN_RIGHT && jointype != JOIN_FULL)
4056  return false;
4057 
4058  /*
4059  * If either of the joining relations is marked as unsafe to pushdown, the
4060  * join can not be pushed down.
4061  */
4062  fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
4063  fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
4064  fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
4065  if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
4066  !fpinfo_i || !fpinfo_i->pushdown_safe)
4067  return false;
4068 
4069  /*
4070  * If joining relations have local conditions, those conditions are
4071  * required to be applied before joining the relations. Hence the join can
4072  * not be pushed down.
4073  */
4074  if (fpinfo_o->local_conds || fpinfo_i->local_conds)
4075  return false;
4076 
4077  /* Separate restrict list into join quals and quals on join relation */
4078  if (IS_OUTER_JOIN(jointype))
4079  extract_actual_join_clauses(extra->restrictlist, &joinclauses, &otherclauses);
4080  else
4081  {
4082  /*
4083  * Unlike an outer join, for inner join, the join result contains only
4084  * the rows which satisfy join clauses, similar to the other clause.
4085  * Hence all clauses can be treated as other quals. This helps to push
4086  * a join down to the foreign server even if some of its join quals
4087  * are not safe to pushdown.
4088  */
4089  otherclauses = extract_actual_clauses(extra->restrictlist, false);
4090  joinclauses = NIL;
4091  }
4092 
4093  /* Join quals must be safe to push down. */
4094  foreach(lc, joinclauses)
4095  {
4096  Expr *expr = (Expr *) lfirst(lc);
4097 
4098  if (!is_foreign_expr(root, joinrel, expr))
4099  return false;
4100  }
4101 
4102  /*
4103  * deparseExplicitTargetList() isn't smart enough to handle anything other
4104  * than a Var. In particular, if there's some PlaceHolderVar that would
4105  * need to be evaluated within this join tree (because there's an upper
4106  * reference to a quantity that may go to NULL as a result of an outer
4107  * join), then we can't try to push the join down because we'll fail when
4108  * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
4109  * needs to be evaluated *at the top* of this join tree is OK, because we
4110  * can do that locally after fetching the results from the remote side.
4111  */
4112  foreach(lc, root->placeholder_list)
4113  {
4114  PlaceHolderInfo *phinfo = lfirst(lc);
4115  Relids relids = joinrel->relids;
4116 
4117  if (bms_is_subset(phinfo->ph_eval_at, relids) &&
4118  bms_nonempty_difference(relids, phinfo->ph_eval_at))
4119  return false;
4120  }
4121 
4122  /* Save the join clauses, for later use. */
4123  fpinfo->joinclauses = joinclauses;
4124 
4125  /*
4126  * Other clauses are applied after the join has been performed and thus
4127  * need not be all pushable. We will push those which can be pushed to
4128  * reduce the number of rows fetched from the foreign server. Rest of them
4129  * will be applied locally after fetching join result. Add them to fpinfo
4130  * so that other joins involving this joinrel will know that this joinrel
4131  * has local clauses.
4132  */
4133  foreach(lc, otherclauses)
4134  {
4135  Expr *expr = (Expr *) lfirst(lc);
4136 
4137  if (!is_foreign_expr(root, joinrel, expr))
4138  fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
4139  else
4140  fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
4141  }
4142 
4143  fpinfo->outerrel = outerrel;
4144  fpinfo->innerrel = innerrel;
4145  fpinfo->jointype = jointype;
4146 
4147  /*
4148  * Pull the other remote conditions from the joining relations into join
4149  * clauses or other remote clauses (remote_conds) of this relation
4150  * wherever possible. This avoids building subqueries at every join step,
4151  * which is not currently supported by the deparser logic.
4152  *
4153  * For an inner join, clauses from both the relations are added to the
4154  * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
4155  * the outer side are added to remote_conds since those can be evaluated
4156  * after the join is evaluated. The clauses from inner side are added to
4157  * the joinclauses, since they need to be evaluated while constructing the
4158  * join.
4159  *
4160  * For a FULL OUTER JOIN, the other clauses from either relation can not
4161  * be added to the joinclauses or remote_conds, since each relation acts
4162  * as an outer relation for the other. Consider such full outer join as
4163  * unshippable because of the reasons mentioned above in this comment.
4164  *
4165  * The joining sides can not have local conditions, thus no need to test
4166  * shippability of the clauses being pulled up.
4167  */
4168  switch (jointype)
4169  {
4170  case JOIN_INNER:
4171  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4172  list_copy(fpinfo_i->remote_conds));
4173  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4174  list_copy(fpinfo_o->remote_conds));
4175  break;
4176 
4177  case JOIN_LEFT:
4178  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4179  list_copy(fpinfo_i->remote_conds));
4180  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4181  list_copy(fpinfo_o->remote_conds));
4182  break;
4183 
4184  case JOIN_RIGHT:
4185  fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
4186  list_copy(fpinfo_o->remote_conds));
4187  fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
4188  list_copy(fpinfo_i->remote_conds));
4189  break;
4190 
4191  case JOIN_FULL:
4192  if (fpinfo_i->remote_conds || fpinfo_o->remote_conds)
4193  return false;
4194  break;
4195 
4196  default:
4197  /* Should not happen, we have just check this above */
4198  elog(ERROR, "unsupported join type %d", jointype);
4199  }
4200 
4201  /*
4202  * For an inner join, all restrictions can be treated alike. Treating the
4203  * pushed down conditions as join conditions allows a top level full outer
4204  * join to be deparsed without requiring subqueries.
4205  */
4206  if (jointype == JOIN_INNER)
4207  {
4208  Assert(!fpinfo->joinclauses);
4209  fpinfo->joinclauses = fpinfo->remote_conds;
4210  fpinfo->remote_conds = NIL;
4211  }
4212 
4213  /* Mark that this join can be pushed down safely */
4214  fpinfo->pushdown_safe = true;
4215 
4216  /*
4217  * If user is willing to estimate cost for a scan of either of the joining
4218  * relations using EXPLAIN, he intends to estimate scans on that relation
4219  * more accurately. Then, it makes sense to estimate the cost of the join
4220  * with that relation more accurately using EXPLAIN.
4221  */
4222  fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
4223  fpinfo_i->use_remote_estimate;
4224 
4225  /* Get user mapping */
4226  if (fpinfo->use_remote_estimate)
4227  {
4228  if (fpinfo_o->use_remote_estimate)
4229  fpinfo->user = fpinfo_o->user;
4230  else
4231  fpinfo->user = fpinfo_i->user;
4232  }
4233  else
4234  fpinfo->user = NULL;
4235 
4236  /* Get foreign server */
4237  fpinfo->server = fpinfo_o->server;
4238 
4239  /*
4240  * Since both the joining relations come from the same server, the server
4241  * level options should have same value for both the relations. Pick from
4242  * any side.
4243  */
4244  fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
4245  fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
4246 
4247  /*
4248  * Set cached relation costs to some negative value, so that we can detect
4249  * when they are set to some sensible costs, during one (usually the
4250  * first) of the calls to estimate_path_cost_size().
4251  */
4252  fpinfo->rel_startup_cost = -1;
4253  fpinfo->rel_total_cost = -1;
4254 
4255  /*
4256  * Set fetch size to maximum of the joining sides, since we are expecting
4257  * the rows returned by the join to be proportional to the relation sizes.
4258  */
4259  if (fpinfo_o->fetch_size > fpinfo_i->fetch_size)
4260  fpinfo->fetch_size = fpinfo_o->fetch_size;
4261  else
4262  fpinfo->fetch_size = fpinfo_i->fetch_size;
4263 
4264  /*
4265  * Set the string describing this join relation to be used in EXPLAIN
4266  * output of corresponding ForeignScan.
4267  */
4268  fpinfo->relation_name = makeStringInfo();
4269  appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
4270  fpinfo_o->relation_name->data,
4271  get_jointype_name(fpinfo->jointype),
4272  fpinfo_i->relation_name->data);
4273 
4274  return true;
4275 }
#define NIL
Definition: pg_list.h:69
Relids ph_eval_at
Definition: relation.h:1935
void extract_actual_join_clauses(List *restrictinfo_list, List **joinquals, List **otherquals)
Definition: restrictinfo.c:381
StringInfo makeStringInfo(void)
Definition: stringinfo.c:29
ForeignServer * server
Definition: postgres_fdw.h:78
#define IS_OUTER_JOIN(jointype)
Definition: nodes.h:714
List * list_copy(const List *oldlist)
Definition: list.c:1160
List * list_concat(List *list1, List *list2)
Definition: list.c:321
RelOptInfo * outerrel
Definition: postgres_fdw.h:91
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
#define ERROR
Definition: elog.h:43
bool bms_is_subset(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:307
const char * get_jointype_name(JoinType jointype)
Definition: deparse.c:1273
Relids relids
Definition: relation.h:490
List * lappend(List *list, void *datum)
Definition: list.c:128
UserMapping * user
Definition: postgres_fdw.h:79
List * restrictlist
Definition: relation.h:2049
StringInfo relation_name
Definition: postgres_fdw.h:88
void * fdw_private
Definition: relation.h:541
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define lfirst(lc)
Definition: pg_list.h:106
List * extract_actual_clauses(List *restrictinfo_list, bool pseudoconstant)
Definition: restrictinfo.c:354
RelOptInfo * innerrel
Definition: postgres_fdw.h:92
bool is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
Definition: deparse.c:212
List * placeholder_list
Definition: relation.h:253
#define elog
Definition: elog.h:219
Definition: pg_list.h:45
bool bms_nonempty_difference(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:464
static void get_remote_estimate ( const char *  sql,
PGconn *  conn,
double *  rows,
int *  width,
Cost *  startup_cost,
Cost *  total_cost 
)
static

Definition at line 2828 of file postgres_fdw.c.

References elog, ERROR, NULL, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_exec_query(), pgfdw_report_error(), PGRES_TUPLES_OK, PQclear(), PQgetvalue(), and PQresultStatus().

Referenced by estimate_path_cost_size().

2831 {
2832  PGresult *volatile res = NULL;
2833 
2834  /* PGresult must be released before leaving this function. */
2835  PG_TRY();
2836  {
2837  char *line;
2838  char *p;
2839  int n;
2840 
2841  /*
2842  * Execute EXPLAIN remotely.
2843  */
2844  res = pgfdw_exec_query(conn, sql);
2845  if (PQresultStatus(res) != PGRES_TUPLES_OK)
2846  pgfdw_report_error(ERROR, res, conn, false, sql);
2847 
2848  /*
2849  * Extract cost numbers for topmost plan node. Note we search for a
2850  * left paren from the end of the line to avoid being confused by
2851  * other uses of parentheses.
2852  */
2853  line = PQgetvalue(res, 0, 0);
2854  p = strrchr(line, '(');
2855  if (p == NULL)
2856  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2857  n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
2858  startup_cost, total_cost, rows, width);
2859  if (n != 4)
2860  elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
2861 
2862  PQclear(res);
2863  res = NULL;
2864  }
2865  PG_CATCH();
2866  {
2867  if (res)
2868  PQclear(res);
2869  PG_RE_THROW();
2870  }
2871  PG_END_TRY();
2872 }
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
#define PG_RE_THROW()
Definition: elog.h:314
#define elog
Definition: elog.h:219
#define PG_TRY()
Definition: elog.h:284
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:460
#define PG_END_TRY()
Definition: elog.h:300
static TupleTableSlot * get_returning_data ( ForeignScanState *  node)
static

Definition at line 3318 of file postgres_fdw.c.

References Assert, PgFdwDirectModifyState::attinmeta, ExecClearTuple(), ExecStoreAllNullTuple(), ExecStoreTuple(), ForeignScanState::fdw_state, PgFdwDirectModifyState::has_returning, InvalidBuffer, make_tuple_from_result_row(), PgFdwDirectModifyState::next_tuple, NULL, PgFdwDirectModifyState::num_tuples, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQclear(), ScanState::ps, PgFdwDirectModifyState::rel, PgFdwDirectModifyState::result, PgFdwDirectModifyState::retrieved_attrs, PgFdwDirectModifyState::set_processed, ForeignScanState::ss, ScanState::ss_ScanTupleSlot, PlanState::state, and PgFdwDirectModifyState::temp_cxt.

Referenced by postgresIterateDirectModify().

3319 {
3321  EState *estate = node->ss.ps.state;
3322  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
3323  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
3324 
3325  Assert(resultRelInfo->ri_projectReturning);
3326 
3327  /* If we didn't get any tuples, must be end of data. */
3328  if (dmstate->next_tuple >= dmstate->num_tuples)
3329  return ExecClearTuple(slot);
3330 
3331  /* Increment the command es_processed count if necessary. */
3332  if (dmstate->set_processed)
3333  estate->es_processed += 1;
3334 
3335  /*
3336  * Store a RETURNING tuple. If has_returning is false, just emit a dummy
3337  * tuple. (has_returning is false when the local query is of the form
3338  * "UPDATE/DELETE .. RETURNING 1" for example.)
3339  */
3340  if (!dmstate->has_returning)
3341  ExecStoreAllNullTuple(slot);
3342  else
3343  {
3344  /*
3345  * On error, be sure to release the PGresult on the way out. Callers
3346  * do not have PG_TRY blocks to ensure this happens.
3347  */
3348  PG_TRY();
3349  {
3350  HeapTuple newtup;
3351 
3352  newtup = make_tuple_from_result_row(dmstate->result,
3353  dmstate->next_tuple,
3354  dmstate->rel,
3355  dmstate->attinmeta,
3356  dmstate->retrieved_attrs,
3357  NULL,
3358  dmstate->temp_cxt);
3359  ExecStoreTuple(newtup, slot, InvalidBuffer, false);
3360  }
3361  PG_CATCH();
3362  {
3363  if (dmstate->result)
3364  PQclear(dmstate->result);
3365  PG_RE_THROW();
3366  }
3367  PG_END_TRY();
3368  }
3369  dmstate->next_tuple++;
3370 
3371  /* Make slot available for evaluation of the local query RETURNING list. */
3372  resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
3373 
3374  return slot;
3375 }
ScanState ss
Definition: execnodes.h:1632
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, MemoryContext temp_context)
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:512
AttInMetadata * attinmeta
Definition: postgres_fdw.c:195
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
#define InvalidBuffer
Definition: buf.h:25
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1291
EState * state
Definition: execnodes.h:1049
PlanState ps
Definition: execnodes.h:1288
MemoryContext temp_cxt
Definition: postgres_fdw.c:216
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
#define PG_RE_THROW()
Definition: elog.h:314
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
static List * get_useful_ecs_for_relation ( PlannerInfo *  root,
RelOptInfo *  rel 
)
static

Definition at line 694 of file postgres_fdw.c.

References bms_overlap(), EquivalenceClass::ec_relids, eclass_useful_for_merging(), PlannerInfo::eq_classes, find_childrel_top_parent(), RelOptInfo::has_eclass_joins, RelOptInfo::joininfo, lappend(), RestrictInfo::left_ec, lfirst, list_append_unique_ptr(), RestrictInfo::mergeopfamilies, NIL, RelOptInfo::relids, RELOPT_OTHER_MEMBER_REL, RelOptInfo::reloptkind, RestrictInfo::right_ec, and update_mergeclause_eclasses().

Referenced by get_useful_pathkeys_for_relation().

695 {
696  List *useful_eclass_list = NIL;
697  ListCell *lc;
698  Relids relids;
699 
700  /*
701  * First, consider whether any active EC is potentially useful for a merge
702  * join against this relation.
703  */
704  if (rel->has_eclass_joins)
705  {
706  foreach(lc, root->eq_classes)
707  {
708  EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
709 
710  if (eclass_useful_for_merging(root, cur_ec, rel))
711  useful_eclass_list = lappend(useful_eclass_list, cur_ec);
712  }
713  }
714 
715  /*
716  * Next, consider whether there are any non-EC derivable join clauses that
717  * are merge-joinable. If the joininfo list is empty, we can exit
718  * quickly.
719  */
720  if (rel->joininfo == NIL)
721  return useful_eclass_list;
722 
723  /* If this is a child rel, we must use the topmost parent rel to search. */
725  relids = find_childrel_top_parent(root, rel)->relids;
726  else
727  relids = rel->relids;
728 
729  /* Check each join clause in turn. */
730  foreach(lc, rel->joininfo)
731  {
732  RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
733 
734  /* Consider only mergejoinable clauses */
735  if (restrictinfo->mergeopfamilies == NIL)
736  continue;
737 
738  /* Make sure we've got canonical ECs. */
739  update_mergeclause_eclasses(root, restrictinfo);
740 
741  /*
742  * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
743  * that left_ec and right_ec will be initialized, per comments in
744  * distribute_qual_to_rels.
745  *
746  * We want to identify which side of this merge-joinable clause
747  * contains columns from the relation produced by this RelOptInfo. We
748  * test for overlap, not containment, because there could be extra
749  * relations on either side. For example, suppose we've got something
750  * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
751  * A.y = D.y. The input rel might be the joinrel between A and B, and
752  * we'll consider the join clause A.y = D.y. relids contains a
753  * relation not involved in the join class (B) and the equivalence
754  * class for the left-hand side of the clause contains a relation not
755  * involved in the input rel (C). Despite the fact that we have only
756  * overlap and not containment in either direction, A.y is potentially
757  * useful as a sort column.
758  *
759  * Note that it's even possible that relids overlaps neither side of
760  * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
761  * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
762  * but overlaps neither side of B. In that case, we just skip this
763  * join clause, since it doesn't suggest a useful sort order for this
764  * relation.
765  */
766  if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
767  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
768  restrictinfo->right_ec);
769  else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
770  useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
771  restrictinfo->left_ec);
772  }
773 
774  return useful_eclass_list;
775 }
bool has_eclass_joins
Definition: relation.h:551
RelOptInfo * find_childrel_top_parent(PlannerInfo *root, RelOptInfo *rel)
Definition: relnode.c:951
#define NIL
Definition: pg_list.h:69
RelOptKind reloptkind
Definition: relation.h:487
bool eclass_useful_for_merging(PlannerInfo *root, EquivalenceClass *eclass, RelOptInfo *rel)
Definition: equivclass.c:2393
EquivalenceClass * right_ec
Definition: relation.h:1686
List * mergeopfamilies
Definition: relation.h:1682
List * list_append_unique_ptr(List *list, void *datum)
Definition: list.c:975
List * joininfo
Definition: relation.h:549
Relids ec_relids
Definition: relation.h:717
Relids relids
Definition: relation.h:490
List * lappend(List *list, void *datum)
Definition: list.c:128
#define lfirst(lc)
Definition: pg_list.h:106
List * eq_classes
Definition: relation.h:232
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:442
EquivalenceClass * left_ec
Definition: relation.h:1685
Definition: pg_list.h:45
void update_mergeclause_eclasses(PlannerInfo *root, RestrictInfo *restrictinfo)
Definition: pathkeys.c:941
/*
 * get_useful_pathkeys_for_relation
 *		Build a list of candidate sort orderings (each one a List of PathKeys)
 *		that look worth requesting from the remote server for "rel".
 *
 * The result is NIL when no candidate qualifies.  Candidates come from two
 * sources:
 *	1. The complete root->query_pathkeys, but only if every pathkey's
 *	   equivalence class is non-volatile and has a member expression for
 *	   "rel" that is_foreign_expr() accepts.
 *	2. If fpinfo->use_remote_estimate is set, one single-column pathkey for
 *	   each equivalence class returned by get_useful_ecs_for_relation()
 *	   that has such a member expression.
 *
 * NOTE(review): this listing skips source lines 791 and 880.  "fpinfo" is
 * used below without a visible declaration; it is presumably obtained from
 * rel->fdw_private on line 791 -- confirm against the full source.
 */
static List * get_useful_pathkeys_for_relation ( PlannerInfo root,
RelOptInfo rel 
)
static

Definition at line 787 of file postgres_fdw.c.

References BTLessStrategyNumber, EquivalenceClass::ec_has_volatile, EquivalenceClass::ec_opfamilies, RelOptInfo::fdw_private, find_em_expr_for_rel(), get_useful_ecs_for_relation(), is_foreign_expr(), lappend(), lfirst, linitial, linitial_oid, list_copy(), list_length(), list_make1, make_canonical_pathkey(), NIL, NULL, PathKey::pk_eclass, PlannerInfo::query_pathkeys, and PgFdwRelationInfo::use_remote_estimate.

Referenced by add_paths_with_pathkeys_for_rel().

788 {
789  List *useful_pathkeys_list = NIL;
790  List *useful_eclass_list;
792  EquivalenceClass *query_ec = NULL;
793  ListCell *lc;
794 
795  /*
796  * Pushing the query_pathkeys to the remote server is always worth
797  * considering, because it might let us avoid a local sort.
798  */
799  if (root->query_pathkeys)
800  {
801  bool query_pathkeys_ok = true;
802 
803  foreach(lc, root->query_pathkeys)
804  {
805  PathKey *pathkey = (PathKey *) lfirst(lc);
806  EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
807  Expr *em_expr;
808 
809  /*
810  * The planner and executor don't have any clever strategy for
811  * taking data sorted by a prefix of the query's pathkeys and
812  * getting it to be sorted by all of those pathkeys. We'll just
813  * end up resorting the entire data set. So, unless we can push
814  * down all of the query pathkeys, forget it.
815  *
816  * is_foreign_expr would detect volatile expressions as well, but
817  * checking ec_has_volatile here saves some cycles.
818  */
819  if (pathkey_ec->ec_has_volatile ||
820  !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
821  !is_foreign_expr(root, rel, em_expr))
822  {
823  query_pathkeys_ok = false;
824  break;
825  }
826  }
827 
     /* The first candidate is a copy of the whole query_pathkeys list. */
828  if (query_pathkeys_ok)
829  useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
830  }
831 
832  /*
833  * Even if we're not using remote estimates, having the remote side do the
834  * sort generally won't be any worse than doing it locally, and it might
835  * be much better if the remote side can generate data in the right order
836  * without needing a sort at all. However, what we're going to do next is
837  * try to generate pathkeys that seem promising for possible merge joins,
838  * and that's more speculative. A wrong choice might hurt quite a bit, so
839  * bail out if we can't use remote estimates.
840  */
841  if (!fpinfo->use_remote_estimate)
842  return useful_pathkeys_list;
843 
844  /* Get the list of interesting EquivalenceClasses. */
845  useful_eclass_list = get_useful_ecs_for_relation(root, rel);
846 
847  /* Extract unique EC for query, if any, so we don't consider it again. */
     /*
      * Only a one-element query_pathkeys list can coincide with the
      * single-column pathkeys built in the loop below, hence the length test.
      */
848  if (list_length(root->query_pathkeys) == 1)
849  {
850  PathKey *query_pathkey = linitial(root->query_pathkeys);
851 
852  query_ec = query_pathkey->pk_eclass;
853  }
854 
855  /*
856  * As a heuristic, the only pathkeys we consider here are those of length
857  * one. It's surely possible to consider more, but since each one we
858  * choose to consider will generate a round-trip to the remote side, we
859  * need to be a bit cautious here. It would sure be nice to have a local
860  * cache of information about remote index definitions...
861  */
862  foreach(lc, useful_eclass_list)
863  {
864  EquivalenceClass *cur_ec = lfirst(lc);
865  Expr *em_expr;
866  PathKey *pathkey;
867 
868  /* If redundant with what we did above, skip it. */
869  if (cur_ec == query_ec)
870  continue;
871 
872  /* If no pushable expression for this rel, skip it. */
873  em_expr = find_em_expr_for_rel(cur_ec, rel);
874  if (em_expr == NULL || !is_foreign_expr(root, rel, em_expr))
875  continue;
876 
877  /* Looks like we can generate a pathkey, so let's do it. */
     /*
      * NOTE(review): source line 880 (the "strategy" argument) is missing
      * from this listing; presumably BTLessStrategyNumber, which appears in
      * the References list -- confirm.  The opfamily used is the first entry
      * of the class's ec_opfamilies, and nulls_first is false.
      */
878  pathkey = make_canonical_pathkey(root, cur_ec,
879  linitial_oid(cur_ec->ec_opfamilies),
881  false);
882  useful_pathkeys_list = lappend(useful_pathkeys_list,
883  list_make1(pathkey));
884  }
885 
886  return useful_pathkeys_list;
887 }
#define NIL
Definition: pg_list.h:69
List * query_pathkeys
Definition: relation.h:257
static List * get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
Definition: postgres_fdw.c:694
List * list_copy(const List *oldlist)
Definition: list.c:1160
PathKey * make_canonical_pathkey(PlannerInfo *root, EquivalenceClass *eclass, Oid opfamily, int strategy, bool nulls_first)
Definition: pathkeys.c:51
#define list_make1(x1)
Definition: pg_list.h:133
#define linitial(l)
Definition: pg_list.h:110
List * lappend(List *list, void *datum)
Definition: list.c:128
List * ec_opfamilies
Definition: relation.h:712
void * fdw_private
Definition: relation.h:541
#define NULL
Definition: c.h:226
Expr * find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
#define lfirst(lc)
Definition: pg_list.h:106
EquivalenceClass * pk_eclass
Definition: relation.h:791
#define linitial_oid(l)
Definition: pg_list.h:112
static int list_length(const List *l)
Definition: pg_list.h:89
bool ec_has_volatile
Definition: relation.h:720
bool is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
Definition: deparse.c:212
#define BTLessStrategyNumber
Definition: stratnum.h:29
Definition: pg_list.h:45
/*
 * make_tuple_from_result_row
 *		Convert one row of a libpq result, received in text form, into a
 *		HeapTuple.
 *
 * res				result set holding the row
 * row				index of the row; must be less than PQntuples(res)
 * rel				if non-NULL, its descriptor (RelationGetDescr) defines
 *					the tuple layout; otherwise "fsstate" must be supplied
 *					and the tupdesc stored in its PgFdwScanState is used
 * attinmeta		input functions, I/O parameters and typmods applied to
 *					ordinary (attno > 0) columns
 * retrieved_attrs	integer list of attribute numbers, one per result column,
 *					in result-column order
 * fsstate			scan state; also passed to the error-context callback
 * temp_context		scratch memory context for the conversion work; it is
 *					reset before returning
 *
 * The returned tuple is formed after switching back to the memory context
 * that was current on entry.  Columns not named in retrieved_attrs are left
 * NULL.  ctid and oid values, when retrieved, are installed in the tuple
 * header rather than in the values array; other system columns are ignored.
 *
 * NOTE(review): this listing skips source lines 4913-4915.  They presumably
 * hold the HeapTupleHeaderSetXmax/SetXmin/SetCmin calls announced by the
 * comment at lines 4905-4912 (all three macros appear in the References
 * list) -- confirm against the full source.
 */
static HeapTuple make_tuple_from_result_row ( PGresult res,
int  row,
Relation  rel,
AttInMetadata attinmeta,
List retrieved_attrs,
ForeignScanState fsstate,
MemoryContext  temp_context 
)
static

Definition at line 4764 of file postgres_fdw.c.

References ErrorContextCallback::arg, Assert, AttInMetadata::attinfuncs, AttInMetadata::attioparams, AttInMetadata::atttypmods, ErrorContextCallback::callback, conversion_error_callback(), CStringGetDatum, ConversionLocation::cur_attno, DatumGetObjectId, DatumGetPointer, DirectFunctionCall1, elog, ERROR, error_context_stack, ForeignScanState::fdw_state, ConversionLocation::fsstate, heap_form_tuple(), HeapTupleHeaderSetCmin, HeapTupleHeaderSetXmax, HeapTupleHeaderSetXmin, HeapTupleSetOid, i, InputFunctionCall(), InvalidOid, InvalidTransactionId, lfirst_int, MemoryContextReset(), MemoryContextSwitchTo(), tupleDesc::natts, NULL, ObjectIdAttributeNumber, oidin(), OidIsValid, palloc(), palloc0(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), ErrorContextCallback::previous, ConversionLocation::rel, RelationGetDescr, SelfItemPointerAttributeNumber, HeapTupleHeaderData::t_ctid, HeapTupleData::t_data, HeapTupleData::t_self, tidin(), PgFdwScanState::tupdesc, and values.

Referenced by analyze_row_processor(), fetch_more_data(), get_returning_data(), and store_returning_result().

4771 {
4772  HeapTuple tuple;
4773  TupleDesc tupdesc;
4774  Datum *values;
4775  bool *nulls;
4776  ItemPointer ctid = NULL;
4777  Oid oid = InvalidOid;
4778  ConversionLocation errpos;
4779  ErrorContextCallback errcallback;
4780  MemoryContext oldcontext;
4781  ListCell *lc;
4782  int j;
4783 
4784  Assert(row < PQntuples(res));
4785 
4786  /*
4787  * Do the following work in a temp context that we reset after each tuple.
4788  * This cleans up not only the data we have direct access to, but any
4789  * cruft the I/O functions might leak.
4790  */
4791  oldcontext = MemoryContextSwitchTo(temp_context);
4792 
      /* Pick the tuple descriptor: the relation's own, or the scan state's. */
4793  if (rel)
4794  tupdesc = RelationGetDescr(rel);
4795  else
4796  {
4797  PgFdwScanState *fdw_sstate;
4798 
4799  Assert(fsstate);
4800  fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
4801  tupdesc = fdw_sstate->tupdesc;
4802  }
4803 
4804  values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
4805  nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
4806  /* Initialize to nulls for any columns not present in result */
4807  memset(nulls, true, tupdesc->natts * sizeof(bool));
4808 
4809  /*
4810  * Set up and install callback to report where conversion error occurs.
4811  */
4812  errpos.rel = rel;
4813  errpos.cur_attno = 0;
4814  errpos.fsstate = fsstate;
4815  errcallback.callback = conversion_error_callback;
4816  errcallback.arg = (void *) &errpos;
4817  errcallback.previous = error_context_stack;
4818  error_context_stack = &errcallback;
4819 
4820  /*
4821  * i indexes columns in the relation, j indexes columns in the PGresult.
4822  */
4823  j = 0;
4824  foreach(lc, retrieved_attrs)
4825  {
4826  int i = lfirst_int(lc);
4827  char *valstr;
4828 
4829  /* fetch next column's textual value */
4830  if (PQgetisnull(res, row, j))
4831  valstr = NULL;
4832  else
4833  valstr = PQgetvalue(res, row, j);
4834 
4835  /*
4836  * convert value to internal representation
4837  *
4838  * Note: we ignore system columns other than ctid and oid in result
4839  */
      /* Record the column being converted in the error-callback state. */
4840  errpos.cur_attno = i;
4841  if (i > 0)
4842  {
4843  /* ordinary column */
4844  Assert(i <= tupdesc->natts);
4845  nulls[i - 1] = (valstr == NULL);
4846  /* Apply the input function even to nulls, to support domains */
4847  values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
4848  valstr,
4849  attinmeta->attioparams[i - 1],
4850  attinmeta->atttypmods[i - 1]);
4851  }
4852  else if (i == SelfItemPointerAttributeNumber)
4853  {
4854  /* ctid */
4855  if (valstr != NULL)
4856  {
4857  Datum datum;
4858 
4859  datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
4860  ctid = (ItemPointer) DatumGetPointer(datum);
4861  }
4862  }
4863  else if (i == ObjectIdAttributeNumber)
4864  {
4865  /* oid */
4866  if (valstr != NULL)
4867  {
4868  Datum datum;
4869 
4870  datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
4871  oid = DatumGetObjectId(datum);
4872  }
4873  }
      /* This column is done; clear the current-column marker again. */
4874  errpos.cur_attno = 0;
4875 
4876  j++;
4877  }
4878 
4879  /* Uninstall error context callback. */
4880  error_context_stack = errcallback.previous;
4881 
4882  /*
4883  * Check we got the expected number of columns. Note: j == 0 and
4884  * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
4885  */
4886  if (j > 0 && j != PQnfields(res))
4887  elog(ERROR, "remote query result does not match the foreign table");
4888 
4889  /*
4890  * Build the result tuple in caller's memory context.
4891  */
4892  MemoryContextSwitchTo(oldcontext);
4893 
4894  tuple = heap_form_tuple(tupdesc, values, nulls);
4895 
4896  /*
4897  * If we have a CTID to return, install it in both t_self and t_ctid.
4898  * t_self is the normal place, but if the tuple is converted to a
4899  * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
4900  * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
4901  */
4902  if (ctid)
4903  tuple->t_self = tuple->t_data->t_ctid = *ctid;
4904 
4905  /*
4906  * Stomp on the xmin, xmax, and cmin fields from the tuple created by
4907  * heap_form_tuple. heap_form_tuple actually creates the tuple with
4908  * DatumTupleFields, not HeapTupleFields, but the executor expects
4909  * HeapTupleFields and will happily extract system columns on that
4910  * assumption. If we don't do this then, for example, the tuple length
4911  * ends up in the xmin field, which isn't what we want.
4912  */
4916 
4917  /*
4918  * If we have an OID to return, install it.
4919  */
4920  if (OidIsValid(oid))
4921  HeapTupleSetOid(tuple, oid);
4922 
4923  /* Clean up */
      /* values, nulls and the parsed ctid were allocated in temp_context. */
4924  MemoryContextReset(temp_context);
4925 
4926  return tuple;
4927 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
#define RelationGetDescr(relation)
Definition: rel.h:425
#define ObjectIdAttributeNumber
Definition: sysattr.h:22
#define DatumGetObjectId(X)
Definition: postgres.h:508
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int32 * atttypmods
Definition: funcapi.h:47
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:555
unsigned int Oid
Definition: postgres_ext.h:31
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
struct ErrorContextCallback * previous
Definition: elog.h:238
#define OidIsValid(objectId)
Definition: c.h:534
Oid * attioparams
Definition: funcapi.h:44
int natts
Definition: tupdesc.h:73
ItemPointerData * ItemPointer
Definition: itemptr.h:48
HeapTupleHeader t_data
Definition: htup.h:67
#define HeapTupleSetOid(tuple, oid)
Definition: htup_details.h:698
ErrorContextCallback * error_context_stack
Definition: elog.c:88
ForeignScanState * fsstate
Definition: postgres_fdw.c:257
#define ERROR
Definition: elog.h:43
#define lfirst_int(lc)
Definition: pg_list.h:107
ItemPointerData t_ctid
Definition: htup_details.h:150
ItemPointerData t_self
Definition: htup.h:65
#define CStringGetDatum(X)
Definition: postgres.h:586
#define HeapTupleHeaderSetXmax(tup, xid)
Definition: htup_details.h:374
#define InvalidTransactionId
Definition: transam.h:31
void * palloc0(Size size)
Definition: mcxt.c:920
static void conversion_error_callback(void *arg)
uintptr_t Datum
Definition: postgres.h:374
Datum InputFunctionCall(FmgrInfo *flinfo, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1882
#define InvalidOid
Definition: postgres_ext.h:36
Datum tidin(PG_FUNCTION_ARGS)
Definition: tid.c:53
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
TupleDesc tupdesc
Definition: postgres_fdw.c:130
#define DatumGetPointer(X)
Definition: postgres.h:557
static Datum values[MAXATTR]
Definition: bootstrap.c:162
void(* callback)(void *arg)
Definition: elog.h:239
void * palloc(Size size)
Definition: mcxt.c:891
int i
Datum oidin(PG_FUNCTION_ARGS)
Definition: oid.c:117
FmgrInfo * attinfuncs
Definition: funcapi.h:41
#define SelfItemPointerAttributeNumber
Definition: sysattr.h:21
#define elog
Definition: elog.h:219
AttrNumber cur_attno
Definition: postgres_fdw.c:249
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3092
#define HeapTupleHeaderSetCmin(tup, cid)
Definition: htup_details.h:391
#define HeapTupleHeaderSetXmin(tup, xid)
Definition: htup_details.h:313
PG_FUNCTION_INFO_V1 ( postgres_fdw_handler  )
/*
 * postgres_fdw_handler
 *		Allocate an FdwRoutine node with makeNode() and hand it back through
 *		PG_RETURN_POINTER().
 *
 * NOTE(review): this listing keeps only the section comments.  The
 * statements between them (source lines 430-436, 439-450, 453, 455-457, 460,
 * 463, 466 and 469) are not shown.  Judging by the References list below,
 * each one presumably stores a postgres* callback in the FdwRoutine field of
 * the matching name -- confirm against the full source.
 */
Datum postgres_fdw_handler ( PG_FUNCTION_ARGS  )

Definition at line 425 of file postgres_fdw.c.

References FdwRoutine::AddForeignUpdateTargets, FdwRoutine::AnalyzeForeignTable, FdwRoutine::BeginDirectModify, FdwRoutine::BeginForeignModify, FdwRoutine::BeginForeignScan, FdwRoutine::EndDirectModify, FdwRoutine::EndForeignModify, FdwRoutine::EndForeignScan, FdwRoutine::ExecForeignDelete, FdwRoutine::ExecForeignInsert, FdwRoutine::ExecForeignUpdate, FdwRoutine::ExplainDirectModify, FdwRoutine::ExplainForeignModify, FdwRoutine::ExplainForeignScan, FdwRoutine::GetForeignJoinPaths, FdwRoutine::GetForeignPaths, FdwRoutine::GetForeignPlan, FdwRoutine::GetForeignRelSize, FdwRoutine::GetForeignUpperPaths, FdwRoutine::ImportForeignSchema, FdwRoutine::IsForeignRelUpdatable, FdwRoutine::IterateDirectModify, FdwRoutine::IterateForeignScan, makeNode, PG_RETURN_POINTER, FdwRoutine::PlanDirectModify, FdwRoutine::PlanForeignModify, postgresAddForeignUpdateTargets(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignModify(), postgresBeginForeignScan(), postgresEndDirectModify(), postgresEndForeignModify(), postgresEndForeignScan(), postgresExecForeignDelete(), postgresExecForeignInsert(), postgresExecForeignUpdate(), postgresExplainDirectModify(), postgresExplainForeignModify(), postgresExplainForeignScan(), postgresGetForeignJoinPaths(), postgresGetForeignPaths(), postgresGetForeignPlan(), postgresGetForeignRelSize(), postgresGetForeignUpperPaths(), postgresImportForeignSchema(), postgresIsForeignRelUpdatable(), postgresIterateDirectModify(), postgresIterateForeignScan(), postgresPlanDirectModify(), postgresPlanForeignModify(), postgresRecheckForeignScan(), postgresReScanForeignScan(), FdwRoutine::RecheckForeignScan, and FdwRoutine::ReScanForeignScan.

426 {
427  FdwRoutine *routine = makeNode(FdwRoutine);
428 
429  /* Functions for scanning foreign tables */
437 
438  /* Functions for updating foreign tables */
451 
452  /* Function for EvalPlanQual rechecks */
454  /* Support functions for EXPLAIN */
458 
459  /* Support functions for ANALYZE */
461 
462  /* Support functions for IMPORT FOREIGN SCHEMA */
464 
465  /* Support functions for join push-down */
467 
468  /* Support functions for upper relation push-down */
470 
471  PG_RETURN_POINTER(routine);
472 }
GetForeignPlan_function GetForeignPlan
Definition: fdwapi.h:175
BeginForeignScan_function BeginForeignScan
Definition: fdwapi.h:176
GetForeignUpperPaths_function GetForeignUpperPaths
Definition: fdwapi.h:190
ExecForeignDelete_function ExecForeignDelete
Definition: fdwapi.h:198
#define PG_RETURN_POINTER(x)
Definition: fmgr.h:305
EndDirectModify_function EndDirectModify
Definition: fdwapi.h:204
static ForeignScan * postgresGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan)
static void postgresExplainForeignModify(ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es)
static List * postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
ExplainForeignScan_function ExplainForeignScan
Definition: fdwapi.h:212
static TupleTableSlot * postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
AnalyzeForeignTable_function AnalyzeForeignTable
Definition: fdwapi.h:217
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:196
static void postgresBeginForeignScan(ForeignScanState *node, int eflags)
static void postgresGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
Definition: postgres_fdw.c:482
static int postgresIsForeignRelUpdatable(Relation rel)
static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages)
static void postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
static TupleTableSlot * postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
AddForeignUpdateTargets_function AddForeignUpdateTargets
Definition: fdwapi.h:193
static void postgresEndDirectModify(ForeignScanState *node)
RecheckForeignScan_function RecheckForeignScan
Definition: fdwapi.h:209
IterateDirectModify_function IterateDirectModify
Definition: fdwapi.h:203
static void postgresEndForeignScan(ForeignScanState *node)
GetForeignJoinPaths_function GetForeignJoinPaths
Definition: fdwapi.h:187
static List * postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
static void postgresBeginDirectModify(ForeignScanState *node, int eflags)
GetForeignRelSize_function GetForeignRelSize
Definition: fdwapi.h:173
EndForeignScan_function EndForeignScan
Definition: fdwapi.h:179
ExplainDirectModify_function ExplainDirectModify
Definition: fdwapi.h:214
ImportForeignSchema_function ImportForeignSchema
Definition: fdwapi.h:220
PlanForeignModify_function PlanForeignModify
Definition: fdwapi.h:194
EndForeignModify_function EndForeignModify
Definition: fdwapi.h:199
GetForeignPaths_function GetForeignPaths
Definition: fdwapi.h:174
static bool postgresPlanDirectModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node)
PlanDirectModify_function PlanDirectModify
Definition: fdwapi.h:201
static void postgresGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
Definition: postgres_fdw.c:894
static void postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, RelOptInfo *input_rel, RelOptInfo *output_rel)
BeginDirectModify_function BeginDirectModify
Definition: fdwapi.h:202
ExecForeignUpdate_function ExecForeignUpdate
Definition: fdwapi.h:197
static TupleTableSlot * postgresExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
#define makeNode(_type_)
Definition: nodes.h:556
ReScanForeignScan_function ReScanForeignScan
Definition: fdwapi.h:178
IterateForeignScan_function IterateForeignScan
Definition: fdwapi.h:177
static bool postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
static void postgresBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, List *fdw_private, int subplan_index, int eflags)
ExplainForeignModify_function ExplainForeignModify
Definition: fdwapi.h:213
static void postgresAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation)
static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo)
IsForeignRelUpdatable_function IsForeignRelUpdatable
Definition: fdwapi.h:200
static TupleTableSlot * postgresIterateDirectModify(ForeignScanState *node)
static void postgresGetForeignJoinPaths(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra)
BeginForeignModify_function BeginForeignModify
Definition: fdwapi.h:195
static void postgresReScanForeignScan(ForeignScanState *node)
/*
 * postgresAcquireSampleRowsFunc
 *		Read the remote table through a cursor, a batch at a time, and pass
 *		every fetched row to analyze_row_processor() to build the sample.
 *
 * relation			foreign table being analyzed
 * elevel			message level for the summary report emitted at the end
 * rows				caller-supplied array recorded in astate.rows; presumably
 *					filled by analyze_row_processor() -- confirm there
 * targrows			requested sample size
 * totalrows		output: set to astate.samplerows
 * totaldeadrows	output: always set to 0.0
 *
 * Returns astate.numrows.
 *
 * The remote connection is made with the user mapping of the table's owner.
 * On error inside the PG_TRY block any outstanding PGresult is cleared and
 * the error is re-thrown.
 *
 * NOTE(review): this listing skips source lines 3561, 3572, 3574 and 3612.
 * Going by the References list below they presumably initialize
 * astate.attinmeta, create astate.temp_cxt with AllocSetContextCreate(), and
 * call CHECK_FOR_INTERRUPTS() -- confirm against the full source.
 */
static int postgresAcquireSampleRowsFunc ( Relation  relation,
int  elevel,
HeapTuple rows,
int  targrows,
double *  totalrows,
double *  totaldeadrows 
)
static

Definition at line 3545 of file postgres_fdw.c.

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate(), analyze_row_processor(), PgFdwAnalyzeState::anl_cxt, appendStringInfo(), PgFdwAnalyzeState::attinmeta, CHECK_FOR_INTERRUPTS, close_cursor(), conn, CurrentMemoryContext, cursor_number, StringInfoData::data, defGetString(), DefElem::defname, deparseAnalyzeSql(), ereport, errmsg(), ERROR, fetch_size, GetConnection(), GetCursorNumber(), GetForeignServer(), GetForeignTable(), GetUserMapping(), i, initStringInfo(), lfirst, NULL, PgFdwAnalyzeState::numrows, ForeignServer::options, ForeignTable::options, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_exec_query(), pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQntuples(), PQresultStatus(), RelationData::rd_rel, PgFdwAnalyzeState::rel, RelationGetDescr, RelationGetRelationName, RelationGetRelid, ReleaseConnection(), reservoir_init_selection_state(), PgFdwAnalyzeState::retrieved_attrs, PgFdwAnalyzeState::rows, PgFdwAnalyzeState::rowstoskip, PgFdwAnalyzeState::rstate, PgFdwAnalyzeState::samplerows, ForeignTable::serverid, snprintf(), PgFdwAnalyzeState::targrows, PgFdwAnalyzeState::temp_cxt, TupleDescGetAttInMetadata(), and user.

Referenced by postgresAnalyzeForeignTable().

3549 {
3550  PgFdwAnalyzeState astate;
3551  ForeignTable *table;
3552  ForeignServer *server;
3553  UserMapping *user;
3554  PGconn *conn;
3555  unsigned int cursor_number;
3556  StringInfoData sql;
      /* volatile: assigned inside PG_TRY and examined in PG_CATCH */
3557  PGresult *volatile res = NULL;
3558 
3559  /* Initialize workspace state */
3560  astate.rel = relation;
3562 
3563  astate.rows = rows;
3564  astate.targrows = targrows;
3565  astate.numrows = 0;
3566  astate.samplerows = 0;
3567  astate.rowstoskip = -1; /* -1 means not set yet */
3568  reservoir_init_selection_state(&astate.rstate, targrows);
3569 
3570  /* Remember ANALYZE context, and create a per-tuple temp context */
3571  astate.anl_cxt = CurrentMemoryContext;
      /* NOTE(review): the two lines below are fragments of an elided call. */
3573  "postgres_fdw temporary data",
3575 
3576  /*
3577  * Get the connection to use. We do the remote access as the table's
3578  * owner, even if the ANALYZE was started by some other user.
3579  */
3580  table = GetForeignTable(RelationGetRelid(relation));
3581  server = GetForeignServer(table->serverid);
3582  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3583  conn = GetConnection(user, false);
3584 
3585  /*
3586  * Construct cursor that retrieves whole rows from remote.
3587  */
3588  cursor_number = GetCursorNumber(conn);
3589  initStringInfo(&sql);
3590  appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
3591  deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
3592 
3593  /* In what follows, do not risk leaking any PGresults. */
3594  PG_TRY();
3595  {
      /* Declare the cursor; a command-OK status is the only success case. */
3596  res = pgfdw_exec_query(conn, sql.data);
3597  if (PQresultStatus(res) != PGRES_COMMAND_OK)
3598  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3599  PQclear(res);
3600  res = NULL;
3601 
3602  /* Retrieve and process rows a batch at a time. */
3603  for (;;)
3604  {
3605  char fetch_sql[64];
3606  int fetch_size;
3607  int numrows;
3608  int i;
3609  ListCell *lc;
3610 
3611  /* Allow users to cancel long query */
3613 
3614  /*
3615  * XXX possible future improvement: if rowstoskip is large, we
3616  * could issue a MOVE rather than physically fetching the rows,
3617  * then just adjust rowstoskip and samplerows appropriately.
3618  */
3619 
3620  /* The fetch size is arbitrary, but shouldn't be enormous. */
      /*
       * Default is 100.  The server-level "fetch_size" option is examined
       * first and the table-level one second, so a table setting wins when
       * both are present.
       *
       * NOTE(review): nothing in this loop modifies server->options or
       * table->options, so this lookup yields the same value on every
       * iteration and could be hoisted above the for (;;) loop.
       */
3621  fetch_size = 100;
3622  foreach(lc, server->options)
3623  {
3624  DefElem *def = (DefElem *) lfirst(lc);
3625 
3626  if (strcmp(def->defname, "fetch_size") == 0)
3627  {
3628  fetch_size = strtol(defGetString(def), NULL, 10);
3629  break;
3630  }
3631  }
3632  foreach(lc, table->options)
3633  {
3634  DefElem *def = (DefElem *) lfirst(lc);
3635 
3636  if (strcmp(def->defname, "fetch_size") == 0)
3637  {
3638  fetch_size = strtol(defGetString(def), NULL, 10);
3639  break;
3640  }
3641  }
3642 
3643  /* Fetch some rows */
3644  snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
3645  fetch_size, cursor_number);
3646 
3647  res = pgfdw_exec_query(conn, fetch_sql);
3648  /* On error, report the original query, not the FETCH. */
3649  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3650  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3651 
3652  /* Process whatever we got. */
3653  numrows = PQntuples(res);
3654  for (i = 0; i < numrows; i++)
3655  analyze_row_processor(res, i, &astate);
3656 
3657  PQclear(res);
3658  res = NULL;
3659 
3660  /* Must be EOF if we didn't get all the rows requested. */
3661  if (numrows < fetch_size)
3662  break;
3663  }
3664 
3665  /* Close the cursor, just to be tidy. */
3666  close_cursor(conn, cursor_number);
3667  }
3668  PG_CATCH();
3669  {
      /* Free the result that was in flight, then propagate the error. */
3670  if (res)
3671  PQclear(res);
3672  PG_RE_THROW();
3673  }
3674  PG_END_TRY();
3675 
3676  ReleaseConnection(conn);
3677 
3678  /* We assume that we have no dead tuple. */
3679  *totaldeadrows = 0.0;
3680 
3681  /* We've retrieved all living tuples from foreign server. */
3682  *totalrows = astate.samplerows;
3683 
3684  /*
3685  * Emit some interesting relation info
3686  */
3687  ereport(elevel,
3688  (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
3689  RelationGetRelationName(relation),
3690  astate.samplerows, astate.numrows)));
3691 
3692  return astate.numrows;
3693 }
HeapTuple * rows
Definition: postgres_fdw.c:229
#define RelationGetDescr(relation)
Definition: rel.h:425
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:155
uint64 fetch_size
Definition: logging.c:21
ForeignTable * GetForeignTable(Oid relid)
Definition: foreign.c:216
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void reservoir_init_selection_state(ReservoirState rs, int n)
Definition: sampling.c:129
Form_pg_class rd_rel
Definition: rel.h:113
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
static void close_cursor(PGconn *conn, unsigned int cursor_number)
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
void ReleaseConnection(PGconn *conn)
Definition: connection.c:412
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
#define ERROR
Definition: elog.h:43
PGconn * conn
Definition: streamutil.c:45
char * defGetString(DefElem *def)
Definition: define.c:49
#define RelationGetRelationName(relation)
Definition: rel.h:433
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static unsigned int cursor_number
Definition: connection.c:60
#define ereport(elevel, rest)
Definition: elog.h:122
ReservoirStateData rstate
Definition: postgres_fdw.c:236
void deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
Definition: deparse.c:1711
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
AttInMetadata * attinmeta
Definition: postgres_fdw.c:225
static int elevel
Definition: vacuumlazy.c:136
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
ForeignServer * GetForeignServer(Oid serverid)
Definition: foreign.c:93
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:1068
void PQclear(PGresult *res)
Definition: fe-exec.c:650
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:97
#define PG_CATCH()
Definition: elog.h:293
MemoryContext temp_cxt
Definition: postgres_fdw.c:240
#define NULL
Definition: c.h:226
#define lfirst(lc)
Definition: pg_list.h:106
Oid serverid
Definition: foreign.h:67
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:433
MemoryContext anl_cxt
Definition: postgres_fdw.c:239
#define PG_RE_THROW()
Definition: elog.h:314
List * options
Definition: foreign.h:68
static char * user
Definition: pg_regress.c:90
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
int i
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition: foreign.c:166
char * defname
Definition: parsenodes.h:675
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define PG_TRY()
Definition: elog.h:284
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:460
List * options
Definition: foreign.h:53
#define RelationGetRelid(relation)
Definition: rel.h:413
#define PG_END_TRY()
Definition: elog.h:300
/*
 * postgresAddForeignUpdateTargets
 *		Append a resjunk target entry named "ctid" to parsetree->targetList.
 *
 * The entry wraps a Var of type TIDOID on parsetree->resultRelation and is
 * numbered one past the current length of the target list.  target_rte and
 * target_relation are not referenced in the lines shown here.
 *
 * NOTE(review): source line 1516, the attribute-number argument of
 * makeVar(), is missing from this listing; presumably
 * SelfItemPointerAttributeNumber, which appears in the References list
 * below -- confirm against the full source.
 */
static void postgresAddForeignUpdateTargets ( Query parsetree,
RangeTblEntry target_rte,
Relation  target_relation 
)
static

Definition at line 1502 of file postgres_fdw.c.

References InvalidOid, lappend(), list_length(), makeTargetEntry(), makeVar(), pstrdup(), Query::resultRelation, SelfItemPointerAttributeNumber, Query::targetList, and TIDOID.

Referenced by postgres_fdw_handler().

1505 {
1506  Var *var;
1507  const char *attrname;
1508  TargetEntry *tle;
1509 
1510  /*
1511  * In postgres_fdw, what we need is the ctid, same as for a regular table.
1512  */
1513 
1514  /* Make a Var representing the desired value */
      /* Remaining arguments: typmod -1, no collation, varlevelsup 0. */
1515  var = makeVar(parsetree->resultRelation,
1517  TIDOID,
1518  -1,
1519  InvalidOid,
1520  0);
1521 
1522  /* Wrap it in a resjunk TLE with the right name ... */
1523  attrname = "ctid";
1524 
      /* The final "true" argument is makeTargetEntry()'s resjunk flag. */
1525  tle = makeTargetEntry((Expr *) var,
1526  list_length(parsetree->targetList) + 1,
1527  pstrdup(attrname),
1528  true);
1529 
1530  /* ... and add it to the query's targetlist */
1531  parsetree->targetList = lappend(parsetree->targetList, tle);
1532 }
char * pstrdup(const char *in)
Definition: mcxt.c:1165
int resultRelation
Definition: parsenodes.h:113
Definition: primnodes.h:141
List * targetList
Definition: parsenodes.h:131
#define TIDOID
Definition: pg_type.h:332
TargetEntry * makeTargetEntry(Expr *expr, AttrNumber resno, char *resname, bool resjunk)
Definition: makefuncs.c:235
Var * makeVar(Index varno, AttrNumber varattno, Oid vartype, int32 vartypmod, Oid varcollid, Index varlevelsup)
Definition: makefuncs.c:67
List * lappend(List *list, void *datum)
Definition: list.c:128
#define InvalidOid
Definition: postgres_ext.h:36
static int list_length(const List *l)
Definition: pg_list.h:89
#define SelfItemPointerAttributeNumber
Definition: sysattr.h:21
static bool postgresAnalyzeForeignTable ( Relation  relation,
AcquireSampleRowsFunc * func,
BlockNumber * totalpages 
)
static

Definition at line 3467 of file postgres_fdw.c.

References conn, StringInfoData::data, deparseAnalyzeSizeSql(), elog, ERROR, GetConnection(), GetForeignTable(), GetUserMapping(), initStringInfo(), NULL, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_exec_query(), pgfdw_report_error(), PGRES_TUPLES_OK, postgresAcquireSampleRowsFunc(), PQclear(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), RelationData::rd_rel, RelationGetRelid, ReleaseConnection(), ForeignTable::serverid, and user.

Referenced by postgres_fdw_handler().

3470 {
3471  ForeignTable *table;
3472  UserMapping *user;
3473  PGconn *conn;
3474  StringInfoData sql;
3475  PGresult *volatile res = NULL;
3476 
3477  /* Return the row-analysis function pointer */
3478  *func = postgresAcquireSampleRowsFunc;
3479 
3480  /*
3481  * Now we have to get the number of pages. It's annoying that the ANALYZE
3482  * API requires us to return that now, because it forces some duplication
3483  * of effort between this routine and postgresAcquireSampleRowsFunc. But
3484  * it's probably not worth redefining that API at this point.
3485  */
3486 
3487  /*
3488  * Get the connection to use. We do the remote access as the table's
3489  * owner, even if the ANALYZE was started by some other user.
3490  */
3491  table = GetForeignTable(RelationGetRelid(relation));
3492  user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
3493  conn = GetConnection(user, false);
3494 
3495  /*
3496  * Construct command to get page count for relation.
3497  */
3498  initStringInfo(&sql);
3499  deparseAnalyzeSizeSql(&sql, relation);
3500 
3501  /* In what follows, do not risk leaking any PGresults. */
3502  PG_TRY();
3503  {
3504  res = pgfdw_exec_query(conn, sql.data);
3505  if (PQresultStatus(res) != PGRES_TUPLES_OK)
3506  pgfdw_report_error(ERROR, res, conn, false, sql.data);
3507 
3508  if (PQntuples(res) != 1 || PQnfields(res) != 1)
3509  elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
3510  *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
3511 
3512  PQclear(res);
3513  res = NULL;
3514  }
3515  PG_CATCH();
3516  {
3517  if (res)
3518  PQclear(res);
3519  PG_RE_THROW();
3520  }
3521  PG_END_TRY();
3522 
3523  ReleaseConnection(conn);
3524 
3525  return true;
3526 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
ForeignTable * GetForeignTable(Oid relid)
Definition: foreign.c:216
Form_pg_class rd_rel
Definition: rel.h:113
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
void ReleaseConnection(PGconn *conn)
Definition: connection.c:412
#define ERROR
Definition: elog.h:43
PGconn * conn
Definition: streamutil.c:45
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows)
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
void PQclear(PGresult *res)
Definition: fe-exec.c:650
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:97
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
void deparseAnalyzeSizeSql(StringInfo buf, Relation rel)
Definition: deparse.c:1691
Oid serverid
Definition: foreign.h:67
#define PG_RE_THROW()
Definition: elog.h:314
static char * user
Definition: pg_regress.c:90
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition: foreign.c:166
#define elog
Definition: elog.h:219
#define PG_TRY()
Definition: elog.h:284
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:460
#define RelationGetRelid(relation)
Definition: rel.h:413
#define PG_END_TRY()
Definition: elog.h:300
static void postgresBeginDirectModify ( ForeignScanState * node,
int  eflags 
)
static

Definition at line 2255 of file postgres_fdw.c.

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate(), EXEC_FLAG_EXPLAIN_ONLY, ForeignScan::fdw_exprs, ForeignScan::fdw_private, ForeignScanState::fdw_state, FdwDirectModifyPrivateHasReturning, FdwDirectModifyPrivateRetrievedAttrs, FdwDirectModifyPrivateSetProcessed, FdwDirectModifyPrivateUpdateSql, GetConnection(), GetForeignTable(), GetUserId(), GetUserMapping(), intVal, list_length(), list_nth(), palloc0(), PlanState::plan, prepare_query_params(), ScanState::ps, RelationGetDescr, RelationGetRelid, rt_fetch, ForeignScan::scan, Scan::scanrelid, ForeignScanState::ss, ScanState::ss_currentRelation, PlanState::state, strVal, TupleDescGetAttInMetadata(), and user.

Referenced by postgres_fdw_handler().

2256 {
2257  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2258  EState *estate = node->ss.ps.state;
2259  PgFdwDirectModifyState *dmstate;
2260  RangeTblEntry *rte;
2261  Oid userid;
2262  ForeignTable *table;
2263  UserMapping *user;
2264  int numParams;
2265 
2266  /*
2267  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2268  */
2269  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2270  return;
2271 
2272  /*
2273  * We'll save private state in node->fdw_state.
2274  */
2275  dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2276  node->fdw_state = (void *) dmstate;
2277 
2278  /*
2279  * Identify which user to do the remote access as. This should match what
2280  * ExecCheckRTEPerms() does.
2281  */
2282  rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
2283  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
2284 
2285  /* Get info about foreign table. */
2286  dmstate->rel = node->ss.ss_currentRelation;
2287  table = GetForeignTable(RelationGetRelid(dmstate->rel));
2288  user = GetUserMapping(userid, table->serverid);
2289 
2290  /*
2291  * Get connection to the foreign server. Connection manager will
2292  * establish new connection if necessary.
2293  */
2294  dmstate->conn = GetConnection(user, false);
2295 
2296  /* Initialize state variable */
2297  dmstate->num_tuples = -1; /* -1 means not set yet */
2298 
2299  /* Get private info created by planner functions. */
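2300  dmstate->query = strVal(list_nth(fsplan->fdw_private,
2301  FdwDirectModifyPrivateUpdateSql));
2302  dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
2303  FdwDirectModifyPrivateHasReturning));
2304  dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2305  FdwDirectModifyPrivateRetrievedAttrs);
2306  dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
2307  FdwDirectModifyPrivateSetProcessed));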
2308 
2309  /* Create context for per-tuple temp workspace. */
2310  dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2311  "postgres_fdw temporary data",
2312  ALLOCSET_SMALL_SIZES);
2313 
2314  /* Prepare for input conversion of RETURNING results. */
2315  if (dmstate->has_returning)
2316  dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
2317 
2318  /*
2319  * Prepare for processing of parameters used in remote query, if any.
2320  */
2321  numParams = list_length(fsplan->fdw_exprs);
2322  dmstate->numParams = numParams;
2323  if (numParams > 0)
2324  prepare_query_params((PlanState *) node,
2325  fsplan->fdw_exprs,
2326  numParams,
2327  &dmstate->param_flinfo,
2328  &dmstate->param_exprs,
2329  &dmstate->param_values);
2330 }
ScanState ss
Definition: execnodes.h:1632
Index scanrelid
Definition: plannodes.h:306
#define RelationGetDescr(relation)
Definition: rel.h:425
Oid GetUserId(void)
Definition: miscinit.c:283
static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values)
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:155
List * fdw_exprs
Definition: plannodes.h:556
List * fdw_private
Definition: plannodes.h:557
#define strVal(v)
Definition: value.h:54
ForeignTable * GetForeignTable(Oid relid)
Definition: foreign.c:216
Relation ss_currentRelation
Definition: execnodes.h:1289
EState * state
Definition: execnodes.h:1049
unsigned int Oid
Definition: postgres_ext.h:31
PlanState ps
Definition: execnodes.h:1288
void * list_nth(const List *list, int n)
Definition: list.c:410
#define rt_fetch(rangetable_index, rangetable)
Definition: parsetree.h:31
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void * palloc0(Size size)
Definition: mcxt.c:920
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:1068
Plan * plan
Definition: execnodes.h:1047
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:97
static int list_length(const List *l)
Definition: pg_list.h:89
static char * user
Definition: pg_regress.c:90
#define intVal(v)
Definition: value.h:52
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition: foreign.c:166
Definition: pg_list.h:45
#define EXEC_FLAG_EXPLAIN_ONLY
Definition: executor.h:58
#define RelationGetRelid(relation)
Definition: rel.h:413
static void postgresBeginForeignModify ( ModifyTableState * mtstate,
ResultRelInfo * resultRelInfo,
List * fdw_private,
int  subplan_index,
int  eflags 
)
static

Definition at line 1657 of file postgres_fdw.c.

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate(), Assert, PgFdwModifyState::attinmeta, AttributeNumberIsValid, RangeTblEntry::checkAsUser, CMD_DELETE, CMD_INSERT, CMD_UPDATE, PgFdwModifyState::conn, PgFdwModifyState::ctidAttno, elog, ERROR, EState::es_query_cxt, EState::es_range_table, EXEC_FLAG_EXPLAIN_ONLY, ExecFindJunkAttributeInTlist(), FdwModifyPrivateHasReturning, FdwModifyPrivateRetrievedAttrs, FdwModifyPrivateTargetAttnums, FdwModifyPrivateUpdateSql, fmgr_info(), GetConnection(), GetForeignTable(), getTypeOutputInfo(), GetUserId(), GetUserMapping(), PgFdwModifyState::has_returning, intVal, lfirst_int, list_length(), list_nth(), ModifyTableState::mt_plans, NULL, ModifyTableState::operation, PgFdwModifyState::p_flinfo, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, palloc0(), PlanState::plan, ModifyTableState::ps, PgFdwModifyState::query, PgFdwModifyState::rel, RelationGetDescr, RelationGetRelid, PgFdwModifyState::retrieved_attrs, ResultRelInfo::ri_FdwState, ResultRelInfo::ri_RangeTableIndex, ResultRelInfo::ri_RelationDesc, rt_fetch, ForeignTable::serverid, PlanState::state, strVal, PgFdwModifyState::target_attrs, Plan::targetlist, PgFdwModifyState::temp_cxt, TIDOID, TupleDescGetAttInMetadata(), and user.

Referenced by postgres_fdw_handler().

1662 {
1663  PgFdwModifyState *fmstate;
1664  EState *estate = mtstate->ps.state;
1665  CmdType operation = mtstate->operation;
1666  Relation rel = resultRelInfo->ri_RelationDesc;
1667  RangeTblEntry *rte;
1668  Oid userid;
1669  ForeignTable *table;
1670  UserMapping *user;
1671  AttrNumber n_params;
1672  Oid typefnoid;
1673  bool isvarlena;
1674  ListCell *lc;
1675 
1676  /*
1677  * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1678  * stays NULL.
1679  */
1680  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1681  return;
1682 
1683  /* Begin constructing PgFdwModifyState. */
1684  fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1685  fmstate->rel = rel;
1686 
1687  /*
1688  * Identify which user to do the remote access as. This should match what
1689  * ExecCheckRTEPerms() does.
1690  */
1691  rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1692  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1693 
1694  /* Get info about foreign table. */
1695  table = GetForeignTable(RelationGetRelid(rel));
1696  user = GetUserMapping(userid, table->serverid);
1697 
1698  /* Open connection; report that we'll create a prepared statement. */
1699  fmstate->conn = GetConnection(user, true);
1700  fmstate->p_name = NULL; /* prepared statement not made yet */
1701 
1702  /* Deconstruct fdw_private data. */
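1703  fmstate->query = strVal(list_nth(fdw_private,
1704  FdwModifyPrivateUpdateSql));
1705  fmstate->target_attrs = (List *) list_nth(fdw_private,
1706  FdwModifyPrivateTargetAttnums);
1707  fmstate->has_returning = intVal(list_nth(fdw_private,
1708  FdwModifyPrivateHasReturning));
1709  fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1710  FdwModifyPrivateRetrievedAttrs);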
1711 
1712  /* Create context for per-tuple temp workspace. */
1713  fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1714  "postgres_fdw temporary data",
1715  ALLOCSET_SMALL_SIZES);
1716 
1717  /* Prepare for input conversion of RETURNING results. */
1718  if (fmstate->has_returning)
1719  fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
1720 
1721  /* Prepare for output conversion of parameters used in prepared stmt. */
1722  n_params = list_length(fmstate->target_attrs) + 1;
1723  fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1724  fmstate->p_nums = 0;
1725 
1726  if (operation == CMD_UPDATE || operation == CMD_DELETE)
1727  {
1728  /* Find the ctid resjunk column in the subplan's result */
1729  Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1730 
1731  fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
1732  "ctid");
1733  if (!AttributeNumberIsValid(fmstate->ctidAttno))
1734  elog(ERROR, "could not find junk ctid column");
1735 
1736  /* First transmittable parameter will be ctid */
1737  getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1738  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1739  fmstate->p_nums++;
1740  }
1741 
1742  if (operation == CMD_INSERT || operation == CMD_UPDATE)
1743  {
1744  /* Set up for remaining transmittable parameters */
1745  foreach(lc, fmstate->target_attrs)
1746  {
1747  int attnum = lfirst_int(lc);
1748  Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
1749 
1750  Assert(!attr->attisdropped);
1751 
1752  getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1753  fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1754  fmstate->p_nums++;
1755  }
1756  }
1757 
1758  Assert(fmstate->p_nums <= n_params);
1759 
1760  resultRelInfo->ri_FdwState = fmstate;
1761 }
Definition: fmgr.h:53
Relation ri_RelationDesc
Definition: execnodes.h:335
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2600
AttrNumber ExecFindJunkAttributeInTlist(List *targetlist, const char *attrName)
Definition: execJunk.c:221
#define RelationGetDescr(relation)
Definition: rel.h:425
Oid GetUserId(void)
Definition: miscinit.c:283
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:155
#define strVal(v)
Definition: value.h:54
ForeignTable * GetForeignTable(Oid relid)
Definition: foreign.c:216
CmdType operation
Definition: execnodes.h:1151
EState * state
Definition: execnodes.h:1049
List * es_range_table
Definition: execnodes.h:372
unsigned int Oid
Definition: postgres_ext.h:31
List * retrieved_attrs
Definition: postgres_fdw.c:178
Index ri_RangeTableIndex
Definition: execnodes.h:334
#define TIDOID
Definition: pg_type.h:332
MemoryContext es_query_cxt
Definition: execnodes.h:397
#define ERROR
Definition: elog.h:43
PlanState ps
Definition: execnodes.h:1150
#define lfirst_int(lc)
Definition: pg_list.h:107
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:159
void * list_nth(const List *list, int n)
Definition: list.c:410
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
#define rt_fetch(rangetable_index, rangetable)
Definition: parsetree.h:31
AttrNumber ctidAttno
Definition: postgres_fdw.c:181
PlanState ** mt_plans
Definition: execnodes.h:1154
#define AttributeNumberIsValid(attributeNumber)
Definition: attnum.h:34
FmgrInfo * p_flinfo
Definition: postgres_fdw.c:183
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void * palloc0(Size size)
Definition: mcxt.c:920
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:1068
Plan * plan
Definition: execnodes.h:1047
void * ri_FdwState
Definition: execnodes.h:344
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:97
MemoryContext temp_cxt
Definition: postgres_fdw.c:186
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
Oid serverid
Definition: foreign.h:67
static int list_length(const List *l)
Definition: pg_list.h:89
List * targetlist
Definition: plannodes.h:129
static char * user
Definition: pg_regress.c:90
#define intVal(v)
Definition: value.h:52
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition: foreign.c:166
AttInMetadata * attinmeta
Definition: postgres_fdw.c:168
#define elog
Definition: elog.h:219
Definition: pg_list.h:45
#define EXEC_FLAG_EXPLAIN_ONLY
Definition: executor.h:58
int16 AttrNumber
Definition: attnum.h:21
#define RelationGetRelid(relation)
Definition: rel.h:413
CmdType
Definition: nodes.h:641
static void postgresBeginForeignScan ( ForeignScanState * node,
int  eflags 
)
static

Definition at line 1277 of file postgres_fdw.c.

References ALLOCSET_DEFAULT_SIZES, ALLOCSET_SMALL_SIZES, AllocSetContextCreate(), bms_next_member(), EXEC_FLAG_EXPLAIN_ONLY, ForeignScan::fdw_exprs, ForeignScan::fdw_private, ForeignScanState::fdw_state, FdwScanPrivateFetchSize, FdwScanPrivateRetrievedAttrs, FdwScanPrivateSelectSql, ForeignScan::fs_relids, GetConnection(), GetCursorNumber(), GetForeignTable(), GetUserId(), GetUserMapping(), intVal, list_length(), list_nth(), NULL, palloc0(), PlanState::plan, prepare_query_params(), ScanState::ps, RelationGetDescr, rt_fetch, ForeignScan::scan, Scan::scanrelid, ForeignScanState::ss, ScanState::ss_currentRelation, ScanState::ss_ScanTupleSlot, PlanState::state, strVal, TupleTableSlot::tts_tupleDescriptor, TupleDescGetAttInMetadata(), and user.

Referenced by postgres_fdw_handler().

1278 {
1279  ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1280  EState *estate = node->ss.ps.state;
1281  PgFdwScanState *fsstate;
1282  RangeTblEntry *rte;
1283  Oid userid;
1284  ForeignTable *table;
1285  UserMapping *user;
1286  int rtindex;
1287  int numParams;
1288 
1289  /*
1290  * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1291  */
1292  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1293  return;
1294 
1295  /*
1296  * We'll save private state in node->fdw_state.
1297  */
1298  fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1299  node->fdw_state = (void *) fsstate;
1300 
1301  /*
1302  * Identify which user to do the remote access as. This should match what
1303  * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
1304  * lowest-numbered member RTE as a representative; we would get the same
1305  * result from any.
1306  */
1307  if (fsplan->scan.scanrelid > 0)
1308  rtindex = fsplan->scan.scanrelid;
1309  else
1310  rtindex = bms_next_member(fsplan->fs_relids, -1);
1311  rte = rt_fetch(rtindex, estate->es_range_table);
1312  userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1313 
1314  /* Get info about foreign table. */
1315  table = GetForeignTable(rte->relid);
1316  user = GetUserMapping(userid, table->serverid);
1317 
1318  /*
1319  * Get connection to the foreign server. Connection manager will
1320  * establish new connection if necessary.
1321  */
1322  fsstate->conn = GetConnection(user, false);
1323 
1324  /* Assign a unique ID for my cursor */
1325  fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1326  fsstate->cursor_exists = false;
1327 
1328  /* Get private info created by planner functions. */
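1329  fsstate->query = strVal(list_nth(fsplan->fdw_private,
1330  FdwScanPrivateSelectSql));
1331  fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1332  FdwScanPrivateRetrievedAttrs);
1333  fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1334  FdwScanPrivateFetchSize));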
1335 
1336  /* Create contexts for batches of tuples and per-tuple temp workspace. */
1337  fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1338  "postgres_fdw tuple data",
1339  ALLOCSET_DEFAULT_SIZES);
1340  fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1341  "postgres_fdw temporary data",
1342  ALLOCSET_SMALL_SIZES);
1343 
1344  /*
1345  * Get info we'll need for converting data fetched from the foreign server
1346  * into local representation and error reporting during that process.
1347  */
1348  if (fsplan->scan.scanrelid > 0)
1349  {
1350  fsstate->rel = node->ss.ss_currentRelation;
1351  fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1352  }
1353  else
1354  {
1355  fsstate->rel = NULL;
1356  fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1357  }
1358 
1359  fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1360 
1361  /*
1362  * Prepare for processing of parameters used in remote query, if any.
1363  */
1364  numParams = list_length(fsplan->fdw_exprs);
1365  fsstate->numParams = numParams;
1366  if (numParams > 0)
1367  prepare_query_params((PlanState *) node,
1368  fsplan->fdw_exprs,
1369  numParams,
1370  &fsstate->param_flinfo,
1371  &fsstate->param_exprs,
1372  &fsstate->param_values);
1373 }
ScanState ss
Definition: execnodes.h:1632
Index scanrelid
Definition: plannodes.h:306
#define RelationGetDescr(relation)
Definition: rel.h:425
Oid GetUserId(void)
Definition: miscinit.c:283
static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, FmgrInfo **param_flinfo, List **param_exprs, const char ***param_values)
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:155
List * fdw_exprs
Definition: plannodes.h:556
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:907
List * fdw_private
Definition: plannodes.h:557
#define strVal(v)
Definition: value.h:54
ForeignTable * GetForeignTable(Oid relid)
Definition: foreign.c:216
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1291
Relation ss_currentRelation
Definition: execnodes.h:1289
EState * state
Definition: execnodes.h:1049
unsigned int Oid
Definition: postgres_ext.h:31
PlanState ps
Definition: execnodes.h:1288
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
void * list_nth(const List *list, int n)
Definition: list.c:410
#define rt_fetch(rangetable_index, rangetable)
Definition: parsetree.h:31
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void * palloc0(Size size)
Definition: mcxt.c:920
AttInMetadata * TupleDescGetAttInMetadata(TupleDesc tupdesc)
Definition: execTuples.c:1068
Plan * plan
Definition: execnodes.h:1047
PGconn * GetConnection(UserMapping *user, bool will_prep_stmt)
Definition: connection.c:97
#define NULL
Definition: c.h:226
unsigned int GetCursorNumber(PGconn *conn)
Definition: connection.c:433
static int list_length(const List *l)
Definition: pg_list.h:89
static char * user
Definition: pg_regress.c:90
#define intVal(v)
Definition: value.h:52
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition: foreign.c:166
Definition: pg_list.h:45
#define EXEC_FLAG_EXPLAIN_ONLY
Definition: executor.h:58
Bitmapset * fs_relids
Definition: plannodes.h:561
static void postgresEndDirectModify ( ForeignScanState * node)
static

Definition at line 2381 of file postgres_fdw.c.

References PgFdwDirectModifyState::conn, ForeignScanState::fdw_state, NULL, PQclear(), ReleaseConnection(), and PgFdwDirectModifyState::result.

Referenced by postgres_fdw_handler().

2382 {
2383  PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2384 
2385  /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2386  if (dmstate == NULL)
2387  return;
2388 
2389  /* Release PGresult */
2390  if (dmstate->result)
2391  PQclear(dmstate->result);
2392 
2393  /* Release remote connection */
2394  ReleaseConnection(dmstate->conn);
2395  dmstate->conn = NULL;
2396 
2397  /* MemoryContext will be deleted automatically. */
2398 }
void ReleaseConnection(PGconn *conn)
Definition: connection.c:412
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define NULL
Definition: c.h:226
static void postgresEndForeignModify ( EState * estate,
ResultRelInfo * resultRelInfo 
)
static

Definition at line 1984 of file postgres_fdw.c.

References PgFdwModifyState::conn, ERROR, NULL, PgFdwModifyState::p_name, pgfdw_exec_query(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQresultStatus(), ReleaseConnection(), ResultRelInfo::ri_FdwState, and snprintf().

Referenced by postgres_fdw_handler().

1986 {
1987  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1988 
1989  /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
1990  if (fmstate == NULL)
1991  return;
1992 
1993  /* If we created a prepared statement, destroy it */
1994  if (fmstate->p_name)
1995  {
1996  char sql[64];
1997  PGresult *res;
1998 
1999  snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2000 
2001  /*
2002  * We don't use a PG_TRY block here, so be careful not to throw error
2003  * without releasing the PGresult.
2004  */
2005  res = pgfdw_exec_query(fmstate->conn, sql);
2006  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2007  pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2008  PQclear(res);
2009  fmstate->p_name = NULL;
2010  }
2011 
2012  /* Release remote connection */
2013  ReleaseConnection(fmstate->conn);
2014  fmstate->conn = NULL;
2015 }
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
void ReleaseConnection(PGconn *conn)
Definition: connection.c:412
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
void * ri_FdwState
Definition: execnodes.h:344
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define NULL
Definition: c.h:226
PGresult * pgfdw_exec_query(PGconn *conn, const char *query)
Definition: connection.c:460
static void postgresEndForeignScan ( ForeignScanState * node)
static

Definition at line 1478 of file postgres_fdw.c.

References close_cursor(), PgFdwScanState::conn, PgFdwScanState::cursor_exists, PgFdwScanState::cursor_number, ForeignScanState::fdw_state, NULL, and ReleaseConnection().

Referenced by postgres_fdw_handler().

1479 {
1480  PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1481 
1482  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1483  if (fsstate == NULL)
1484  return;
1485 
1486  /* Close the cursor if open, to prevent accumulation of cursors */
1487  if (fsstate->cursor_exists)
1488  close_cursor(fsstate->conn, fsstate->cursor_number);
1489 
1490  /* Release remote connection */
1491  ReleaseConnection(fsstate->conn);
1492  fsstate->conn = NULL;
1493 
1494  /* MemoryContexts will be deleted automatically. */
1495 }
static void close_cursor(PGconn *conn, unsigned int cursor_number)
unsigned int cursor_number
Definition: postgres_fdw.c:139
void ReleaseConnection(PGconn *conn)
Definition: connection.c:412
#define NULL
Definition: c.h:226
static TupleTableSlot * postgresExecForeignDelete ( EState * estate,
ResultRelInfo * resultRelInfo,
TupleTableSlot * slot,
TupleTableSlot * planSlot 
)
static

Definition at line 1908 of file postgres_fdw.c.

References PgFdwModifyState::conn, convert_prep_stmt_params(), PgFdwModifyState::ctidAttno, DatumGetPointer, elog, ERROR, ExecGetJunkAttribute(), PgFdwModifyState::has_returning, MemoryContextReset(), NULL, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQcmdTuples(), PQntuples(), PQresultStatus(), PQsendQueryPrepared(), prepare_foreign_modify(), PgFdwModifyState::query, ResultRelInfo::ri_FdwState, store_returning_result(), and PgFdwModifyState::temp_cxt.

Referenced by postgres_fdw_handler().

1912 {
1913  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1914  Datum datum;
1915  bool isNull;
1916  const char **p_values;
1917  PGresult *res;
1918  int n_rows;
1919 
1920  /* Set up the prepared statement on the remote server, if we didn't yet */
1921  if (!fmstate->p_name)
1922  prepare_foreign_modify(fmstate);
1923 
1924  /* Get the ctid that was passed up as a resjunk column */
1925  datum = ExecGetJunkAttribute(planSlot,
1926  fmstate->ctidAttno,
1927  &isNull);
1928  /* shouldn't ever get a null result... */
1929  if (isNull)
1930  elog(ERROR, "ctid is NULL");
1931 
1932  /* Convert parameters needed by prepared statement to text form */
1933  p_values = convert_prep_stmt_params(fmstate,
1934  (ItemPointer) DatumGetPointer(datum),
1935  NULL);
1936 
1937  /*
1938  * Execute the prepared statement.
1939  */
1940  if (!PQsendQueryPrepared(fmstate->conn,
1941  fmstate->p_name,
1942  fmstate->p_nums,
1943  p_values,
1944  NULL,
1945  NULL,
1946  0))
1947  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1948 
1949  /*
1950  * Get the result, and check for success.
1951  *
1952  * We don't use a PG_TRY block here, so be careful not to throw error
1953  * without releasing the PGresult.
1954  */
1955  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1956  if (PQresultStatus(res) !=
1957  (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1958  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1959 
1960  /* Check number of rows affected, and fetch RETURNING tuple if any */
1961  if (fmstate->has_returning)
1962  {
1963  n_rows = PQntuples(res);
1964  if (n_rows > 0)
1965  store_returning_result(fmstate, slot, res);
1966  }
1967  else
1968  n_rows = atoi(PQcmdTuples(res));
1969 
1970  /* And clean up */
1971  PQclear(res);
1972 
1973  MemoryContextReset(fmstate->temp_cxt);
1974 
1975  /* Return NULL if nothing was deleted on the remote end */
1976  return (n_rows > 0) ? slot : NULL;
1977 }
char * PQcmdTuples(PGresult *res)
Definition: fe-exec.c:3014
static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
AttrNumber ctidAttno
Definition: postgres_fdw.c:181
uintptr_t Datum
Definition: postgres.h:374
void * ri_FdwState
Definition: execnodes.h:344
void PQclear(PGresult *res)
Definition: fe-exec.c:650
MemoryContext temp_cxt
Definition: postgres_fdw.c:186
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:484
#define NULL
Definition: c.h:226
static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot)
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1325
#define DatumGetPointer(X)
Definition: postgres.h:557
static void prepare_foreign_modify(PgFdwModifyState *fmstate)
Datum ExecGetJunkAttribute(TupleTableSlot *slot, AttrNumber attno, bool *isNull)
Definition: execJunk.c:248
#define elog
Definition: elog.h:219
static TupleTableSlot * postgresExecForeignInsert ( EState * estate,
ResultRelInfo * resultRelInfo,
TupleTableSlot * slot,
TupleTableSlot * planSlot 
)
static

Definition at line 1768 of file postgres_fdw.c.

References PgFdwModifyState::conn, convert_prep_stmt_params(), ERROR, PgFdwModifyState::has_returning, MemoryContextReset(), NULL, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQcmdTuples(), PQntuples(), PQresultStatus(), PQsendQueryPrepared(), prepare_foreign_modify(), PgFdwModifyState::query, ResultRelInfo::ri_FdwState, store_returning_result(), and PgFdwModifyState::temp_cxt.

Referenced by postgres_fdw_handler().

1772 {
1773  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1774  const char **p_values;
1775  PGresult *res;
1776  int n_rows;
1777 
1778  /* Set up the prepared statement on the remote server, if we didn't yet */
1779  if (!fmstate->p_name)
1780  prepare_foreign_modify(fmstate);
1781 
1782  /* Convert parameters needed by prepared statement to text form */
1783  p_values = convert_prep_stmt_params(fmstate, NULL, slot);
1784 
1785  /*
1786  * Execute the prepared statement.
1787  */
1788  if (!PQsendQueryPrepared(fmstate->conn,
1789  fmstate->p_name,
1790  fmstate->p_nums,
1791  p_values,
1792  NULL,
1793  NULL,
1794  0))
1795  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1796 
1797  /*
1798  * Get the result, and check for success.
1799  *
1800  * We don't use a PG_TRY block here, so be careful not to throw error
1801  * without releasing the PGresult.
1802  */
1803  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1804  if (PQresultStatus(res) !=
1805  (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1806  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1807 
1808  /* Check number of rows affected, and fetch RETURNING tuple if any */
1809  if (fmstate->has_returning)
1810  {
1811  n_rows = PQntuples(res);
1812  if (n_rows > 0)
1813  store_returning_result(fmstate, slot, res);
1814  }
1815  else
1816  n_rows = atoi(PQcmdTuples(res));
1817 
1818  /* And clean up */
1819  PQclear(res);
1820 
1821  MemoryContextReset(fmstate->temp_cxt);
1822 
1823  /* Return NULL if nothing was inserted on the remote end */
1824  return (n_rows > 0) ? slot : NULL;
1825 }
char * PQcmdTuples(PGresult *res)
Definition: fe-exec.c:3014
static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
void * ri_FdwState
Definition: execnodes.h:344
void PQclear(PGresult *res)
Definition: fe-exec.c:650
MemoryContext temp_cxt
Definition: postgres_fdw.c:186
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:484
#define NULL
Definition: c.h:226
static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot)
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1325
static void prepare_foreign_modify(PgFdwModifyState *fmstate)
static TupleTableSlot * postgresExecForeignUpdate ( EState * estate,
ResultRelInfo * resultRelInfo,
TupleTableSlot * slot,
TupleTableSlot * planSlot 
)
static

Definition at line 1832 of file postgres_fdw.c.

References PgFdwModifyState::conn, convert_prep_stmt_params(), PgFdwModifyState::ctidAttno, DatumGetPointer, elog, ERROR, ExecGetJunkAttribute(), PgFdwModifyState::has_returning, MemoryContextReset(), NULL, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, pgfdw_get_result(), pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQcmdTuples(), PQntuples(), PQresultStatus(), PQsendQueryPrepared(), prepare_foreign_modify(), PgFdwModifyState::query, ResultRelInfo::ri_FdwState, store_returning_result(), and PgFdwModifyState::temp_cxt.

Referenced by postgres_fdw_handler().

1836 {
1837  PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1838  Datum datum;
1839  bool isNull;
1840  const char **p_values;
1841  PGresult *res;
1842  int n_rows;
1843 
1844  /* Set up the prepared statement on the remote server, if we didn't yet */
1845  if (!fmstate->p_name)
1846  prepare_foreign_modify(fmstate);
1847 
1848  /* Get the ctid that was passed up as a resjunk column */
1849  datum = ExecGetJunkAttribute(planSlot,
1850  fmstate->ctidAttno,
1851  &isNull);
1852  /* shouldn't ever get a null result... */
1853  if (isNull)
1854  elog(ERROR, "ctid is NULL");
1855 
1856  /* Convert parameters needed by prepared statement to text form */
1857  p_values = convert_prep_stmt_params(fmstate,
1858  (ItemPointer) DatumGetPointer(datum),
1859  slot);
1860 
1861  /*
1862  * Execute the prepared statement.
1863  */
1864  if (!PQsendQueryPrepared(fmstate->conn,
1865  fmstate->p_name,
1866  fmstate->p_nums,
1867  p_values,
1868  NULL,
1869  NULL,
1870  0))
1871  pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
1872 
1873  /*
1874  * Get the result, and check for success.
1875  *
1876  * We don't use a PG_TRY block here, so be careful not to throw error
1877  * without releasing the PGresult.
1878  */
1879  res = pgfdw_get_result(fmstate->conn, fmstate->query);
1880  if (PQresultStatus(res) !=
1881  (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
1882  pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
1883 
1884  /* Check number of rows affected, and fetch RETURNING tuple if any */
1885  if (fmstate->has_returning)
1886  {
1887  n_rows = PQntuples(res);
1888  if (n_rows > 0)
1889  store_returning_result(fmstate, slot, res);
1890  }
1891  else
1892  n_rows = atoi(PQcmdTuples(res));
1893 
1894  /* And clean up */
1895  PQclear(res);
1896 
1897  MemoryContextReset(fmstate->temp_cxt);
1898 
1899  /* Return NULL if nothing was updated on the remote end */
1900  return (n_rows > 0) ? slot : NULL;
1901 }
char * PQcmdTuples(PGresult *res)
Definition: fe-exec.c:3014
static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
#define ERROR
Definition: elog.h:43
void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql)
Definition: connection.c:538
AttrNumber ctidAttno
Definition: postgres_fdw.c:181
uintptr_t Datum
Definition: postgres.h:374
void * ri_FdwState
Definition: execnodes.h:344
void PQclear(PGresult *res)
Definition: fe-exec.c:650
MemoryContext temp_cxt
Definition: postgres_fdw.c:186
PGresult * pgfdw_get_result(PGconn *conn, const char *query)
Definition: connection.c:484
#define NULL
Definition: c.h:226
static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot)
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1325
#define DatumGetPointer(X)
Definition: postgres.h:557
static void prepare_foreign_modify(PgFdwModifyState *fmstate)
Datum ExecGetJunkAttribute(TupleTableSlot *slot, AttrNumber attno, bool *isNull)
Definition: execJunk.c:248
#define elog
Definition: elog.h:219
static void postgresExplainDirectModify ( ForeignScanState * node,
ExplainState * es 
)
static

Definition at line 2459 of file postgres_fdw.c.

References ExplainPropertyText(), FdwDirectModifyPrivateUpdateSql, list_nth(), PlanState::plan, ScanState::ps, ForeignScanState::ss, strVal, and ExplainState::verbose.

Referenced by postgres_fdw_handler().

2460 {
2461  List *fdw_private;
2462  char *sql;
2463 
2464  if (es->verbose)
2465  {
2466  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2467  sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2468  ExplainPropertyText("Remote SQL", sql, es);
2469  }
2470 }
ScanState ss
Definition: execnodes.h:1632
#define strVal(v)
Definition: value.h:54
PlanState ps
Definition: execnodes.h:1288
void ExplainPropertyText(const char *qlabel, const char *value, ExplainState *es)
Definition: explain.c:3046
void * list_nth(const List *list, int n)
Definition: list.c:410
bool verbose
Definition: explain.h:32
Plan * plan
Definition: execnodes.h:1047
Definition: pg_list.h:45
static void postgresExplainForeignModify ( ModifyTableState * mtstate,
ResultRelInfo * rinfo,
List * fdw_private,
int  subplan_index,
ExplainState * es 
)
)
static

Definition at line 2438 of file postgres_fdw.c.

References ExplainPropertyText(), FdwModifyPrivateUpdateSql, list_nth(), strVal, and ExplainState::verbose.

Referenced by postgres_fdw_handler().

2443 {
2444  if (es->verbose)
2445  {
2446  char *sql = strVal(list_nth(fdw_private,
2447  FdwModifyPrivateUpdateSql));
2448 
2449  ExplainPropertyText("Remote SQL", sql, es);
2450  }
2451 }
#define strVal(v)
Definition: value.h:54
void ExplainPropertyText(const char *qlabel, const char *value, ExplainState *es)
Definition: explain.c:3046
void * list_nth(const List *list, int n)
Definition: list.c:410
bool verbose
Definition: explain.h:32
static void postgresExplainForeignScan ( ForeignScanState * node,
ExplainState * es 
)
static

Definition at line 2405 of file postgres_fdw.c.

References ExplainPropertyText(), FdwScanPrivateRelations, FdwScanPrivateSelectSql, list_length(), list_nth(), PlanState::plan, ScanState::ps, ForeignScanState::ss, strVal, and ExplainState::verbose.

Referenced by postgres_fdw_handler().

2406 {
2407  List *fdw_private;
2408  char *sql;
2409  char *relations;
2410 
2411  fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2412 
2413  /*
2414  * Add names of relation handled by the foreign scan when the scan is a
2415  * join
2416  */
2417  if (list_length(fdw_private) > FdwScanPrivateRelations)
2418  {
2419  relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2420  ExplainPropertyText("Relations", relations, es);
2421  }
2422 
2423  /*
2424  * Add remote query, when VERBOSE option is specified.
2425  */
2426  if (es->verbose)
2427  {
2428  sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2429  ExplainPropertyText("Remote SQL", sql, es);
2430  }
2431 }
ScanState ss
Definition: execnodes.h:1632
#define strVal(v)
Definition: value.h:54
PlanState ps
Definition: execnodes.h:1288
void ExplainPropertyText(const char *qlabel, const char *value, ExplainState *es)
Definition: explain.c:3046
void * list_nth(const List *list, int n)
Definition: list.c:410
bool verbose
Definition: explain.h:32
Plan * plan
Definition: execnodes.h:1047
static int list_length(const List *l)
Definition: pg_list.h:89
Definition: pg_list.h:45
static void postgresGetForeignJoinPaths ( PlannerInfo root,
RelOptInfo joinrel,
RelOptInfo outerrel,
RelOptInfo innerrel,
JoinType  jointype,
JoinPathExtraData extra 
)
static