PostgreSQL Source Code  git master
nodeMergeAppend.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  * routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /* INTERFACE ROUTINES
16  * ExecInitMergeAppend - initialize the MergeAppend node
17  * ExecMergeAppend - retrieve the next tuple from the node
18  * ExecEndMergeAppend - shut down the MergeAppend node
19  * ExecReScanMergeAppend - rescan the MergeAppend node
20  *
21  * NOTES
22  * A MergeAppend node contains a list of one or more subplans.
23  * These are each expected to deliver tuples that are sorted according
24  * to a common sort key. The MergeAppend node merges these streams
25  * to produce output sorted the same way.
26  *
27  * MergeAppend nodes don't make use of their left and right
28  * subtrees, rather they maintain a list of subplans so
29  * a typical MergeAppend node looks like this in the plan tree:
30  *
31  * ...
32  * /
33  * MergeAppend---+------+------+--- nil
34  * / \ | | |
35  * nil nil ... ... ...
36  * subplans
37  */
38 
39 #include "postgres.h"
40 
41 #include "executor/execdebug.h"
42 #include "executor/execPartition.h"
44 #include "lib/binaryheap.h"
45 #include "miscadmin.h"
46 
47 /*
48  * We have one slot for each item in the heap array. We use SlotNumber
49  * to store slot indexes. This doesn't actually provide any formal
50  * type-safety, but it makes the code more self-documenting.
51  */
52 typedef int32 SlotNumber;
53 
55 static int heap_compare_slots(Datum a, Datum b, void *arg);
56 
57 
58 /* ----------------------------------------------------------------
59  * ExecInitMergeAppend
60  *
61  * Begin all of the subscans of the MergeAppend node.
62  * ----------------------------------------------------------------
63  */
65 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 {
68  PlanState **mergeplanstates;
69  Bitmapset *validsubplans;
70  int nplans;
71  int i,
72  j;
73 
74  /* check for unsupported flags */
75  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
76 
77  /*
78  * create new MergeAppendState for our node
79  */
80  mergestate->ps.plan = (Plan *) node;
81  mergestate->ps.state = estate;
82  mergestate->ps.ExecProcNode = ExecMergeAppend;
83  mergestate->ms_noopscan = false;
84 
85  /* If run-time partition pruning is enabled, then set that up now */
86  if (node->part_prune_info != NULL)
87  {
88  PartitionPruneState *prunestate;
89 
90  /* We may need an expression context to evaluate partition exprs */
91  ExecAssignExprContext(estate, &mergestate->ps);
92 
93  prunestate = ExecCreatePartitionPruneState(&mergestate->ps,
94  node->part_prune_info);
95  mergestate->ms_prune_state = prunestate;
96 
97  /* Perform an initial partition prune, if required. */
98  if (prunestate->do_initial_prune)
99  {
100  /* Determine which subplans survive initial pruning */
101  validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
102  list_length(node->mergeplans));
103 
104  /*
105  * The case where no subplans survive pruning must be handled
106  * specially. The problem here is that code in explain.c requires
107  * a MergeAppend to have at least one subplan in order for it to
108  * properly determine the Vars in that subplan's targetlist. We
109  * sidestep this issue by just initializing the first subplan and
110  * setting ms_noopscan to true to indicate that we don't really
111  * need to scan any subnodes.
112  */
113  if (bms_is_empty(validsubplans))
114  {
115  mergestate->ms_noopscan = true;
116 
117  /* Mark the first as valid so that it's initialized below */
118  validsubplans = bms_make_singleton(0);
119  }
120 
121  nplans = bms_num_members(validsubplans);
122  }
123  else
124  {
125  /* We'll need to initialize all subplans */
126  nplans = list_length(node->mergeplans);
127  Assert(nplans > 0);
128  validsubplans = bms_add_range(NULL, 0, nplans - 1);
129  }
130 
131  /*
132  * If no runtime pruning is required, we can fill ms_valid_subplans
133  * immediately, preventing later calls to ExecFindMatchingSubPlans.
134  */
135  if (!prunestate->do_exec_prune)
136  {
137  Assert(nplans > 0);
138  mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
139  }
140  }
141  else
142  {
143  nplans = list_length(node->mergeplans);
144 
145  /*
146  * When run-time partition pruning is not enabled we can just mark all
147  * subplans as valid; they must also all be initialized.
148  */
149  Assert(nplans > 0);
150  mergestate->ms_valid_subplans = validsubplans =
151  bms_add_range(NULL, 0, nplans - 1);
152  mergestate->ms_prune_state = NULL;
153  }
154 
155  mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
156  mergestate->mergeplans = mergeplanstates;
157  mergestate->ms_nplans = nplans;
158 
159  mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
160  mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
161  mergestate);
162 
163  /*
164  * Miscellaneous initialization
165  *
166  * MergeAppend nodes do have Result slots, which hold pointers to tuples,
167  * so we have to initialize them. FIXME
168  */
170 
171  /* node returns slots from each of its subnodes, therefore not fixed */
172  mergestate->ps.resultopsset = true;
173  mergestate->ps.resultopsfixed = false;
174 
175  /*
176  * call ExecInitNode on each of the valid plans to be executed and save
177  * the results into the mergeplanstates array.
178  */
179  j = 0;
180  i = -1;
181  while ((i = bms_next_member(validsubplans, i)) >= 0)
182  {
183  Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
184 
185  mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
186  }
187 
188  mergestate->ps.ps_ProjInfo = NULL;
189 
190  /*
191  * initialize sort-key information
192  */
193  mergestate->ms_nkeys = node->numCols;
194  mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
195 
196  for (i = 0; i < node->numCols; i++)
197  {
198  SortSupport sortKey = mergestate->ms_sortkeys + i;
199 
200  sortKey->ssup_cxt = CurrentMemoryContext;
201  sortKey->ssup_collation = node->collations[i];
202  sortKey->ssup_nulls_first = node->nullsFirst[i];
203  sortKey->ssup_attno = node->sortColIdx[i];
204 
205  /*
206  * It isn't feasible to perform abbreviated key conversion, since
207  * tuples are pulled into mergestate's binary heap as needed. It
208  * would likely be counter-productive to convert tuples into an
209  * abbreviated representation as they're pulled up, so opt out of that
210  * additional optimization entirely.
211  */
212  sortKey->abbreviate = false;
213 
215  }
216 
217  /*
218  * initialize to show we have not run the subplans yet
219  */
220  mergestate->ms_initialized = false;
221 
222  return mergestate;
223 }
224 
225 /* ----------------------------------------------------------------
226  * ExecMergeAppend
227  *
228  * Handles iteration over multiple subplans.
229  * ----------------------------------------------------------------
230  */
231 static TupleTableSlot *
233 {
234  MergeAppendState *node = castNode(MergeAppendState, pstate);
235  TupleTableSlot *result;
236  SlotNumber i;
237 
239 
240  if (!node->ms_initialized)
241  {
242  /* Nothing to do if all subplans were pruned */
243  if (node->ms_noopscan)
244  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
245 
246  /*
247  * If we've yet to determine the valid subplans then do so now. If
248  * run-time pruning is disabled then the valid subplans will always be
249  * set to all subplans.
250  */
251  if (node->ms_valid_subplans == NULL)
252  node->ms_valid_subplans =
254 
255  /*
256  * First time through: pull the first tuple from each valid subplan,
257  * and set up the heap.
258  */
259  i = -1;
260  while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
261  {
262  node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
263  if (!TupIsNull(node->ms_slots[i]))
265  }
266  binaryheap_build(node->ms_heap);
267  node->ms_initialized = true;
268  }
269  else
270  {
271  /*
272  * Otherwise, pull the next tuple from whichever subplan we returned
273  * from last time, and reinsert the subplan index into the heap,
274  * because it might now compare differently against the existing
275  * elements of the heap. (We could perhaps simplify the logic a bit
276  * by doing this before returning from the prior call, but it's better
277  * to not pull tuples until necessary.)
278  */
280  node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
281  if (!TupIsNull(node->ms_slots[i]))
283  else
284  (void) binaryheap_remove_first(node->ms_heap);
285  }
286 
287  if (binaryheap_empty(node->ms_heap))
288  {
289  /* All the subplans are exhausted, and so is the heap */
290  result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
291  }
292  else
293  {
295  result = node->ms_slots[i];
296  }
297 
298  return result;
299 }
300 
301 /*
302  * Compare the tuples in the two given slots.
303  */
304 static int32
306 {
307  MergeAppendState *node = (MergeAppendState *) arg;
308  SlotNumber slot1 = DatumGetInt32(a);
309  SlotNumber slot2 = DatumGetInt32(b);
310 
311  TupleTableSlot *s1 = node->ms_slots[slot1];
312  TupleTableSlot *s2 = node->ms_slots[slot2];
313  int nkey;
314 
315  Assert(!TupIsNull(s1));
316  Assert(!TupIsNull(s2));
317 
318  for (nkey = 0; nkey < node->ms_nkeys; nkey++)
319  {
320  SortSupport sortKey = node->ms_sortkeys + nkey;
321  AttrNumber attno = sortKey->ssup_attno;
322  Datum datum1,
323  datum2;
324  bool isNull1,
325  isNull2;
326  int compare;
327 
328  datum1 = slot_getattr(s1, attno, &isNull1);
329  datum2 = slot_getattr(s2, attno, &isNull2);
330 
331  compare = ApplySortComparator(datum1, isNull1,
332  datum2, isNull2,
333  sortKey);
334  if (compare != 0)
335  {
336  INVERT_COMPARE_RESULT(compare);
337  return compare;
338  }
339  }
340  return 0;
341 }
342 
343 /* ----------------------------------------------------------------
344  * ExecEndMergeAppend
345  *
346  * Shuts down the subscans of the MergeAppend node.
347  *
348  * Returns nothing of interest.
349  * ----------------------------------------------------------------
350  */
351 void
353 {
354  PlanState **mergeplans;
355  int nplans;
356  int i;
357 
358  /*
359  * get information from the node
360  */
361  mergeplans = node->mergeplans;
362  nplans = node->ms_nplans;
363 
364  /*
365  * shut down each of the subscans
366  */
367  for (i = 0; i < nplans; i++)
368  ExecEndNode(mergeplans[i]);
369 }
370 
371 void
373 {
374  int i;
375 
376  /*
377  * If any PARAM_EXEC Params used in pruning expressions have changed, then
378  * we'd better unset the valid subplans so that they are reselected for
379  * the new parameter values.
380  */
381  if (node->ms_prune_state &&
382  bms_overlap(node->ps.chgParam,
384  {
386  node->ms_valid_subplans = NULL;
387  }
388 
389  for (i = 0; i < node->ms_nplans; i++)
390  {
391  PlanState *subnode = node->mergeplans[i];
392 
393  /*
394  * ExecReScan doesn't know about my subplans, so I have to do
395  * changed-parameter signaling myself.
396  */
397  if (node->ps.chgParam != NULL)
398  UpdateChangedParamSet(subnode, node->ps.chgParam);
399 
400  /*
401  * If chgParam of subnode is not null then plan will be re-scanned by
402  * first ExecProcNode.
403  */
404  if (subnode->chgParam == NULL)
405  ExecReScan(subnode);
406  }
407  binaryheap_reset(node->ms_heap);
408  node->ms_initialized = false;
409 }
bool ssup_nulls_first
Definition: sortsupport.h:75
TupleTableSlot ** ms_slots
Definition: execnodes.h:1203
#define binaryheap_empty(h)
Definition: binaryheap.h:52
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:985
SortSupport ms_sortkeys
Definition: execnodes.h:1202
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:426
#define DatumGetInt32(X)
Definition: postgres.h:472
MergeAppendState * ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
Oid * collations
Definition: plannodes.h:277
#define castNode(_type_, nodeptr)
Definition: nodes.h:593
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:538
void ExecReScan(PlanState *node)
Definition: execAmi.c:77
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1043
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
Definition: sortsupport.c:134
Bitmapset * ms_valid_subplans
Definition: execnodes.h:1208
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
struct PartitionPruneInfo * part_prune_info
Definition: plannodes.h:280
EState * state
Definition: execnodes.h:947
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
signed int int32
Definition: c.h:346
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:983
static int compare(const void *arg1, const void *arg2)
Definition: geqo_pool.c:145
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Definition: bitmapset.c:834
char * s1
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
MemoryContext ssup_cxt
Definition: sortsupport.h:66
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:646
Bitmapset * bms_make_singleton(int x)
Definition: bitmapset.c:186
Bitmapset * ExecFindInitialMatchingSubPlans(PartitionPruneState *prunestate, int nsubplans)
int32 SlotNumber
PlanState ** mergeplans
Definition: execnodes.h:1199
#define TupIsNull(slot)
Definition: tuptable.h:293
void ExecReScanMergeAppend(MergeAppendState *node)
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
Bitmapset * execparamids
void binaryheap_reset(binaryheap *heap)
Definition: binaryheap.c:57
bool resultopsset
Definition: execnodes.h:1028
Bitmapset * chgParam
Definition: execnodes.h:977
bool bms_is_empty(const Bitmapset *a)
Definition: bitmapset.c:701
void * palloc0(Size size)
Definition: mcxt.c:955
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:951
char * s2
uintptr_t Datum
Definition: postgres.h:367
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:235
AttrNumber ssup_attno
Definition: sortsupport.h:81
Plan * plan
Definition: execnodes.h:945
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:382
PlanState ps
Definition: execnodes.h:1198
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition: execUtils.c:804
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
struct binaryheap * ms_heap
Definition: execnodes.h:1204
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
#define makeNode(_type_)
Definition: nodes.h:572
#define Assert(condition)
Definition: c.h:732
void ExecEndMergeAppend(MergeAppendState *node)
#define EXEC_FLAG_MARK
Definition: executor.h:59
AttrNumber * sortColIdx
Definition: plannodes.h:275
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:446
static int list_length(const List *l)
Definition: pg_list.h:169
struct PartitionPruneState * ms_prune_state
Definition: execnodes.h:1207
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1764
bool * nullsFirst
Definition: plannodes.h:278
static int heap_compare_slots(Datum a, Datum b, void *arg)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:494
bool resultopsfixed
Definition: execnodes.h:1024
List * mergeplans
Definition: plannodes.h:272
#define Int32GetDatum(X)
Definition: postgres.h:479
void * palloc(Size size)
Definition: mcxt.c:924
int i
void * arg
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
Oid * sortOperators
Definition: plannodes.h:276
static TupleTableSlot * ExecMergeAppend(PlanState *pstate)
PartitionPruneState * ExecCreatePartitionPruneState(PlanState *planstate, PartitionPruneInfo *partitionpruneinfo)
static int ApplySortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
Definition: sortsupport.h:200
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:139
int16 AttrNumber
Definition: attnum.h:21
#define INVERT_COMPARE_RESULT(var)
Definition: c.h:1044
int32 SlotNumber
Bitmapset * ExecFindMatchingSubPlans(PartitionPruneState *prunestate)