PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
nodeMergeAppend.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  * routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  * ExecInitMergeAppend - initialize the MergeAppend node
17  * ExecMergeAppend - retrieve the next tuple from the node
18  * ExecEndMergeAppend - shut down the MergeAppend node
19  * ExecReScanMergeAppend - rescan the MergeAppend node
20  *
21  * NOTES
22  * A MergeAppend node contains a list of one or more subplans.
23  * These are each expected to deliver tuples that are sorted according
24  * to a common sort key. The MergeAppend node merges these streams
25  * to produce output sorted the same way.
26  *
27  * MergeAppend nodes don't make use of their left and right
28  * subtrees, rather they maintain a list of subplans so
29  * a typical MergeAppend node looks like this in the plan tree:
30  *
31  *                 ...
32  *                 /
33  *              MergeAppend---+------+------+--- nil
34  *              /    \        |      |      |
35  *            nil    nil     ...    ...    ...
36  *                               subplans
37  */
38 
#include "postgres.h"

#include "executor/execdebug.h"
#include "executor/nodeMergeAppend.h"

#include "lib/binaryheap.h"
45 
46 /*
47  * We have one slot for each item in the heap array. We use SlotNumber
48  * to store slot indexes. This doesn't actually provide any formal
49  * type-safety, but it makes the code more self-documenting.
50  */
51 typedef int32 SlotNumber;
52 
53 static int heap_compare_slots(Datum a, Datum b, void *arg);
54 
55 
56 /* ----------------------------------------------------------------
57  * ExecInitMergeAppend
58  *
59  * Begin all of the subscans of the MergeAppend node.
60  * ----------------------------------------------------------------
61  */
63 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
64 {
66  PlanState **mergeplanstates;
67  int nplans;
68  int i;
69  ListCell *lc;
70 
71  /* check for unsupported flags */
72  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
73 
74  /*
75  * Lock the non-leaf tables in the partition tree controlled by this
76  * node. It's a no-op for non-partitioned parent tables.
77  */
79 
80  /*
81  * Set up empty vector of subplan states
82  */
83  nplans = list_length(node->mergeplans);
84 
85  mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
86 
87  /*
88  * create new MergeAppendState for our node
89  */
90  mergestate->ps.plan = (Plan *) node;
91  mergestate->ps.state = estate;
92  mergestate->mergeplans = mergeplanstates;
93  mergestate->ms_nplans = nplans;
94 
95  mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
96  mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
97  mergestate);
98 
99  /*
100  * Miscellaneous initialization
101  *
102  * MergeAppend plans don't have expression contexts because they never
103  * call ExecQual or ExecProject.
104  */
105 
106  /*
107  * MergeAppend nodes do have Result slots, which hold pointers to tuples,
108  * so we have to initialize them.
109  */
110  ExecInitResultTupleSlot(estate, &mergestate->ps);
111 
112  /*
113  * call ExecInitNode on each of the plans to be executed and save the
114  * results into the array "mergeplans".
115  */
116  i = 0;
117  foreach(lc, node->mergeplans)
118  {
119  Plan *initNode = (Plan *) lfirst(lc);
120 
121  mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
122  i++;
123  }
124 
125  /*
126  * initialize output tuple type
127  */
128  ExecAssignResultTypeFromTL(&mergestate->ps);
129  mergestate->ps.ps_ProjInfo = NULL;
130 
131  /*
132  * initialize sort-key information
133  */
134  mergestate->ms_nkeys = node->numCols;
135  mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
136 
137  for (i = 0; i < node->numCols; i++)
138  {
139  SortSupport sortKey = mergestate->ms_sortkeys + i;
140 
141  sortKey->ssup_cxt = CurrentMemoryContext;
142  sortKey->ssup_collation = node->collations[i];
143  sortKey->ssup_nulls_first = node->nullsFirst[i];
144  sortKey->ssup_attno = node->sortColIdx[i];
145 
146  /*
147  * It isn't feasible to perform abbreviated key conversion, since
148  * tuples are pulled into mergestate's binary heap as needed. It
149  * would likely be counter-productive to convert tuples into an
150  * abbreviated representation as they're pulled up, so opt out of that
151  * additional optimization entirely.
152  */
153  sortKey->abbreviate = false;
154 
156  }
157 
158  /*
159  * initialize to show we have not run the subplans yet
160  */
161  mergestate->ms_initialized = false;
162 
163  return mergestate;
164 }
165 
166 /* ----------------------------------------------------------------
167  * ExecMergeAppend
168  *
169  * Handles iteration over multiple subplans.
170  * ----------------------------------------------------------------
171  */
174 {
176  SlotNumber i;
177 
178  if (!node->ms_initialized)
179  {
180  /*
181  * First time through: pull the first tuple from each subplan, and set
182  * up the heap.
183  */
184  for (i = 0; i < node->ms_nplans; i++)
185  {
186  node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
187  if (!TupIsNull(node->ms_slots[i]))
189  }
190  binaryheap_build(node->ms_heap);
191  node->ms_initialized = true;
192  }
193  else
194  {
195  /*
196  * Otherwise, pull the next tuple from whichever subplan we returned
197  * from last time, and reinsert the subplan index into the heap,
198  * because it might now compare differently against the existing
199  * elements of the heap. (We could perhaps simplify the logic a bit
200  * by doing this before returning from the prior call, but it's better
201  * to not pull tuples until necessary.)
202  */
204  node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
205  if (!TupIsNull(node->ms_slots[i]))
207  else
208  (void) binaryheap_remove_first(node->ms_heap);
209  }
210 
211  if (binaryheap_empty(node->ms_heap))
212  {
213  /* All the subplans are exhausted, and so is the heap */
214  result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
215  }
216  else
217  {
219  result = node->ms_slots[i];
220  }
221 
222  return result;
223 }
224 
225 /*
226  * Compare the tuples in the two given slots.
227  */
228 static int32
230 {
231  MergeAppendState *node = (MergeAppendState *) arg;
232  SlotNumber slot1 = DatumGetInt32(a);
233  SlotNumber slot2 = DatumGetInt32(b);
234 
235  TupleTableSlot *s1 = node->ms_slots[slot1];
236  TupleTableSlot *s2 = node->ms_slots[slot2];
237  int nkey;
238 
239  Assert(!TupIsNull(s1));
240  Assert(!TupIsNull(s2));
241 
242  for (nkey = 0; nkey < node->ms_nkeys; nkey++)
243  {
244  SortSupport sortKey = node->ms_sortkeys + nkey;
245  AttrNumber attno = sortKey->ssup_attno;
246  Datum datum1,
247  datum2;
248  bool isNull1,
249  isNull2;
250  int compare;
251 
252  datum1 = slot_getattr(s1, attno, &isNull1);
253  datum2 = slot_getattr(s2, attno, &isNull2);
254 
255  compare = ApplySortComparator(datum1, isNull1,
256  datum2, isNull2,
257  sortKey);
258  if (compare != 0)
259  return -compare;
260  }
261  return 0;
262 }
263 
264 /* ----------------------------------------------------------------
265  * ExecEndMergeAppend
266  *
267  * Shuts down the subscans of the MergeAppend node.
268  *
269  * Returns nothing of interest.
270  * ----------------------------------------------------------------
271  */
272 void
274 {
275  PlanState **mergeplans;
276  int nplans;
277  int i;
278 
279  /*
280  * get information from the node
281  */
282  mergeplans = node->mergeplans;
283  nplans = node->ms_nplans;
284 
285  /*
286  * shut down each of the subscans
287  */
288  for (i = 0; i < nplans; i++)
289  ExecEndNode(mergeplans[i]);
290 }
291 
292 void
294 {
295  int i;
296 
297  for (i = 0; i < node->ms_nplans; i++)
298  {
299  PlanState *subnode = node->mergeplans[i];
300 
301  /*
302  * ExecReScan doesn't know about my subplans, so I have to do
303  * changed-parameter signaling myself.
304  */
305  if (node->ps.chgParam != NULL)
306  UpdateChangedParamSet(subnode, node->ps.chgParam);
307 
308  /*
309  * If chgParam of subnode is not null then plan will be re-scanned by
310  * first ExecProcNode.
311  */
312  if (subnode->chgParam == NULL)
313  ExecReScan(subnode);
314  }
315  binaryheap_reset(node->ms_heap);
316  node->ms_initialized = false;
317 }
bool ssup_nulls_first
Definition: sortsupport.h:75
void ExecLockNonLeafAppendTables(List *partitioned_rels, EState *estate)
Definition: execUtils.c:826
TupleTableSlot * ExecProcNode(PlanState *node)
Definition: execProcnode.c:398
TupleTableSlot ** ms_slots
Definition: execnodes.h:973
#define binaryheap_empty(h)
Definition: binaryheap.h:52
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:834
SortSupport ms_sortkeys
Definition: execnodes.h:972
#define DatumGetInt32(X)
Definition: postgres.h:478
MergeAppendState * ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
Oid * collations
Definition: plannodes.h:257
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:654
void ExecReScan(PlanState *node)
Definition: execAmi.c:75
void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
Definition: sortsupport.c:133
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
return result
Definition: formatting.c:1618
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
EState * state
Definition: execnodes.h:805
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
void ExecAssignResultTypeFromTL(PlanState *planstate)
Definition: execUtils.c:440
signed int int32
Definition: c.h:256
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:832
static int compare(const void *arg1, const void *arg2)
Definition: geqo_pool.c:145
char * s1
void ExecInitResultTupleSlot(EState *estate, PlanState *planstate)
Definition: execTuples.c:832
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
MemoryContext ssup_cxt
Definition: sortsupport.h:66
#define EXEC_FLAG_BACKWARD
Definition: executor.h:60
TupleTableSlot * ExecMergeAppend(MergeAppendState *node)
List * partitioned_rels
Definition: plannodes.h:251
int32 SlotNumber
PlanState ** mergeplans
Definition: execnodes.h:969
#define TupIsNull(slot)
Definition: tuptable.h:138
void ExecReScanMergeAppend(MergeAppendState *node)
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
void binaryheap_reset(binaryheap *heap)
Definition: binaryheap.c:57
Bitmapset * chgParam
Definition: execnodes.h:827
void * palloc0(Size size)
Definition: mcxt.c:878
char * s2
uintptr_t Datum
Definition: postgres.h:372
AttrNumber ssup_attno
Definition: sortsupport.h:81
Plan * plan
Definition: execnodes.h:803
PlanState ps
Definition: execnodes.h:968
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition: execUtils.c:671
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct binaryheap * ms_heap
Definition: execnodes.h:974
#define makeNode(_type_)
Definition: nodes.h:557
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
void ExecEndMergeAppend(MergeAppendState *node)
#define EXEC_FLAG_MARK
Definition: executor.h:61
AttrNumber * sortColIdx
Definition: plannodes.h:255
static int list_length(const List *l)
Definition: pg_list.h:89
bool * nullsFirst
Definition: plannodes.h:258
static int heap_compare_slots(Datum a, Datum b, void *arg)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
List * mergeplans
Definition: plannodes.h:252
#define Int32GetDatum(X)
Definition: postgres.h:485
int i
Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: heaptuple.c:1143
void * arg
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
Oid * sortOperators
Definition: plannodes.h:256
static int ApplySortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
Definition: sortsupport.h:201
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:140
int16 AttrNumber
Definition: attnum.h:21
int32 SlotNumber