PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
nodeMergeAppend.c File Reference
#include "postgres.h"
#include "executor/execdebug.h"
#include "executor/nodeMergeAppend.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
Include dependency graph for nodeMergeAppend.c:

Go to the source code of this file.

Typedefs

typedef int32 SlotNumber
 

Functions

static TupleTableSlotExecMergeAppend (PlanState *pstate)
 
static int heap_compare_slots (Datum a, Datum b, void *arg)
 
MergeAppendStateExecInitMergeAppend (MergeAppend *node, EState *estate, int eflags)
 
void ExecEndMergeAppend (MergeAppendState *node)
 
void ExecReScanMergeAppend (MergeAppendState *node)
 

Typedef Documentation

Definition at line 51 of file nodeMergeAppend.c.

Function Documentation

void ExecEndMergeAppend ( MergeAppendState node)

Definition at line 278 of file nodeMergeAppend.c.

References ExecEndNode(), i, MergeAppendState::mergeplans, and MergeAppendState::ms_nplans.

Referenced by ExecEndNode().

279 {
280  PlanState **mergeplans;
281  int nplans;
282  int i;
283 
284  /*
285  * get information from the node
286  */
287  mergeplans = node->mergeplans;
288  nplans = node->ms_nplans;
289 
290  /*
291  * shut down each of the subscans
292  */
293  for (i = 0; i < nplans; i++)
294  ExecEndNode(mergeplans[i]);
295 }
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:523
PlanState ** mergeplans
Definition: execnodes.h:1023
int i
MergeAppendState* ExecInitMergeAppend ( MergeAppend node,
EState estate,
int  eflags 
)

Definition at line 64 of file nodeMergeAppend.c.

References SortSupportData::abbreviate, Assert, binaryheap_allocate(), MergeAppend::collations, CurrentMemoryContext, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignResultTypeFromTL(), ExecInitNode(), ExecInitResultTupleSlot(), ExecLockNonLeafAppendTables(), ExecMergeAppend(), PlanState::ExecProcNode, heap_compare_slots(), i, lfirst, list_length(), makeNode, MergeAppend::mergeplans, MergeAppendState::mergeplans, MergeAppendState::ms_heap, MergeAppendState::ms_initialized, MergeAppendState::ms_nkeys, MergeAppendState::ms_nplans, MergeAppendState::ms_slots, MergeAppendState::ms_sortkeys, MergeAppend::nullsFirst, MergeAppend::numCols, palloc0(), MergeAppend::partitioned_rels, PlanState::plan, PrepareSortSupportFromOrderingOp(), MergeAppendState::ps, PlanState::ps_ProjInfo, MergeAppend::sortColIdx, MergeAppend::sortOperators, SortSupportData::ssup_attno, SortSupportData::ssup_collation, SortSupportData::ssup_cxt, SortSupportData::ssup_nulls_first, and PlanState::state.

Referenced by ExecInitNode().

