PostgreSQL Source Code  git master
nodeMergeAppend.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeMergeAppend.c
4  * routines to handle MergeAppend nodes.
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/executor/nodeMergeAppend.c
12  *
13  *-------------------------------------------------------------------------
14  */
/* INTERFACE ROUTINES
 *      ExecInitMergeAppend     - initialize the MergeAppend node
 *      ExecMergeAppend         - retrieve the next tuple from the node
 *      ExecEndMergeAppend      - shut down the MergeAppend node
 *      ExecReScanMergeAppend   - rescan the MergeAppend node
 *
 *   NOTES
 *      A MergeAppend node contains a list of one or more subplans.
 *      These are each expected to deliver tuples that are sorted according
 *      to a common sort key.  The MergeAppend node merges these streams
 *      to produce output sorted the same way.
 *
 *      MergeAppend nodes don't make use of their left and right
 *      subtrees, rather they maintain a list of subplans so
 *      a typical MergeAppend node looks like this in the plan tree:
 *
 *                 ...
 *                 /
 *              MergeAppend---+------+------+--- nil
 *              /   \         |      |      |
 *            nil   nil      ...    ...    ...
 *                               subplans
 */
38 
#include "postgres.h"

#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeMergeAppend.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
46 
47 /*
48  * We have one slot for each item in the heap array. We use SlotNumber
49  * to store slot indexes. This doesn't actually provide any formal
50  * type-safety, but it makes the code more self-documenting.
51  */
52 typedef int32 SlotNumber;
53 
55 static int heap_compare_slots(Datum a, Datum b, void *arg);
56 
57 
58 /* ----------------------------------------------------------------
59  * ExecInitMergeAppend
60  *
61  * Begin all of the subscans of the MergeAppend node.
62  * ----------------------------------------------------------------
63  */
65 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66 {
68  PlanState **mergeplanstates;
69  Bitmapset *validsubplans;
70  int nplans;
71  int i,
72  j;
73 
74  /* check for unsupported flags */
75  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
76 
77  /*
78  * create new MergeAppendState for our node
79  */
80  mergestate->ps.plan = (Plan *) node;
81  mergestate->ps.state = estate;
82  mergestate->ps.ExecProcNode = ExecMergeAppend;
83 
84  /* If run-time partition pruning is enabled, then set that up now */
85  if (node->part_prune_info != NULL)
86  {
87  PartitionPruneState *prunestate;
88 
89  /*
90  * Set up pruning data structure. This also initializes the set of
91  * subplans to initialize (validsubplans) by taking into account the
92  * result of performing initial pruning if any.
93  */
94  prunestate = ExecInitPartitionPruning(&mergestate->ps,
95  list_length(node->mergeplans),
96  node->part_prune_info,
97  &validsubplans);
98  mergestate->ms_prune_state = prunestate;
99  nplans = bms_num_members(validsubplans);
100 
101  /*
102  * When no run-time pruning is required and there's at least one
103  * subplan, we can fill ms_valid_subplans immediately, preventing
104  * later calls to ExecFindMatchingSubPlans.
105  */
106  if (!prunestate->do_exec_prune && nplans > 0)
107  mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
108  }
109  else
110  {
111  nplans = list_length(node->mergeplans);
112 
113  /*
114  * When run-time partition pruning is not enabled we can just mark all
115  * subplans as valid; they must also all be initialized.
116  */
117  Assert(nplans > 0);
118  mergestate->ms_valid_subplans = validsubplans =
119  bms_add_range(NULL, 0, nplans - 1);
120  mergestate->ms_prune_state = NULL;
121  }
122 
123  mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
124  mergestate->mergeplans = mergeplanstates;
125  mergestate->ms_nplans = nplans;
126 
127  mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
128  mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
129  mergestate);
130 
131  /*
132  * Miscellaneous initialization
133  *
134  * MergeAppend nodes do have Result slots, which hold pointers to tuples,
135  * so we have to initialize them. FIXME
136  */
138 
139  /* node returns slots from each of its subnodes, therefore not fixed */
140  mergestate->ps.resultopsset = true;
141  mergestate->ps.resultopsfixed = false;
142 
143  /*
144  * call ExecInitNode on each of the valid plans to be executed and save
145  * the results into the mergeplanstates array.
146  */
147  j = 0;
148  i = -1;
149  while ((i = bms_next_member(validsubplans, i)) >= 0)
150  {
151  Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
152 
153  mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
154  }
155 
156  mergestate->ps.ps_ProjInfo = NULL;
157 
158  /*
159  * initialize sort-key information
160  */
161  mergestate->ms_nkeys = node->numCols;
162  mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
163 
164  for (i = 0; i < node->numCols; i++)
165  {
166  SortSupport sortKey = mergestate->ms_sortkeys + i;
167 
168  sortKey->ssup_cxt = CurrentMemoryContext;
169  sortKey->ssup_collation = node->collations[i];
170  sortKey->ssup_nulls_first = node->nullsFirst[i];
171  sortKey->ssup_attno = node->sortColIdx[i];
172 
173  /*
174  * It isn't feasible to perform abbreviated key conversion, since
175  * tuples are pulled into mergestate's binary heap as needed. It
176  * would likely be counter-productive to convert tuples into an
177  * abbreviated representation as they're pulled up, so opt out of that
178  * additional optimization entirely.
179  */
180  sortKey->abbreviate = false;
181 
182  PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
183  }
184 
185  /*
186  * initialize to show we have not run the subplans yet
187  */
188  mergestate->ms_initialized = false;
189 
190  return mergestate;
191 }
192 
193 /* ----------------------------------------------------------------
194  * ExecMergeAppend
195  *
196  * Handles iteration over multiple subplans.
197  * ----------------------------------------------------------------
198  */
199 static TupleTableSlot *
201 {
202  MergeAppendState *node = castNode(MergeAppendState, pstate);
203  TupleTableSlot *result;
204  SlotNumber i;
205 
207 
208  if (!node->ms_initialized)
209  {
210  /* Nothing to do if all subplans were pruned */
211  if (node->ms_nplans == 0)
212  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
213 
214  /*
215  * If we've yet to determine the valid subplans then do so now. If
216  * run-time pruning is disabled then the valid subplans will always be
217  * set to all subplans.
218  */
219  if (node->ms_valid_subplans == NULL)
220  node->ms_valid_subplans =
222 
223  /*
224  * First time through: pull the first tuple from each valid subplan,
225  * and set up the heap.
226  */
227  i = -1;
228  while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
229  {
230  node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
231  if (!TupIsNull(node->ms_slots[i]))
233  }
234  binaryheap_build(node->ms_heap);
235  node->ms_initialized = true;
236  }
237  else
238  {
239  /*
240  * Otherwise, pull the next tuple from whichever subplan we returned
241  * from last time, and reinsert the subplan index into the heap,
242  * because it might now compare differently against the existing
243  * elements of the heap. (We could perhaps simplify the logic a bit
244  * by doing this before returning from the prior call, but it's better
245  * to not pull tuples until necessary.)
246  */
248  node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
249  if (!TupIsNull(node->ms_slots[i]))
251  else
252  (void) binaryheap_remove_first(node->ms_heap);
253  }
254 
255  if (binaryheap_empty(node->ms_heap))
256  {
257  /* All the subplans are exhausted, and so is the heap */
258  result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
259  }
260  else
261  {
263  result = node->ms_slots[i];
264  }
265 
266  return result;
267 }
268 
269 /*
270  * Compare the tuples in the two given slots.
271  */
272 static int32
274 {
276  SlotNumber slot1 = DatumGetInt32(a);
277  SlotNumber slot2 = DatumGetInt32(b);
278 
279  TupleTableSlot *s1 = node->ms_slots[slot1];
280  TupleTableSlot *s2 = node->ms_slots[slot2];
281  int nkey;
282 
283  Assert(!TupIsNull(s1));
284  Assert(!TupIsNull(s2));
285 
286  for (nkey = 0; nkey < node->ms_nkeys; nkey++)
287  {
288  SortSupport sortKey = node->ms_sortkeys + nkey;
289  AttrNumber attno = sortKey->ssup_attno;
290  Datum datum1,
291  datum2;
292  bool isNull1,
293  isNull2;
294  int compare;
295 
296  datum1 = slot_getattr(s1, attno, &isNull1);
297  datum2 = slot_getattr(s2, attno, &isNull2);
298 
299  compare = ApplySortComparator(datum1, isNull1,
300  datum2, isNull2,
301  sortKey);
302  if (compare != 0)
303  {
305  return compare;
306  }
307  }
308  return 0;
309 }
310 
311 /* ----------------------------------------------------------------
312  * ExecEndMergeAppend
313  *
314  * Shuts down the subscans of the MergeAppend node.
315  *
316  * Returns nothing of interest.
317  * ----------------------------------------------------------------
318  */
319 void
321 {
322  PlanState **mergeplans;
323  int nplans;
324  int i;
325 
326  /*
327  * get information from the node
328  */
329  mergeplans = node->mergeplans;
330  nplans = node->ms_nplans;
331 
332  /*
333  * shut down each of the subscans
334  */
335  for (i = 0; i < nplans; i++)
336  ExecEndNode(mergeplans[i]);
337 }
338 
339 void
341 {
342  int i;
343 
344  /*
345  * If any PARAM_EXEC Params used in pruning expressions have changed, then
346  * we'd better unset the valid subplans so that they are reselected for
347  * the new parameter values.
348  */
349  if (node->ms_prune_state &&
350  bms_overlap(node->ps.chgParam,
352  {
354  node->ms_valid_subplans = NULL;
355  }
356 
357  for (i = 0; i < node->ms_nplans; i++)
358  {
359  PlanState *subnode = node->mergeplans[i];
360 
361  /*
362  * ExecReScan doesn't know about my subplans, so I have to do
363  * changed-parameter signaling myself.
364  */
365  if (node->ps.chgParam != NULL)
366  UpdateChangedParamSet(subnode, node->ps.chgParam);
367 
368  /*
369  * If chgParam of subnode is not null then plan will be re-scanned by
370  * first ExecProcNode.
371  */
372  if (subnode->chgParam == NULL)
373  ExecReScan(subnode);
374  }
375  binaryheap_reset(node->ms_heap);
376  node->ms_initialized = false;
377 }
int16 AttrNumber
Definition: attnum.h:21
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
void binaryheap_reset(binaryheap *heap)
Definition: binaryheap.c:63
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
#define binaryheap_empty(h)
Definition: binaryheap.h:65
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1306
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:751
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:582
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Definition: bitmapset.c:1019
#define INVERT_COMPARE_RESULT(var)
Definition: c.h:1106
signed int int32
Definition: c.h:494
#define Assert(condition)
Definition: c.h:858
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
Bitmapset * ExecFindMatchingSubPlans(PartitionPruneState *prunestate, bool initial_prune)
PartitionPruneState * ExecInitPartitionPruning(PlanState *planstate, int n_total_subplans, PartitionPruneInfo *pruneinfo, Bitmapset **initially_valid_subplans)
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:557
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:142
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1886
void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
Definition: execUtils.c:844
#define EXEC_FLAG_BACKWARD
Definition: executor.h:68
#define EXEC_FLAG_MARK
Definition: executor.h:69
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:269
static int compare(const void *arg1, const void *arg2)
Definition: geqo_pool.c:145
int b
Definition: isn.c:70
int a
Definition: isn.c:69
int j
Definition: isn.c:74
int i
Definition: isn.c:73
void * palloc0(Size size)
Definition: mcxt.c:1346
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * palloc(Size size)
Definition: mcxt.c:1316
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
int32 SlotNumber
static int heap_compare_slots(Datum a, Datum b, void *arg)
MergeAppendState * ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
static TupleTableSlot * ExecMergeAppend(PlanState *pstate)
void ExecReScanMergeAppend(MergeAppendState *node)
int32 SlotNumber
void ExecEndMergeAppend(MergeAppendState *node)
#define makeNode(_type_)
Definition: nodes.h:155
#define castNode(_type_, nodeptr)
Definition: nodes.h:176
void * arg
static int list_length(const List *l)
Definition: pg_list.h:152
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
char * s1
char * s2
void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
Definition: sortsupport.c:134
static int ApplySortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
Definition: sortsupport.h:200
PlanState ** mergeplans
Definition: execnodes.h:1484
SortSupport ms_sortkeys
Definition: execnodes.h:1487
Bitmapset * ms_valid_subplans
Definition: execnodes.h:1492
PlanState ps
Definition: execnodes.h:1483
struct binaryheap * ms_heap
Definition: execnodes.h:1489
TupleTableSlot ** ms_slots
Definition: execnodes.h:1488
struct PartitionPruneState * ms_prune_state
Definition: execnodes.h:1491
struct PartitionPruneInfo * part_prune_info
Definition: plannodes.h:314
List * mergeplans
Definition: plannodes.h:294
Bitmapset * execparamids
bool resultopsset
Definition: execnodes.h:1202
Plan * plan
Definition: execnodes.h:1117
EState * state
Definition: execnodes.h:1119
Bitmapset * chgParam
Definition: execnodes.h:1149
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:1155
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1157
bool resultopsfixed
Definition: execnodes.h:1198
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:1123
AttrNumber ssup_attno
Definition: sortsupport.h:81
bool ssup_nulls_first
Definition: sortsupport.h:75
MemoryContext ssup_cxt
Definition: sortsupport.h:66
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:395
#define TupIsNull(slot)
Definition: tuptable.h:306