65 {
67  PlanState **mergeplanstates;
68  int nplans;
69  int i;
70  ListCell *lc;
71 
72  /* check for unsupported flags */
73  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
74 
75  /*
76  * Lock the non-leaf tables in the partition tree controlled by this node.
77  * It's a no-op for non-partitioned parent tables.
78  */
80 
81  /*
82  * Set up empty vector of subplan states
83  */
84  nplans = list_length(node->mergeplans);
85 
86  mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
87 
88  /*
89  * create new MergeAppendState for our node
90  */
91  mergestate->ps.plan = (Plan *) node;
92  mergestate->ps.state = estate;
93  mergestate->ps.ExecProcNode = ExecMergeAppend;
94  mergestate->mergeplans = mergeplanstates;
95  mergestate->ms_nplans = nplans;
96 
97  mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
98  mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
99  mergestate);
100 
101  /*
102  * Miscellaneous initialization
103  *
104  * MergeAppend plans don't have expression contexts because they never
105  * call ExecQual or ExecProject.
106  */
107 
108  /*
109  * MergeAppend nodes do have Result slots, which hold pointers to tuples,
110  * so we have to initialize them.
111  */
112  ExecInitResultTupleSlot(estate, &mergestate->ps);
113 
114  /*
115  * call ExecInitNode on each of the plans to be executed and save the
116  * results into the array "mergeplans".
117  */
118  i = 0;
119  foreach(lc, node->mergeplans)
120  {
121  Plan *initNode = (Plan *) lfirst(lc);
122 
123  mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
124  i++;
125  }
126 
127  /*
128  * initialize output tuple type
129  */
130  ExecAssignResultTypeFromTL(&mergestate->ps);
131  mergestate->ps.ps_ProjInfo = NULL;
132 
133  /*
134  * initialize sort-key information
135  */
136  mergestate->ms_nkeys = node->numCols;
137  mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
138 
139  for (i = 0; i < node->numCols; i++)
140  {
141  SortSupport sortKey = mergestate->ms_sortkeys + i;
142 
143  sortKey->ssup_cxt = CurrentMemoryContext;
144  sortKey->ssup_collation = node->collations[i];
145  sortKey->ssup_nulls_first = node->nullsFirst[i];
146  sortKey->ssup_attno = node->sortColIdx[i];
147 
148  /*
149  * It isn't feasible to perform abbreviated key conversion, since
150  * tuples are pulled into mergestate's binary heap as needed. It
151  * would likely be counter-productive to convert tuples into an
152  * abbreviated representation as they're pulled up, so opt out of that
153  * additional optimization entirely.
154  */
155  sortKey->abbreviate = false;
156 
158  }
159 
160  /*
161  * initialize to show we have not run the subplans yet
162  */
163  mergestate->ms_initialized = false;
164 
165  return mergestate;
166 }
bool ssup_nulls_first
Definition: sortsupport.h:75
void ExecLockNonLeafAppendTables(List *partitioned_rels, EState *estate)
Definition: execUtils.c:831
TupleTableSlot ** ms_slots
Definition: execnodes.h:1027
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:882
SortSupport ms_sortkeys
Definition: execnodes.h:1026
Oid * collations
Definition: plannodes.h:268
void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
Definition: sortsupport.c:133
EState * state
Definition: execnodes.h:849
void ExecAssignResultTypeFromTL(PlanState *planstate)
Definition: execUtils.c:445
void ExecInitResultTupleSlot(EState *estate, PlanState *planstate)
Definition: execTuples.c:832
MemoryContext ssup_cxt
Definition: sortsupport.h:66
#define EXEC_FLAG_BACKWARD
Definition: executor.h:60
List * partitioned_rels
Definition: plannodes.h:262
PlanState ** mergeplans
Definition: execnodes.h:1023
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
void * palloc0(Size size)
Definition: mcxt.c:877
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:853
AttrNumber ssup_attno
Definition: sortsupport.h:81
Plan * plan
Definition: execnodes.h:847
PlanState ps
Definition: execnodes.h:1022
struct binaryheap * ms_heap
Definition: execnodes.h:1028
#define makeNode(_type_)
Definition: nodes.h:558
#define Assert(condition)
Definition: c.h:681
#define lfirst(lc)
Definition: pg_list.h:106
#define EXEC_FLAG_MARK
Definition: executor.h:61
AttrNumber * sortColIdx
Definition: plannodes.h:266
static int list_length(const List *l)
Definition: pg_list.h:89
bool * nullsFirst
Definition: plannodes.h:269
static int heap_compare_slots(Datum a, Datum b, void *arg)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
List * mergeplans
Definition: plannodes.h:263
int i
Oid * sortOperators
Definition: plannodes.h:267
static TupleTableSlot * ExecMergeAppend(PlanState *pstate)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:139
static TupleTableSlot * ExecMergeAppend ( PlanState pstate)
static

Definition at line 175 of file nodeMergeAppend.c.

References binaryheap_add_unordered(), binaryheap_build(), binaryheap_empty, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), castNode, CHECK_FOR_INTERRUPTS, DatumGetInt32, ExecClearTuple(), ExecProcNode(), i, Int32GetDatum, MergeAppendState::mergeplans, MergeAppendState::ms_heap, MergeAppendState::ms_initialized, MergeAppendState::ms_nplans, MergeAppendState::ms_slots, MergeAppendState::ps, PlanState::ps_ResultTupleSlot, and TupIsNull.

Referenced by ExecInitMergeAppend().

176 {
177  MergeAppendState *node = castNode(MergeAppendState, pstate);
178  TupleTableSlot *result;
179  SlotNumber i;
180 
182 
183  if (!node->ms_initialized)
184  {
185  /*
186  * First time through: pull the first tuple from each subplan, and set
187  * up the heap.
188  */
189  for (i = 0; i < node->ms_nplans; i++)
190  {
191  node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
192  if (!TupIsNull(node->ms_slots[i]))
194  }
195  binaryheap_build(node->ms_heap);
196  node->ms_initialized = true;
197  }
198  else
199  {
200  /*
201  * Otherwise, pull the next tuple from whichever subplan we returned
202  * from last time, and reinsert the subplan index into the heap,
203  * because it might now compare differently against the existing
204  * elements of the heap. (We could perhaps simplify the logic a bit
205  * by doing this before returning from the prior call, but it's better
206  * to not pull tuples until necessary.)
207  */
209  node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
210  if (!TupIsNull(node->ms_slots[i]))
212  else
213  (void) binaryheap_remove_first(node->ms_heap);
214  }
215 
216  if (binaryheap_empty(node->ms_heap))
217  {
218  /* All the subplans are exhausted, and so is the heap */
219  result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
220  }
221  else
222  {
224  result = node->ms_slots[i];
225  }
226 
227  return result;
228 }
TupleTableSlot ** ms_slots
Definition: execnodes.h:1027
#define binaryheap_empty(h)
Definition: binaryheap.h:52
#define DatumGetInt32(X)
Definition: postgres.h:478
#define castNode(_type_, nodeptr)
Definition: nodes.h:579
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:880
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
PlanState ** mergeplans
Definition: execnodes.h:1023
int32 SlotNumber
#define TupIsNull(slot)
Definition: tuptable.h:138
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:246
PlanState ps
Definition: execnodes.h:1022
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct binaryheap * ms_heap
Definition: execnodes.h:1028
#define Int32GetDatum(X)
Definition: postgres.h:485
int i
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
void ExecReScanMergeAppend ( MergeAppendState node)

Definition at line 298 of file nodeMergeAppend.c.

References binaryheap_reset(), PlanState::chgParam, ExecReScan(), i, MergeAppendState::mergeplans, MergeAppendState::ms_heap, MergeAppendState::ms_initialized, MergeAppendState::ms_nplans, MergeAppendState::ps, and UpdateChangedParamSet().

Referenced by ExecReScan().

299 {
300  int i;
301 
302  for (i = 0; i < node->ms_nplans; i++)
303  {
304  PlanState *subnode = node->mergeplans[i];
305 
306  /*
307  * ExecReScan doesn't know about my subplans, so I have to do
308  * changed-parameter signaling myself.
309  */
310  if (node->ps.chgParam != NULL)
311  UpdateChangedParamSet(subnode, node->ps.chgParam);
312 
313  /*
314  * If chgParam of subnode is not null then plan will be re-scanned by
315  * first ExecProcNode.
316  */
317  if (subnode->chgParam == NULL)
318  ExecReScan(subnode);
319  }
320  binaryheap_reset(node->ms_heap);
321  node->ms_initialized = false;
322 }
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
PlanState ** mergeplans
Definition: execnodes.h:1023
void binaryheap_reset(binaryheap *heap)
Definition: binaryheap.c:57
Bitmapset * chgParam
Definition: execnodes.h:875
PlanState ps
Definition: execnodes.h:1022
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition: execUtils.c:676
struct binaryheap * ms_heap
Definition: execnodes.h:1028
int i
static int32 heap_compare_slots ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 234 of file nodeMergeAppend.c.

References ApplySortComparator(), Assert, compare(), DatumGetInt32, MergeAppendState::ms_nkeys, MergeAppendState::ms_slots, MergeAppendState::ms_sortkeys, s1, s2, slot_getattr(), SortSupportData::ssup_attno, and TupIsNull.

Referenced by ExecInitMergeAppend().

235 {
237  SlotNumber slot1 = DatumGetInt32(a);
238  SlotNumber slot2 = DatumGetInt32(b);
239 
240  TupleTableSlot *s1 = node->ms_slots[slot1];
241  TupleTableSlot *s2 = node->ms_slots[slot2];
242  int nkey;
243 
244  Assert(!TupIsNull(s1));
245  Assert(!TupIsNull(s2));
246 
247  for (nkey = 0; nkey < node->ms_nkeys; nkey++)
248  {
249  SortSupport sortKey = node->ms_sortkeys + nkey;
250  AttrNumber attno = sortKey->ssup_attno;
251  Datum datum1,
252  datum2;
253  bool isNull1,
254  isNull2;
255  int compare;
256 
257  datum1 = slot_getattr(s1, attno, &isNull1);
258  datum2 = slot_getattr(s2, attno, &isNull2);
259 
260  compare = ApplySortComparator(datum1, isNull1,
261  datum2, isNull2,
262  sortKey);
263  if (compare != 0)
264  return -compare;
265  }
266  return 0;
267 }
TupleTableSlot ** ms_slots
Definition: execnodes.h:1027
SortSupport ms_sortkeys
Definition: execnodes.h:1026
#define DatumGetInt32(X)
Definition: postgres.h:478
static int compare(const void *arg1, const void *arg2)
Definition: geqo_pool.c:145
char * s1
int32 SlotNumber
#define TupIsNull(slot)
Definition: tuptable.h:138
char * s2
uintptr_t Datum
Definition: postgres.h:372
AttrNumber ssup_attno
Definition: sortsupport.h:81
#define Assert(condition)
Definition: c.h:681
Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: heaptuple.c:1142
void * arg
static int ApplySortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
Definition: sortsupport.h:201
int16 AttrNumber
Definition: attnum.h:21