/*-------------------------------------------------------------------------
 *
 * nodeAgg.c
 *    Routines to handle aggregate nodes.
 *
 * ExecAgg normally evaluates each aggregate in the following steps:
 *
 *       transvalue = initcond
 *       foreach input_tuple do
 *          transvalue = transfunc(transvalue, input_value(s))
 *       result = finalfunc(transvalue, direct_argument(s))
 *
 * If a finalfunc is not supplied then the result is just the ending
 * value of transvalue.
 *
 * Other behaviors can be selected by the "aggsplit" mode, which exists
 * to support partial aggregation.  It is possible to:
 * * Skip running the finalfunc, so that the output is always the
 * final transvalue state.
 * * Substitute the combinefunc for the transfunc, so that transvalue
 * states (propagated up from a child partial-aggregation step) are merged
 * rather than processing raw input rows.  (The statements below about
 * the transfunc apply equally to the combinefunc, when it's selected.)
 * * Apply the serializefunc to the output values (this only makes sense
 * when skipping the finalfunc, since the serializefunc works on the
 * transvalue data type).
 * * Apply the deserializefunc to the input values (this only makes sense
 * when using the combinefunc, for similar reasons).
 * It is the planner's responsibility to connect up Agg nodes using these
 * alternate behaviors in a way that makes sense, with partial aggregation
 * results being fed to nodes that expect them.
 *
 * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
 * input tuples and eliminate duplicates (if required) before performing
 * the above-depicted process.  (However, we don't do that for ordered-set
 * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
 * so far as this module is concerned.)  Note that partial aggregation
 * is not supported in these cases, since we couldn't ensure global
 * ordering or distinctness of the inputs.
 *
 * If transfunc is marked "strict" in pg_proc and initcond is NULL,
 * then the first non-NULL input_value is assigned directly to transvalue,
 * and transfunc isn't applied until the second non-NULL input_value.
 * The agg's first input type and transtype must be the same in this case!
 *
 * If transfunc is marked "strict" then NULL input_values are skipped,
 * keeping the previous transvalue.  If transfunc is not strict then it
 * is called for every input tuple and must deal with NULL initcond
 * or NULL input_values for itself.
 *
 * If finalfunc is marked "strict" then it is not called when the
 * ending transvalue is NULL, instead a NULL result is created
 * automatically (this is just the usual handling of strict functions,
 * of course).  A non-strict finalfunc can make its own choice of
 * what to return for a NULL ending transvalue.
 *
 * Ordered-set aggregates are treated specially in one other way: we
 * evaluate any "direct" arguments and pass them to the finalfunc along
 * with the transition value.
 *
 * A finalfunc can have additional arguments beyond the transvalue and
 * any "direct" arguments, corresponding to the input arguments of the
 * aggregate.  These are always just passed as NULL.  Such arguments may be
 * needed to allow resolution of a polymorphic aggregate's result type.
 *
 * We compute aggregate input expressions and run the transition functions
 * in a temporary econtext (aggstate->tmpcontext).  This is reset at least
 * once per input tuple, so when the transvalue datatype is
 * pass-by-reference, we have to be careful to copy it into a longer-lived
 * memory context, and free the prior value to avoid memory leakage.  We
 * store transvalues in another set of econtexts, aggstate->aggcontexts
 * (one per grouping set, see below), which are also used for the hashtable
 * structures in AGG_HASHED mode.  These econtexts are rescanned, not just
 * reset, at group boundaries so that aggregate transition functions can
 * register shutdown callbacks via AggRegisterCallback.
 *
 * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
 * run finalize functions and compute the output tuple; this context can be
 * reset once per output tuple.
 *
 * The executor's AggState node is passed as the fmgr "context" value in
 * all transfunc and finalfunc calls.  It is not recommended that the
 * transition functions look at the AggState node directly, but they can
 * use AggCheckCallContext() to verify that they are being called by
 * nodeAgg.c (and not as ordinary SQL functions).  The main reason a
 * transition function might want to know this is so that it can avoid
 * palloc'ing a fixed-size pass-by-ref transition value on every call:
 * it can instead just scribble on and return its left input.  Ordinarily
 * it is completely forbidden for functions to modify pass-by-ref inputs,
 * but in the aggregate case we know the left input is either the initial
 * transition value or a previous function result, and in either case its
 * value need not be preserved.  See int8inc() for an example.  Notice that
 * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
 * the previous transition value pointer is returned.  It is also possible
 * to avoid repeated data copying when the transition value is an expanded
 * object: to do that, the transition function must take care to return
 * an expanded object that is in a child context of the memory context
 * returned by AggCheckCallContext().  Also, some transition functions want
 * to store working state in addition to the nominal transition value; they
 * can use the memory context returned by AggCheckCallContext() to do that.
 *
 * Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
 * AggState is available as context in earlier releases (back to 8.1),
 * but direct examination of the node is needed to use it before 9.0.
 *
 * As of 9.4, aggregate transition functions can also use AggGetAggref()
 * to get hold of the Aggref expression node for their aggregate call.
 * This is mainly intended for ordered-set aggregates, which are not
 * supported as window functions.  (A regular aggregate function would
 * need some fallback logic to use this, since there's no Aggref node
 * for a window function.)
 *
 * Grouping sets:
 *
 * A list of grouping sets which is structurally equivalent to a ROLLUP
 * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
 * ordered data.  We do this by keeping a separate set of transition values
 * for each grouping set being concurrently processed; for each input tuple
 * we update them all, and on group boundaries we reset those states
 * (starting at the front of the list) whose grouping values have changed
 * (the list of grouping sets is ordered from most specific to least
 * specific).
 *
 * Where more complex grouping sets are used, we break them down into
 * "phases", where each phase has a different sort order (except phase 0
 * which is reserved for hashing).  During each phase but the last, the
 * input tuples are additionally stored in a tuplesort which is keyed to the
 * next phase's sort order; during each phase but the first, the input
 * tuples are drawn from the previously sorted data.  (The sorting of the
 * data for the first phase is handled by the planner, as it might be
 * satisfied by underlying nodes.)
 *
 * Hashing can be mixed with sorted grouping.  To do this, we have an
 * AGG_MIXED strategy that populates the hashtables during the first sorted
 * phase, and switches to reading them out after completing all sort phases.
 * We can also support AGG_HASHED with multiple hash tables and no sorting
 * at all.
 *
 * From the perspective of aggregate transition and final functions, the
 * only issue regarding grouping sets is this: a single call site (flinfo)
 * of an aggregate function may be used for updating several different
 * transition values in turn.  So the function must not cache in the flinfo
 * anything which logically belongs as part of the transition value (most
 * importantly, the memory context in which the transition value exists).
 * The support API functions (AggCheckCallContext, AggRegisterCallback) are
 * sensitive to the grouping set for which the aggregate function is
 * currently being called.
 *
 * Plan structure:
 *
 * What we get from the planner is actually one "real" Agg node which is
 * part of the plan tree proper, but which optionally has an additional list
 * of Agg nodes hung off the side via the "chain" field.  This is because an
 * Agg node happens to be a convenient representation of all the data we
 * need for grouping sets.
 *
 * For many purposes, we treat the "real" node as if it were just the first
 * node in the chain.  The chain must be ordered such that hashed entries
 * come before sorted/plain entries; the real node is marked AGG_MIXED if
 * there are both types present (in which case the real node describes one
 * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
 * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
 * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
 * nodes must be of the same type; if it is AGG_PLAIN, there can be no
 * chained nodes.
 *
 * We collect all hashed nodes into a single "phase", numbered 0, and create
 * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
 * Phase 0 is allocated even if there are no hashes, but remains unused in
 * that case.
 *
 * AGG_HASHED nodes actually refer to only a single grouping set each,
 * because for each hashed grouping we need a separate grpColIdx and
 * numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
 * grouping sets that share a sort order.  Each AGG_SORTED node other than
 * the first one has an associated Sort node which describes the sort order
 * to be used; the first sorted node takes its input from the outer subtree,
 * which the planner has already arranged to provide ordered data.
 *
 * Memory and ExprContext usage:
 *
 * Because we're accumulating aggregate values across input rows, we need to
 * use more memory contexts than just simple input/output tuple contexts.
 * In fact, for a rollup, we need a separate context for each grouping set
 * so that we can reset the inner (finer-grained) aggregates on their group
 * boundaries while continuing to accumulate values for outer
 * (coarser-grained) groupings.  On top of this, we might be simultaneously
 * populating hashtables; however, we only need one context for all the
 * hashtables.
 *
 * So we create an array, aggcontexts, with an ExprContext for each grouping
 * set in the largest rollup that we're going to process, and use the
 * per-tuple memory context of those ExprContexts to store the aggregate
 * transition values.  hashcontext is the single context created to support
 * all hash tables.
 *
 * Spilling To Disk
 *
 * When performing hash aggregation, if the hash table memory exceeds the
 * limit (see hash_agg_check_limits()), we enter "spill mode".  In spill
 * mode, we advance the transition states only for groups already in the
 * hash table.  For tuples that would need to create new hash table
 * entries (and initialize new transition states), we instead spill them to
 * disk to be processed later.  The tuples are spilled in a partitioned
 * manner, so that subsequent batches are smaller and less likely to exceed
 * hash_mem (if a batch does exceed hash_mem, it must be spilled
 * recursively).
 *
 * Spilled data is written to logical tapes.  These provide better control
 * over memory usage, disk space, and the number of files than if we were
 * to use a BufFile for each spill.  We don't know the number of tapes needed
 * at the start of the algorithm (because it can recurse), so a tape set is
 * allocated at the beginning, and individual tapes are created as needed.
 * As a particular tape is read, logtape.c recycles its disk space.  When a
 * tape is read to completion, it is destroyed entirely.
 *
 * Tapes' buffers can take up substantial memory when many tapes are open at
 * once.  We only need one tape open at a time in read mode (using a buffer
 * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
 * requiring a buffer of size BLCKSZ) for each partition.
 *
 * Note that it's possible for transition states to start small but then
 * grow very large; for instance in the case of ARRAY_AGG.  In such cases,
 * it's still possible to significantly exceed hash_mem.  We try to avoid
 * this situation by estimating what will fit in the available memory, and
 * imposing a limit on the number of groups separately from the amount of
 * memory consumed.
 *
 * Transition / Combine function invocation:
 *
 * For performance reasons transition functions, including combine
 * functions, aren't invoked one-by-one from nodeAgg.c after computing
 * arguments using the expression evaluation engine.  Instead
 * ExecBuildAggTrans() builds one large expression that does both argument
 * evaluation and transition function invocation.  That avoids performance
 * issues due to repeated uses of expression evaluation, complications due
 * to filter expressions having to be evaluated early, and allows JIT
 * compiling the entire expression into one native function.
 *
 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeAgg.c
 *
 *-------------------------------------------------------------------------
 */

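The initcond/transfunc/finalfunc loop sketched in the header comment can be illustrated with a tiny standalone fold. This is a hypothetical sketch, not PostgreSQL API: `AvgState`, `avg_transfn`, `avg_finalfn`, and `toy_exec_agg` are made-up names standing in for the transition state, transition function, final function, and ExecAgg's loop.

```c
#include <assert.h>
#include <stddef.h>

/* Toy "avg()" aggregate: the transition state is a (sum, count) pair. */
typedef struct AvgState
{
    double sum;
    long   count;
} AvgState;

/* transfunc: fold one input value into the transition state */
static AvgState
avg_transfn(AvgState trans, double input)
{
    trans.sum += input;
    trans.count += 1;
    return trans;
}

/* finalfunc: turn the ending transition state into the result */
static double
avg_finalfn(AvgState trans)
{
    return trans.count == 0 ? 0.0 : trans.sum / trans.count;
}

/* Mirrors the header's pseudocode: transvalue = initcond; fold; finalize. */
static double
toy_exec_agg(const double *inputs, size_t n)
{
    AvgState trans = {0.0, 0};      /* transvalue = initcond */

    for (size_t i = 0; i < n; i++)  /* foreach input_tuple do */
        trans = avg_transfn(trans, inputs[i]);

    return avg_finalfn(trans);      /* result = finalfunc(transvalue) */
}
```

With no finalfunc, the result would simply be the ending `trans` value, as the comment above notes.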
#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "common/hashfn.h"
#include "executor/execExpr.h"
#include "executor/executor.h"
#include "executor/instrument.h"
#include "executor/nodeAgg.h"
#include "lib/hyperloglog.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "port/pg_bitutils.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/expandeddatum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"

/*
 * Control how many partitions are created when spilling HashAgg to
 * disk.
 *
 * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
 * partitions needed such that each partition will fit in memory.  The factor
 * is set higher than one because there's not a high cost to having a few too
 * many partitions, and it makes it less likely that a partition will need to
 * be spilled recursively.  Another benefit of having more, smaller partitions
 * is that small hash tables may perform better than large ones due to memory
 * caching effects.
 *
 * We also specify a min and max number of partitions per spill.  Too few might
 * mean a lot of wasted I/O from repeated spilling of the same tuples.  Too
 * many will result in lots of memory wasted buffering the spill files (which
 * could instead be spent on a larger hash table).
 */
#define HASHAGG_PARTITION_FACTOR 1.50
#define HASHAGG_MIN_PARTITIONS 4
#define HASHAGG_MAX_PARTITIONS 1024

/*
 * For reading from tapes, the buffer size must be a multiple of
 * BLCKSZ.  Larger values help when reading from multiple tapes concurrently,
 * but that doesn't happen in HashAgg, so we simply use BLCKSZ.  Writing to a
 * tape always uses a buffer of size BLCKSZ.
 */
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ

/*
 * HyperLogLog is used for estimating the cardinality of the spilled tuples in
 * a given partition.  5 bits corresponds to a size of about 32 bytes and a
 * worst-case error of around 18%.  That's effective enough to choose a
 * reasonable number of partitions when recursing.
 */
#define HASHAGG_HLL_BIT_WIDTH 5

/*
 * Assume the palloc overhead always uses sizeof(MemoryChunk) bytes.
 */
#define CHUNKHDRSZ sizeof(MemoryChunk)

/*
 * Represents partitioned spill data for a single hashtable.  Contains the
 * necessary information to route tuples to the correct partition, and to
 * transform the spilled data into new batches.
 *
 * The high bits are used for partition selection (when recursing, we ignore
 * the bits that have already been used for partition selection at an earlier
 * level).
 */
typedef struct HashAggSpill
{
    int         npartitions;    /* number of partitions */
    LogicalTape **partitions;   /* spill partition tapes */
    int64      *ntuples;        /* number of tuples in each partition */
    uint32      mask;           /* mask to find partition from hash value */
    int         shift;          /* after masking, shift by this amount */
    hyperLogLogState *hll_card; /* cardinality estimate for contents */
} HashAggSpill;

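The mask/shift routing described above can be sketched in isolation. This is an illustrative standalone helper, not the file's implementation: `toy_partition_for_hash` is a made-up name, and it simply carves `log2_npartitions` fresh high bits out of the hash, skipping the `used_bits` consumed by earlier spill levels.

```c
#include <stdint.h>

/* Route a hash value to one of 2^log2_npartitions partitions using the
 * high bits of the hash.  used_bits counts bits already consumed by
 * earlier recursion levels, so each level reads a fresh slice. */
static int
toy_partition_for_hash(uint32_t hash, int used_bits, int log2_npartitions)
{
    int      shift = 32 - used_bits - log2_npartitions;
    uint32_t mask = ((((uint32_t) 1) << log2_npartitions) - 1) << shift;

    return (int) ((hash & mask) >> shift);
}
```

A recursive spill with `used_bits = 2` reads bits 28-29 instead of 30-31, so tuples that collided at the first level can still be separated.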
/*
 * Represents work to be done for one pass of hash aggregation (with only one
 * grouping set).
 *
 * Also tracks the bits of the hash already used for partition selection by
 * earlier iterations, so that this batch can use new bits.  If all bits have
 * already been used, no partitioning will be done (any spilled data will go
 * to a single output tape).
 */
typedef struct HashAggBatch
{
    int         setno;          /* grouping set */
    int         used_bits;      /* number of bits of hash already used */
    LogicalTape *input_tape;    /* input partition tape */
    int64       input_tuples;   /* number of tuples in this batch */
    double      input_card;     /* estimated group cardinality */
} HashAggBatch;

/* used to find referenced colnos */
typedef struct FindColsContext
{
    bool        is_aggref;      /* is under an aggref */
    Bitmapset  *aggregated;     /* column references under an aggref */
    Bitmapset  *unaggregated;   /* other column references */
} FindColsContext;

static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
                                  AggStatePerGroup *pergroups,
                                  int numReset);
static void advance_transition_function(AggState *aggstate,
                                        AggStatePerTrans pertrans,
                                        AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate);
static void process_ordered_aggregate_single(AggState *aggstate,
                                             AggStatePerTrans pertrans,
                                             AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
                                            AggStatePerTrans pertrans,
                                            AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
                               AggStatePerAgg peragg,
                               AggStatePerGroup pergroupstate,
                               Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
                                      AggStatePerAgg peragg,
                                      AggStatePerGroup pergroupstate,
                                      Datum *resultVal, bool *resultIsNull);
static inline void prepare_hash_slot(AggStatePerHash perhash,
                                     TupleTableSlot *inputslot,
                                     TupleTableSlot *hashslot);
static void prepare_projection_slot(AggState *aggstate,
                                    TupleTableSlot *slot,
                                    int currentSet);
static void finalize_aggregates(AggState *aggstate,
                                AggStatePerAgg peraggs,
                                AggStatePerGroup pergroup);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static void find_cols(AggState *aggstate, Bitmapset **aggregated,
                      Bitmapset **unaggregated);
static bool find_cols_walker(Node *node, FindColsContext *context);
static void build_hash_tables(AggState *aggstate);
static void build_hash_table(AggState *aggstate, int setno, double nbuckets);
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
                                          bool nullcheck);
static double hash_choose_num_buckets(double hashentrysize,
                                      double ngroups, Size memory);
static int  hash_choose_num_partitions(double input_groups,
                                       double hashentrysize,
                                       int used_bits,
                                       int *log2_npartitions);
static void initialize_hash_entry(AggState *aggstate,
                                  TupleHashTable hashtable,
                                  TupleHashEntry entry);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
static bool agg_refill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
static void hash_agg_check_limits(AggState *aggstate);
static void hash_agg_enter_spill_mode(AggState *aggstate);
static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
                                    int npartitions);
static void hashagg_finish_initial_spills(AggState *aggstate);
static void hashagg_reset_spill_state(AggState *aggstate);
static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
                                       int64 input_tuples, double input_card,
                                       int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset,
                               int used_bits, double input_groups,
                               double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
                                TupleTableSlot *inputslot, uint32 hash);
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
                                 int setno);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
                                      AggState *aggstate, EState *estate,
                                      Aggref *aggref, Oid transfn_oid,
                                      Oid aggtranstype, Oid aggserialfn,
                                      Oid aggdeserialfn, Datum initValue,
                                      bool initValueIsNull, Oid *inputTypes,
                                      int numArguments);


/*
 * Select the current grouping set; affects current_set and
 * curaggcontext.
 */
static void
select_current_set(AggState *aggstate, int setno, bool is_hash)
{
    /*
     * When changing this, also adapt ExecAggPlainTransByVal() and
     * ExecAggPlainTransByRef().
     */
    if (is_hash)
        aggstate->curaggcontext = aggstate->hashcontext;
    else
        aggstate->curaggcontext = aggstate->aggcontexts[setno];

    aggstate->current_set = setno;
}

/*
 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
 * current_phase + 1.  Juggle the tuplesorts accordingly.
 *
 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
 * case, so when entering phase 0, all we need to do is drop open sorts.
 */
static void
initialize_phase(AggState *aggstate, int newphase)
{
    Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);

    /*
     * Whatever the previous state, we're now done with whatever input
     * tuplesort was in use.
     */
    if (aggstate->sort_in)
    {
        tuplesort_end(aggstate->sort_in);
        aggstate->sort_in = NULL;
    }

    if (newphase <= 1)
    {
        /*
         * Discard any existing output tuplesort.
         */
        if (aggstate->sort_out)
        {
            tuplesort_end(aggstate->sort_out);
            aggstate->sort_out = NULL;
        }
    }
    else
    {
        /*
         * The old output tuplesort becomes the new input one, and this is the
         * right time to actually sort it.
         */
        aggstate->sort_in = aggstate->sort_out;
        aggstate->sort_out = NULL;
        Assert(aggstate->sort_in);
        tuplesort_performsort(aggstate->sort_in);
    }

    /*
     * If this isn't the last phase, we need to sort appropriately for the
     * next phase in sequence.
     */
    if (newphase > 0 && newphase < aggstate->numphases - 1)
    {
        Sort       *sortnode = aggstate->phases[newphase + 1].sortnode;
        PlanState  *outerNode = outerPlanState(aggstate);
        TupleDesc   tupDesc = ExecGetResultType(outerNode);

        aggstate->sort_out = tuplesort_begin_heap(tupDesc,
                                                  sortnode->numCols,
                                                  sortnode->sortColIdx,
                                                  sortnode->sortOperators,
                                                  sortnode->collations,
                                                  sortnode->nullsFirst,
                                                  work_mem,
                                                  NULL, TUPLESORT_NONE);
    }

    aggstate->current_phase = newphase;
    aggstate->phase = &aggstate->phases[newphase];
}

/*
 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
 * populated by the previous phase.  Copy it to the sorter for the next phase
 * if any.
 *
 * Callers cannot rely on memory for tuple in returned slot remaining valid
 * past any subsequently fetched tuple.
 */
static TupleTableSlot *
fetch_input_tuple(AggState *aggstate)
{
    TupleTableSlot *slot;

    if (aggstate->sort_in)
    {
        /* make sure we check for interrupts in either path through here */
        CHECK_FOR_INTERRUPTS();
        if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
                                    aggstate->sort_slot, NULL))
            return NULL;
        slot = aggstate->sort_slot;
    }
    else
        slot = ExecProcNode(outerPlanState(aggstate));

    if (!TupIsNull(slot) && aggstate->sort_out)
        tuplesort_puttupleslot(aggstate->sort_out, slot);

    return slot;
}

/*
 * (Re)Initialize an individual aggregate.
 *
 * This function handles only one grouping set, already set in
 * aggstate->current_set.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
                     AggStatePerGroup pergroupstate)
{
    /*
     * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
     */
    if (pertrans->aggsortrequired)
    {
        /*
         * In case of rescan, maybe there could be an uncompleted sort
         * operation?  Clean it up if so.
         */
        if (pertrans->sortstates[aggstate->current_set])
            tuplesort_end(pertrans->sortstates[aggstate->current_set]);


        /*
         * We use a plain Datum sorter when there's a single input column;
         * otherwise sort the full tuple.  (See comments for
         * process_ordered_aggregate_single.)
         */
        if (pertrans->numInputs == 1)
        {
            Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);

            pertrans->sortstates[aggstate->current_set] =
                tuplesort_begin_datum(attr->atttypid,
                                      pertrans->sortOperators[0],
                                      pertrans->sortCollations[0],
                                      pertrans->sortNullsFirst[0],
                                      work_mem, NULL, TUPLESORT_NONE);
        }
        else
            pertrans->sortstates[aggstate->current_set] =
                tuplesort_begin_heap(pertrans->sortdesc,
                                     pertrans->numSortCols,
                                     pertrans->sortColIdx,
                                     pertrans->sortOperators,
                                     pertrans->sortCollations,
                                     pertrans->sortNullsFirst,
                                     work_mem, NULL, TUPLESORT_NONE);
    }

    /*
     * (Re)set transValue to the initial value.
     *
     * Note that when the initial value is pass-by-ref, we must copy it (into
     * the aggcontext) since we will pfree the transValue later.
     */
    if (pertrans->initValueIsNull)
        pergroupstate->transValue = pertrans->initValue;
    else
    {
        MemoryContext oldContext;

        oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
        pergroupstate->transValue = datumCopy(pertrans->initValue,
                                              pertrans->transtypeByVal,
                                              pertrans->transtypeLen);
        MemoryContextSwitchTo(oldContext);
    }
    pergroupstate->transValueIsNull = pertrans->initValueIsNull;

    /*
     * If the initial value for the transition state doesn't exist in the
     * pg_aggregate table then we will let the first non-NULL value returned
     * from the outer procNode become the initial value.  (This is useful for
     * aggregates like max() and min().)  The noTransValue flag signals that we
     * still need to do this.
     */
    pergroupstate->noTransValue = pertrans->initValueIsNull;
}

/*
 * Initialize all aggregate transition states for a new group of input values.
 *
 * If there are multiple grouping sets, we initialize only the first numReset
 * of them (the grouping sets are ordered so that the most specific one, which
 * is reset most often, is first).  As a convenience, if numReset is 0, we
 * reinitialize all sets.
 *
 * NB: This cannot be used for hash aggregates, as for those the grouping set
 * number has to be specified from further up.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregates(AggState *aggstate,
                      AggStatePerGroup *pergroups,
                      int numReset)
{
    int         transno;
    int         numGroupingSets = Max(aggstate->phase->numsets, 1);
    int         setno = 0;
    int         numTrans = aggstate->numtrans;
    AggStatePerTrans transstates = aggstate->pertrans;

    if (numReset == 0)
        numReset = numGroupingSets;

    for (setno = 0; setno < numReset; setno++)
    {
        AggStatePerGroup pergroup = pergroups[setno];

        select_current_set(aggstate, setno, false);

        for (transno = 0; transno < numTrans; transno++)
        {
            AggStatePerTrans pertrans = &transstates[transno];
            AggStatePerGroup pergroupstate = &pergroup[transno];

            initialize_aggregate(aggstate, pertrans, pergroupstate);
        }
    }
}

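The "reset only the first numReset sets" convention mirrors the rollup processing described in the header comment: grouping sets are ordered most- to least-specific, so a group boundary resets the finer-grained states while coarser ones keep accumulating. A standalone sketch of that convention (`toy_reset_states` and `TOY_NUM_SETS` are hypothetical names, not part of this file):

```c
#include <stddef.h>

enum { TOY_NUM_SETS = 3 };      /* e.g. ROLLUP(a,b,c): (a,b,c), (a,b), (a) */

/* Reset the first num_reset per-set states; 0 means "reset everything",
 * matching the numReset convention above.  States are ordered from most
 * specific (reset most often) to least specific. */
static void
toy_reset_states(long *states, int num_reset)
{
    if (num_reset == 0)
        num_reset = TOY_NUM_SETS;

    for (int setno = 0; setno < num_reset; setno++)
        states[setno] = 0;
}
```

A boundary that changes only column b would reset sets 0 and 1 (the (a,b,c) and (a,b) states) while the (a) state keeps counting.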
/*
 * Given new input value(s), advance the transition function of one aggregate
 * state within one grouping set only (already set in aggstate->current_set)
 *
 * The new values (and null flags) have been preloaded into argument positions
 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
 * pass to the transition function.  We also expect that the static fields of
 * the fcinfo are already initialized; that was done by ExecInitAgg().
 *
 * It doesn't matter which memory context this is called in.
 */
static void
advance_transition_function(AggState *aggstate,
                            AggStatePerTrans pertrans,
                            AggStatePerGroup pergroupstate)
{
    FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
    MemoryContext oldContext;
    Datum       newVal;

    if (pertrans->transfn.fn_strict)
    {
        /*
         * For a strict transfn, nothing happens when there's a NULL input; we
         * just keep the prior transValue.
         */
        int         numTransInputs = pertrans->numTransInputs;
        int         i;

        for (i = 1; i <= numTransInputs; i++)
        {
            if (fcinfo->args[i].isnull)
                return;
        }
        if (pergroupstate->noTransValue)
        {
            /*
             * transValue has not been initialized.  This is the first non-NULL
             * input value.  We use it as the initial value for transValue. (We
             * already checked that the agg's input type is binary-compatible
             * with its transtype, so straight copy here is OK.)
             *
             * We must copy the datum into aggcontext if it is pass-by-ref.  We
             * do not need to pfree the old transValue, since it's NULL.
             */
            oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
            pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
                                                  pertrans->transtypeByVal,
                                                  pertrans->transtypeLen);
            pergroupstate->transValueIsNull = false;
            pergroupstate->noTransValue = false;
            MemoryContextSwitchTo(oldContext);
            return;
        }
        if (pergroupstate->transValueIsNull)
        {
            /*
             * Don't call a strict function with NULL inputs.  Note it is
             * possible to get here despite the above tests, if the transfn is
             * strict *and* returned a NULL on a prior cycle.  If that happens
             * we will propagate the NULL all the way to the end.
             */
            return;
        }
    }

    /* We run the transition functions in per-input-tuple memory context */
    oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);

    /* set up aggstate->curpertrans for AggGetAggref() */
    aggstate->curpertrans = pertrans;

    /*
     * OK to call the transition function
     */
    fcinfo->args[0].value = pergroupstate->transValue;
    fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
    fcinfo->isnull = false;     /* just in case transfn doesn't set it */

    newVal = FunctionCallInvoke(fcinfo);

    aggstate->curpertrans = NULL;

    /*
     * If pass-by-ref datatype, must copy the new value into aggcontext and
     * free the prior transValue.  But if transfn returned a pointer to its
     * first input, we don't need to do anything.
     *
     * It's safe to compare newVal with pergroup->transValue without regard
     * for either being NULL, because ExecAggCopyTransValue takes care to set
     * transValue to 0 when NULL.  Otherwise we could end up accidentally not
     * reparenting, when the transValue has the same numerical value as
     * newValue, despite being NULL.  This is a somewhat hot path, making it
     * undesirable to instead solve this with another branch for the common
     * case of the transition function returning its (modified) input
     * argument.
     */
    if (!pertrans->transtypeByVal &&
        DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
        newVal = ExecAggCopyTransValue(aggstate, pertrans,
                                       newVal, fcinfo->isnull,
                                       pergroupstate->transValue,
                                       pergroupstate->transValueIsNull);

    pergroupstate->transValue = newVal;
    pergroupstate->transValueIsNull = fcinfo->isnull;

    MemoryContextSwitchTo(oldContext);
}

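The strict-transfn rules handled above (skip NULL inputs; when initcond is NULL, adopt the first non-NULL input as the transition value without calling the function) can be demonstrated with a toy strict max(). This is a hypothetical sketch, not PostgreSQL code: `NullableInt`, `toy_strict_max`, and the `notransvalue` flag stand in for the Datum/isnull pair and the noTransValue logic.

```c
#include <stdbool.h>
#include <stddef.h>

typedef struct NullableInt
{
    int  value;
    bool isnull;
} NullableInt;

/* Toy strict max() with a NULL initcond, mirroring the rules above. */
static NullableInt
toy_strict_max(const NullableInt *inputs, size_t n)
{
    NullableInt trans = {0, true};  /* initcond is NULL */
    bool        notransvalue = true;

    for (size_t i = 0; i < n; i++)
    {
        if (inputs[i].isnull)
            continue;               /* strict: NULL inputs are skipped */

        if (notransvalue)
        {
            /* first non-NULL input becomes the transvalue directly */
            trans = inputs[i];
            notransvalue = false;
            continue;
        }

        /* the "transfn" proper runs only from the second non-NULL input */
        if (inputs[i].value > trans.value)
            trans.value = inputs[i].value;
    }
    return trans;                   /* stays NULL if no non-NULL input seen */
}
```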
/*
 * Advance each aggregate transition state for one input tuple.  The input
 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
 * accessible to ExecEvalExpr.
 *
 * We have two sets of transition states to handle: one for sorted aggregation
 * and one for hashed; we do them both here, to avoid multiple evaluation of
 * the inputs.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
advance_aggregates(AggState *aggstate)
{
    ExecEvalExprNoReturnSwitchContext(aggstate->phase->evaltrans,
                                      aggstate->tmpcontext);
}

825/*
826 * Run the transition function for a DISTINCT or ORDER BY aggregate
827 * with only one input. This is called after we have completed
828 * entering all the input values into the sort object. We complete the
829 * sort, read out the values in sorted order, and run the transition
830 * function on each value (applying DISTINCT if appropriate).
831 *
832 * Note that the strictness of the transition function was checked when
833 * entering the values into the sort, so we don't check it again here;
834 * we just apply standard SQL DISTINCT logic.
835 *
836 * The one-input case is handled separately from the multi-input case
837 * for performance reasons: for single by-value inputs, such as the
838 * common case of count(distinct id), the tuplesort_getdatum code path
839 * is around 300% faster. (The speedup for by-reference types is less
840 * but still noticeable.)
841 *
842 * This function handles only one grouping set (already set in
843 * aggstate->current_set).
844 *
845 * When called, CurrentMemoryContext should be the per-query context.
846 */
847static void
848process_ordered_aggregate_single(AggState *aggstate,
849 AggStatePerTrans pertrans,
850 AggStatePerGroup pergroupstate)
851{
852 Datum oldVal = (Datum) 0;
853 bool oldIsNull = true;
854 bool haveOldVal = false;
855 MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
856 MemoryContext oldContext;
857 bool isDistinct = (pertrans->numDistinctCols > 0);
858 Datum newAbbrevVal = (Datum) 0;
859 Datum oldAbbrevVal = (Datum) 0;
860 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
861 Datum *newVal;
862 bool *isNull;
863
864 Assert(pertrans->numDistinctCols < 2);
865
866 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
867
868 /* Load the column into argument 1 (arg 0 will be transition value) */
869 newVal = &fcinfo->args[1].value;
870 isNull = &fcinfo->args[1].isnull;
871
872 /*
873 * Note: if input type is pass-by-ref, the datums returned by the sort are
874 * freshly palloc'd in the per-query context, so we must be careful to
875 * pfree them when they are no longer needed.
876 */
877
878 while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
879 true, false, newVal, isNull, &newAbbrevVal))
880 {
881 /*
882 * Clear and select the working context for evaluation of the equality
883 * function and transition function.
884 */
885 MemoryContextReset(workcontext);
886 oldContext = MemoryContextSwitchTo(workcontext);
887
888 /*
889 * If DISTINCT mode, and not distinct from prior, skip it.
890 */
891 if (isDistinct &&
892 haveOldVal &&
893 ((oldIsNull && *isNull) ||
894 (!oldIsNull && !*isNull &&
895 oldAbbrevVal == newAbbrevVal &&
896 DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
897 pertrans->aggCollation,
898 oldVal, *newVal)))))
899 {
900 MemoryContextSwitchTo(oldContext);
901 continue;
902 }
903 else
904 {
905 advance_transition_function(aggstate, pertrans, pergroupstate);
906
907 MemoryContextSwitchTo(oldContext);
908
909 /*
910 * Forget the old value, if any, and remember the new one for
911 * subsequent equality checks.
912 */
913 if (!pertrans->inputtypeByVal)
914 {
915 if (!oldIsNull)
916 pfree(DatumGetPointer(oldVal));
917 if (!*isNull)
918 oldVal = datumCopy(*newVal, pertrans->inputtypeByVal,
919 pertrans->inputtypeLen);
920 }
921 else
922 oldVal = *newVal;
923 oldAbbrevVal = newAbbrevVal;
924 oldIsNull = *isNull;
925 haveOldVal = true;
926 }
927 }
928
929 if (!oldIsNull && !pertrans->inputtypeByVal)
930 pfree(DatumGetPointer(oldVal));
931
932 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
933 pertrans->sortstates[aggstate->current_set] = NULL;
934}
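The single-input DISTINCT path above can be illustrated outside the executor. This is a hypothetical sketch, not the module's code: plain ints stand in for Datums, a counter stands in for the transition function, and only the "skip when not distinct from the prior value" rule (including the two-NULLs case) is mirrored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* A sorted input value that may be NULL, standing in for a Datum/isnull pair. */
typedef struct
{
    int  value;
    bool isnull;
} SortedDatum;

/*
 * Count distinct values in an already-sorted stream using SQL DISTINCT
 * semantics: two NULLs are "not distinct", as are two equal non-NULLs.
 */
static int
count_distinct_sorted(const SortedDatum *vals, size_t n)
{
    int  ndistinct = 0;
    int  oldVal = 0;
    bool oldIsNull = true;
    bool haveOldVal = false;

    for (size_t i = 0; i < n; i++)
    {
        /* if not distinct from the prior value, skip it */
        if (haveOldVal &&
            ((oldIsNull && vals[i].isnull) ||
             (!oldIsNull && !vals[i].isnull && oldVal == vals[i].value)))
            continue;

        ndistinct++;            /* stands in for the transition call */
        oldVal = vals[i].value;
        oldIsNull = vals[i].isnull;
        haveOldVal = true;
    }
    return ndistinct;
}
```

Because the input is sorted, one remembered value suffices; the real code additionally short-circuits with abbreviated keys before calling the full equality function.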
935
936/*
937 * Run the transition function for a DISTINCT or ORDER BY aggregate
938 * with more than one input. This is called after we have completed
939 * entering all the input values into the sort object. We complete the
940 * sort, read out the values in sorted order, and run the transition
941 * function on each value (applying DISTINCT if appropriate).
942 *
943 * This function handles only one grouping set (already set in
944 * aggstate->current_set).
945 *
946 * When called, CurrentMemoryContext should be the per-query context.
947 */
948static void
949process_ordered_aggregate_multi(AggState *aggstate,
950 AggStatePerTrans pertrans,
951 AggStatePerGroup pergroupstate)
952{
953 ExprContext *tmpcontext = aggstate->tmpcontext;
954 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
955 TupleTableSlot *slot1 = pertrans->sortslot;
956 TupleTableSlot *slot2 = pertrans->uniqslot;
957 int numTransInputs = pertrans->numTransInputs;
958 int numDistinctCols = pertrans->numDistinctCols;
959 Datum newAbbrevVal = (Datum) 0;
960 Datum oldAbbrevVal = (Datum) 0;
961 bool haveOldValue = false;
962 TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
963 int i;
964
965 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
966
967 ExecClearTuple(slot1);
968 if (slot2)
969 ExecClearTuple(slot2);
970
971 while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
972 true, true, slot1, &newAbbrevVal))
973 {
974 CHECK_FOR_INTERRUPTS();
975
976 tmpcontext->ecxt_outertuple = slot1;
977 tmpcontext->ecxt_innertuple = slot2;
978
979 if (numDistinctCols == 0 ||
980 !haveOldValue ||
981 newAbbrevVal != oldAbbrevVal ||
982 !ExecQual(pertrans->equalfnMulti, tmpcontext))
983 {
984 /*
985 * Extract the first numTransInputs columns as datums to pass to
986 * the transfn.
987 */
988 slot_getsomeattrs(slot1, numTransInputs);
989
990 /* Load values into fcinfo */
991 /* Start from 1, since the 0th arg will be the transition value */
992 for (i = 0; i < numTransInputs; i++)
993 {
994 fcinfo->args[i + 1].value = slot1->tts_values[i];
995 fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
996 }
997
998 advance_transition_function(aggstate, pertrans, pergroupstate);
999
1000 if (numDistinctCols > 0)
1001 {
1002 /* swap the slot pointers to retain the current tuple */
1003 TupleTableSlot *tmpslot = slot2;
1004
1005 slot2 = slot1;
1006 slot1 = tmpslot;
1007 /* avoid ExecQual() calls by reusing abbreviated keys */
1008 oldAbbrevVal = newAbbrevVal;
1009 haveOldValue = true;
1010 }
1011 }
1012
1013 /* Reset context each time */
1014 ResetExprContext(tmpcontext);
1015
1016 ExecClearTuple(slot1);
1017 }
1018
1019 if (slot2)
1020 ExecClearTuple(slot2);
1021
1022 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1023 pertrans->sortstates[aggstate->current_set] = NULL;
1024
1025 /* restore previous slot, potentially in use for grouping sets */
1026 tmpcontext->ecxt_outertuple = save;
1027}
1028
1029/*
1030 * Compute the final value of one aggregate.
1031 *
1032 * This function handles only one grouping set (already set in
1033 * aggstate->current_set).
1034 *
1035 * The finalfn will be run, and the result delivered, in the
1036 * output-tuple context; caller's CurrentMemoryContext does not matter.
1037 * (But note that in some cases, such as when there is no finalfn, the
1038 * result might be a pointer to or into the agg's transition value.)
1039 *
1040 * The finalfn uses the state as set in the transno. This also might be
1041 * being used by another aggregate function, so it's important that we do
1042 * nothing destructive here. Moreover, the aggregate's final value might
1043 * get used in multiple places, so we mustn't return a R/W expanded datum.
1044 */
1045static void
1046finalize_aggregate(AggState *aggstate,
1047 AggStatePerAgg peragg,
1048 AggStatePerGroup pergroupstate,
1049 Datum *resultVal, bool *resultIsNull)
1050{
1051 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1052 bool anynull = false;
1053 MemoryContext oldContext;
1054 int i;
1055 ListCell *lc;
1056 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1057
1058 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1059
1060 /*
1061 * Evaluate any direct arguments. We do this even if there's no finalfn
1062 * (which is unlikely anyway), so that side-effects happen as expected.
1063 * The direct arguments go into arg positions 1 and up, leaving position 0
1064 * for the transition state value.
1065 */
1066 i = 1;
1067 foreach(lc, peragg->aggdirectargs)
1068 {
1069 ExprState *expr = (ExprState *) lfirst(lc);
1070
1071 fcinfo->args[i].value = ExecEvalExpr(expr,
1072 aggstate->ss.ps.ps_ExprContext,
1073 &fcinfo->args[i].isnull);
1074 anynull |= fcinfo->args[i].isnull;
1075 i++;
1076 }
1077
1078 /*
1079 * Apply the agg's finalfn if one is provided, else return transValue.
1080 */
1081 if (OidIsValid(peragg->finalfn_oid))
1082 {
1083 int numFinalArgs = peragg->numFinalArgs;
1084
1085 /* set up aggstate->curperagg for AggGetAggref() */
1086 aggstate->curperagg = peragg;
1087
1088 InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
1089 numFinalArgs,
1090 pertrans->aggCollation,
1091 (Node *) aggstate, NULL);
1092
1093 /* Fill in the transition state value */
1094 fcinfo->args[0].value =
1095 MakeExpandedObjectReadOnly(pergroupstate->transValue,
1096 pergroupstate->transValueIsNull,
1097 pertrans->transtypeLen);
1098 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1099 anynull |= pergroupstate->transValueIsNull;
1100
1101 /* Fill any remaining argument positions with nulls */
1102 for (; i < numFinalArgs; i++)
1103 {
1104 fcinfo->args[i].value = (Datum) 0;
1105 fcinfo->args[i].isnull = true;
1106 anynull = true;
1107 }
1108
1109 if (fcinfo->flinfo->fn_strict && anynull)
1110 {
1111 /* don't call a strict function with NULL inputs */
1112 *resultVal = (Datum) 0;
1113 *resultIsNull = true;
1114 }
1115 else
1116 {
1117 Datum result;
1118
1119 result = FunctionCallInvoke(fcinfo);
1120 *resultIsNull = fcinfo->isnull;
1121 *resultVal = MakeExpandedObjectReadOnly(result,
1122 fcinfo->isnull,
1123 peragg->resulttypeLen);
1124 }
1125 aggstate->curperagg = NULL;
1126 }
1127 else
1128 {
1129 *resultVal =
1130 MakeExpandedObjectReadOnly(pergroupstate->transValue,
1131 pergroupstate->transValueIsNull,
1132 pertrans->transtypeLen);
1133 *resultIsNull = pergroupstate->transValueIsNull;
1134 }
1135
1136 MemoryContextSwitchTo(oldContext);
1137}
1138
1139/*
1140 * Compute the output value of one partial aggregate.
1141 *
1142 * The serialization function will be run, and the result delivered, in the
1143 * output-tuple context; caller's CurrentMemoryContext does not matter.
1144 */
1145static void
1146finalize_partialaggregate(AggState *aggstate,
1147 AggStatePerAgg peragg,
1148 AggStatePerGroup pergroupstate,
1149 Datum *resultVal, bool *resultIsNull)
1150{
1151 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1152 MemoryContext oldContext;
1153
1154 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1155
1156 /*
1157 * serialfn_oid will be set if we must serialize the transvalue before
1158 * returning it
1159 */
1160 if (OidIsValid(pertrans->serialfn_oid))
1161 {
1162 /* Don't call a strict serialization function with NULL input. */
1163 if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1164 {
1165 *resultVal = (Datum) 0;
1166 *resultIsNull = true;
1167 }
1168 else
1169 {
1170 FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1171 Datum result;
1172
1173 fcinfo->args[0].value =
1174 MakeExpandedObjectReadOnly(pergroupstate->transValue,
1175 pergroupstate->transValueIsNull,
1176 pertrans->transtypeLen);
1177 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1178 fcinfo->isnull = false;
1179
1180 result = FunctionCallInvoke(fcinfo);
1181 *resultIsNull = fcinfo->isnull;
1182 *resultVal = MakeExpandedObjectReadOnly(result,
1183 fcinfo->isnull,
1184 peragg->resulttypeLen);
1185 }
1186 }
1187 else
1188 {
1189 *resultVal =
1190 MakeExpandedObjectReadOnly(pergroupstate->transValue,
1191 pergroupstate->transValueIsNull,
1192 pertrans->transtypeLen);
1193 *resultIsNull = pergroupstate->transValueIsNull;
1194 }
1195
1196 MemoryContextSwitchTo(oldContext);
1197}
1198
1199/*
1200 * Extract the attributes that make up the grouping key into the
1201 * hashslot. This is necessary to compute the hash or perform a lookup.
1202 */
1203static inline void
1204prepare_hash_slot(AggStatePerHash perhash,
1205 TupleTableSlot *inputslot,
1206 TupleTableSlot *hashslot)
1207{
1208 int i;
1209
1210 /* transfer just the needed columns into hashslot */
1211 slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1212 ExecClearTuple(hashslot);
1213
1214 for (i = 0; i < perhash->numhashGrpCols; i++)
1215 {
1216 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1217
1218 hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1219 hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1220 }
1221 ExecStoreVirtualTuple(hashslot);
1222}
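The column transfer done by prepare_hash_slot() can be sketched in isolation. The following is hypothetical (int arrays stand in for tuple slots); it shows only the core idea of copying the grouping-key columns out of a wide input row via a 1-based attribute-number map, as hashGrpColIdxInput does:

```c
#include <assert.h>

/*
 * Copy only the grouping-key columns of a wide input row into a narrow
 * key row, using a 1-based column-index map (attribute numbers in
 * PostgreSQL are 1-based; C array indexes are 0-based).
 */
static void
extract_key_columns(const int *inputrow, const int *colidx1based,
                    int ncols, int *keyrow)
{
    for (int i = 0; i < ncols; i++)
        keyrow[i] = inputrow[colidx1based[i] - 1];
}
```

The real function additionally deforms only the needed prefix of the input tuple (slot_getsomeattrs up to largestGrpColIdx) before copying, so columns past the last grouping key are never extracted.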
1223
1224/*
1225 * Prepare to finalize and project based on the specified representative tuple
1226 * slot and grouping set.
1227 *
1228 * In the specified tuple slot, force to null all attributes that should be
1229 * read as null in the context of the current grouping set. Also stash the
1230 * current group bitmap where GroupingExpr can get at it.
1231 *
1232 * This relies on three conditions:
1233 *
1234 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1235 * only reference it in evaluations, which will only access individual
1236 * attributes.
1237 *
1238 * 2) No system columns are going to need to be nulled. (If a system column is
1239 * referenced in a group clause, it is actually projected in the outer plan
1240 * tlist.)
1241 *
1242 * 3) Within a given phase, we never need to recover the value of an attribute
1243 * once it has been set to null.
1244 *
1245 * Poking into the slot this way is a bit ugly, but the consensus is that the
1246 * alternative was worse.
1247 */
1248static void
1249prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1250{
1251 if (aggstate->phase->grouped_cols)
1252 {
1253 Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1254
1255 aggstate->grouped_cols = grouped_cols;
1256
1257 if (TTS_EMPTY(slot))
1258 {
1259 /*
1260 * Force all values to be NULL if working on an empty input tuple
1261 * (i.e. an empty grouping set for which no input rows were
1262 * supplied).
1263 */
1264 ExecStoreAllNullTuple(slot);
1265 }
1266 else if (aggstate->all_grouped_cols)
1267 {
1268 ListCell *lc;
1269
1270 /* all_grouped_cols is arranged in desc order */
1271 slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1272
1273 foreach(lc, aggstate->all_grouped_cols)
1274 {
1275 int attnum = lfirst_int(lc);
1276
1277 if (!bms_is_member(attnum, grouped_cols))
1278 slot->tts_isnull[attnum - 1] = true;
1279 }
1280 }
1281 }
1282}
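The null-forcing rule above (every attribute in all_grouped_cols that is absent from the current set's grouped_cols reads as NULL) can be demonstrated with a minimal sketch. This is hypothetical: an unsigned bitmask stands in for a Bitmapset, and a bool array stands in for the slot's tts_isnull:

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Force to null every attribute that appears in the "all grouped columns"
 * list but is not grouped in the current grouping set. Bit (attnum - 1)
 * of grouped_mask means "attnum is grouped in this set".
 */
static void
mask_ungrouped(bool *isnull, const int *all_grouped, int nall,
               unsigned grouped_mask)
{
    for (int i = 0; i < nall; i++)
    {
        int attnum = all_grouped[i];

        if (!(grouped_mask & (1u << (attnum - 1))))
            isnull[attnum - 1] = true;      /* read as NULL from here on */
    }
}
```

Note this only ever sets null flags, matching condition (3) in the comment: within a phase an attribute is never recovered once nulled.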
1283
1284/*
1285 * Compute the final value of all aggregates for one group.
1286 *
1287 * This function handles only one grouping set at a time, which the caller must
1288 * have selected. It's also the caller's responsibility to adjust the supplied
1289 * pergroup parameter to point to the current set's transvalues.
1290 *
1291 * Results are stored in the output econtext aggvalues/aggnulls.
1292 */
1293static void
1294finalize_aggregates(AggState *aggstate,
1295 AggStatePerAgg peraggs,
1296 AggStatePerGroup pergroup)
1297{
1298 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1299 Datum *aggvalues = econtext->ecxt_aggvalues;
1300 bool *aggnulls = econtext->ecxt_aggnulls;
1301 int aggno;
1302
1303 /*
1304 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1305 * inputs and run the transition functions.
1306 */
1307 for (int transno = 0; transno < aggstate->numtrans; transno++)
1308 {
1309 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1310 AggStatePerGroup pergroupstate;
1311
1312 pergroupstate = &pergroup[transno];
1313
1314 if (pertrans->aggsortrequired)
1315 {
1316 Assert(aggstate->aggstrategy != AGG_HASHED &&
1317 aggstate->aggstrategy != AGG_MIXED);
1318
1319 if (pertrans->numInputs == 1)
1320 process_ordered_aggregate_single(aggstate,
1321 pertrans,
1322 pergroupstate);
1323 else
1324 process_ordered_aggregate_multi(aggstate,
1325 pertrans,
1326 pergroupstate);
1327 }
1328 else if (pertrans->numDistinctCols > 0 && pertrans->haslast)
1329 {
1330 pertrans->haslast = false;
1331
1332 if (pertrans->numDistinctCols == 1)
1333 {
1334 if (!pertrans->inputtypeByVal && !pertrans->lastisnull)
1335 pfree(DatumGetPointer(pertrans->lastdatum));
1336
1337 pertrans->lastisnull = false;
1338 pertrans->lastdatum = (Datum) 0;
1339 }
1340 else
1341 ExecClearTuple(pertrans->uniqslot);
1342 }
1343 }
1344
1345 /*
1346 * Run the final functions.
1347 */
1348 for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1349 {
1350 AggStatePerAgg peragg = &peraggs[aggno];
1351 int transno = peragg->transno;
1352 AggStatePerGroup pergroupstate;
1353
1354 pergroupstate = &pergroup[transno];
1355
1356 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1357 finalize_partialaggregate(aggstate, peragg, pergroupstate,
1358 &aggvalues[aggno], &aggnulls[aggno]);
1359 else
1360 finalize_aggregate(aggstate, peragg, pergroupstate,
1361 &aggvalues[aggno], &aggnulls[aggno]);
1362 }
1363}
1364
1365/*
1366 * Project the result of a group (whose aggs have already been calculated by
1367 * finalize_aggregates). Returns the result slot, or NULL if no row is
1368 * projected (suppressed by qual).
1369 */
1370static TupleTableSlot *
1371project_aggregates(AggState *aggstate)
1372{
1373 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1374
1375 /*
1376 * Check the qual (HAVING clause); if the group does not match, ignore it.
1377 */
1378 if (ExecQual(aggstate->ss.ps.qual, econtext))
1379 {
1380 /*
1381 * Form and return projection tuple using the aggregate results and
1382 * the representative input tuple.
1383 */
1384 return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1385 }
1386 else
1387 InstrCountFiltered1(aggstate, 1);
1388
1389 return NULL;
1390}
1391
1392/*
1393 * Find input-tuple columns that are needed, dividing them into
1394 * aggregated and unaggregated sets.
1395 */
1396static void
1397find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
1398{
1399 Agg *agg = (Agg *) aggstate->ss.ps.plan;
1400 FindColsContext context;
1401
1402 context.is_aggref = false;
1403 context.aggregated = NULL;
1404 context.unaggregated = NULL;
1405
1406 /* Examine tlist and quals */
1407 (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
1408 (void) find_cols_walker((Node *) agg->plan.qual, &context);
1409
1410 /* In some cases, grouping columns will not appear in the tlist */
1411 for (int i = 0; i < agg->numCols; i++)
1412 context.unaggregated = bms_add_member(context.unaggregated,
1413 agg->grpColIdx[i]);
1414
1415 *aggregated = context.aggregated;
1416 *unaggregated = context.unaggregated;
1417}
1418
1419static bool
1420find_cols_walker(Node *node, FindColsContext *context)
1421{
1422 if (node == NULL)
1423 return false;
1424 if (IsA(node, Var))
1425 {
1426 Var *var = (Var *) node;
1427
1428 /* setrefs.c should have set the varno to OUTER_VAR */
1429 Assert(var->varno == OUTER_VAR);
1430 Assert(var->varlevelsup == 0);
1431 if (context->is_aggref)
1432 context->aggregated = bms_add_member(context->aggregated,
1433 var->varattno);
1434 else
1435 context->unaggregated = bms_add_member(context->unaggregated,
1436 var->varattno);
1437 return false;
1438 }
1439 if (IsA(node, Aggref))
1440 {
1441 Assert(!context->is_aggref);
1442 context->is_aggref = true;
1443 expression_tree_walker(node, find_cols_walker, context);
1444 context->is_aggref = false;
1445 return false;
1446 }
1447 return expression_tree_walker(node, find_cols_walker, context);
1448}
1449
1450/*
1451 * (Re-)initialize the hash table(s) to empty.
1452 *
1453 * To implement hashed aggregation, we need a hashtable that stores a
1454 * representative tuple and an array of AggStatePerGroup structs for each
1455 * distinct set of GROUP BY column values. We compute the hash key from the
1456 * GROUP BY columns. The per-group data is allocated in initialize_hash_entry(),
1457 * for each entry.
1458 *
1459 * We have a separate hashtable and associated perhash data structure for each
1460 * grouping set for which we're doing hashing.
1461 *
1462 * The contents of the hash tables live in the aggstate's hash_tuplescxt
1463 * memory context (there is only one of these for all tables together, since
1464 * they are all reset at the same time).
1465 */
1466static void
1467build_hash_tables(AggState *aggstate)
1468{
1469 int setno;
1470
1471 for (setno = 0; setno < aggstate->num_hashes; ++setno)
1472 {
1473 AggStatePerHash perhash = &aggstate->perhash[setno];
1474 double nbuckets;
1475 Size memory;
1476
1477 if (perhash->hashtable != NULL)
1478 {
1479 ResetTupleHashTable(perhash->hashtable);
1480 continue;
1481 }
1482
1483 memory = aggstate->hash_mem_limit / aggstate->num_hashes;
1484
1485 /* choose reasonable number of buckets per hashtable */
1486 nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
1487 perhash->aggnode->numGroups,
1488 memory);
1489
1490#ifdef USE_INJECTION_POINTS
1491 if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-oversize-table"))
1492 {
1493 nbuckets = memory / TupleHashEntrySize();
1494 INJECTION_POINT_CACHED("hash-aggregate-oversize-table", NULL);
1495 }
1496#endif
1497
1498 build_hash_table(aggstate, setno, nbuckets);
1499 }
1500
1501 aggstate->hash_ngroups_current = 0;
1502}
1503
1504/*
1505 * Build a single hashtable for this grouping set.
1506 */
1507static void
1508build_hash_table(AggState *aggstate, int setno, double nbuckets)
1509{
1510 AggStatePerHash perhash = &aggstate->perhash[setno];
1511 MemoryContext metacxt = aggstate->hash_metacxt;
1512 MemoryContext tuplescxt = aggstate->hash_tuplescxt;
1513 MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
1514 Size additionalsize;
1515
1516 Assert(aggstate->aggstrategy == AGG_HASHED ||
1517 aggstate->aggstrategy == AGG_MIXED);
1518
1519 /*
1520 * Used to make sure initial hash table allocation does not exceed
1521 * hash_mem. Note that the estimate does not include space for
1522 * pass-by-reference transition data values, nor for the representative
1523 * tuple of each group.
1524 */
1525 additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1526
1527 perhash->hashtable = BuildTupleHashTable(&aggstate->ss.ps,
1528 perhash->hashslot->tts_tupleDescriptor,
1529 perhash->hashslot->tts_ops,
1530 perhash->numCols,
1531 perhash->hashGrpColIdxHash,
1532 perhash->eqfuncoids,
1533 perhash->hashfunctions,
1534 perhash->aggnode->grpCollations,
1535 nbuckets,
1536 additionalsize,
1537 metacxt,
1538 tuplescxt,
1539 tmpcxt,
1540 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1541}
1542
1543/*
1544 * Compute columns that actually need to be stored in hashtable entries. The
1545 * incoming tuples from the child plan node will contain grouping columns,
1546 * other columns referenced in our targetlist and qual, columns used to
1547 * compute the aggregate functions, and perhaps just junk columns we don't use
1548 * at all. Only columns of the first two types need to be stored in the
1549 * hashtable, and getting rid of the others can make the table entries
1550 * significantly smaller. The hashtable only contains the relevant columns,
1551 * and is packed/unpacked in lookup_hash_entries() / agg_retrieve_hash_table()
1552 * into the format of the normal input descriptor.
1553 *
1554 * Additional columns, in addition to the columns grouped by, come from two
1555 * sources: Firstly functionally dependent columns that we don't need to group
1556 * by themselves, and secondly ctids for row-marks.
1557 *
1558 * To eliminate duplicates, we build a bitmapset of the needed columns, and
1559 * then build an array of the columns included in the hashtable. We might
1560 * still have duplicates if the passed-in grpColIdx has them, which can happen
1561 * in edge cases from semijoins/distinct; these can't always be removed,
1562 * because it's not certain that the duplicate cols will be using the same
1563 * hash function.
1564 *
1565 * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1566 * the per-query context (unlike the hash table itself).
1567 */
1568static void
1569find_hash_columns(AggState *aggstate)
1570{
1571 Bitmapset *base_colnos;
1572 Bitmapset *aggregated_colnos;
1573 TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1574 List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1575 int numHashes = aggstate->num_hashes;
1576 EState *estate = aggstate->ss.ps.state;
1577 int j;
1578
1579 /* Find Vars that will be needed in tlist and qual */
1580 find_cols(aggstate, &aggregated_colnos, &base_colnos);
1581 aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
1582 aggstate->max_colno_needed = 0;
1583 aggstate->all_cols_needed = true;
1584
1585 for (int i = 0; i < scanDesc->natts; i++)
1586 {
1587 int colno = i + 1;
1588
1589 if (bms_is_member(colno, aggstate->colnos_needed))
1590 aggstate->max_colno_needed = colno;
1591 else
1592 aggstate->all_cols_needed = false;
1593 }
1594
1595 for (j = 0; j < numHashes; ++j)
1596 {
1597 AggStatePerHash perhash = &aggstate->perhash[j];
1598 Bitmapset *colnos = bms_copy(base_colnos);
1599 AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1600 List *hashTlist = NIL;
1601 TupleDesc hashDesc;
1602 int maxCols;
1603 int i;
1604
1605 perhash->largestGrpColIdx = 0;
1606
1607 /*
1608 * If we're doing grouping sets, then some Vars might be referenced in
1609 * tlist/qual for the benefit of other grouping sets, but not needed
1610 * when hashing; i.e. prepare_projection_slot will null them out, so
1611 * there'd be no point storing them. Use prepare_projection_slot's
1612 * logic to determine which.
1613 */
1614 if (aggstate->phases[0].grouped_cols)
1615 {
1616 Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1617 ListCell *lc;
1618
1619 foreach(lc, aggstate->all_grouped_cols)
1620 {
1621 int attnum = lfirst_int(lc);
1622
1623 if (!bms_is_member(attnum, grouped_cols))
1624 colnos = bms_del_member(colnos, attnum);
1625 }
1626 }
1627
1628 /*
1629 * Compute maximum number of input columns accounting for possible
1630 * duplications in the grpColIdx array, which can happen in some edge
1631 * cases where HashAggregate was generated as part of a semijoin or a
1632 * DISTINCT.
1633 */
1634 maxCols = bms_num_members(colnos) + perhash->numCols;
1635
1636 perhash->hashGrpColIdxInput =
1637 palloc(maxCols * sizeof(AttrNumber));
1638 perhash->hashGrpColIdxHash =
1639 palloc(perhash->numCols * sizeof(AttrNumber));
1640
1641 /* Add all the grouping columns to colnos */
1642 for (i = 0; i < perhash->numCols; i++)
1643 colnos = bms_add_member(colnos, grpColIdx[i]);
1644
1645 /*
1646 * First build mapping for columns directly hashed. These are the
1647 * first, because they'll be accessed when computing hash values and
1648 * comparing tuples for exact matches. We also build simple mapping
1649 * for execGrouping, so it knows where to find the to-be-hashed /
1650 * compared columns in the input.
1651 */
1652 for (i = 0; i < perhash->numCols; i++)
1653 {
1654 perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1655 perhash->hashGrpColIdxHash[i] = i + 1;
1656 perhash->numhashGrpCols++;
1657 /* delete already mapped columns */
1658 colnos = bms_del_member(colnos, grpColIdx[i]);
1659 }
1660
1661 /* and add the remaining columns */
1662 i = -1;
1663 while ((i = bms_next_member(colnos, i)) >= 0)
1664 {
1665 perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1666 perhash->numhashGrpCols++;
1667 }
1668
1669 /* and build a tuple descriptor for the hashtable */
1670 for (i = 0; i < perhash->numhashGrpCols; i++)
1671 {
1672 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1673
1674 hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1675 perhash->largestGrpColIdx =
1676 Max(varNumber + 1, perhash->largestGrpColIdx);
1677 }
1678
1679 hashDesc = ExecTypeFromTL(hashTlist);
1680
1681 execTuplesHashPrepare(perhash->numCols,
1682 perhash->aggnode->grpOperators,
1683 &perhash->eqfuncoids,
1684 &perhash->hashfunctions);
1685 perhash->hashslot =
1686 ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1687 &TTSOpsMinimalTuple, 0);
1688
1689 list_free(hashTlist);
1690 bms_free(colnos);
1691 }
1692
1693 bms_free(base_colnos);
1694}
1695
1696/*
1697 * Estimate per-hash-table-entry overhead.
1698 */
1699Size
1700hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
1701{
1702 Size tupleChunkSize;
1703 Size pergroupChunkSize;
1704 Size transitionChunkSize;
1705 Size tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
1706 tupleWidth);
1707 Size pergroupSize = numTrans * sizeof(AggStatePerGroupData);
1708
1709 /*
1710 * Entries use the Bump allocator, so the chunk sizes are the same as the
1711 * requested sizes.
1712 */
1713 tupleChunkSize = MAXALIGN(tupleSize);
1714 pergroupChunkSize = pergroupSize;
1715
1716 /*
1717 * Transition values use AllocSet, which has a chunk header and also uses
1718 * power-of-two allocations.
1719 */
1720 if (transitionSpace > 0)
1721 transitionChunkSize = CHUNKHDRSZ + pg_nextpower2_size_t(transitionSpace);
1722 else
1723 transitionChunkSize = 0;
1724
1725 return
1726 TupleHashEntrySize() +
1727 tupleChunkSize +
1728 pergroupChunkSize +
1729 transitionChunkSize;
1730}
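The per-entry size arithmetic can be reproduced with a standalone sketch. This is hypothetical: the alignment and chunk-header constants below are illustrative stand-ins, not the real MAXALIGN or CHUNKHDRSZ values, and a plain function replaces pg_nextpower2_size_t():

```c
#include <assert.h>
#include <stddef.h>

/* Illustrative stand-ins for MAXALIGN and the AllocSet chunk header size. */
#define SKETCH_ALIGN(x)   (((x) + 7) & ~(size_t) 7)
#define SKETCH_CHUNKHDRSZ 16

/* Smallest power of two >= v (for v >= 1). */
static size_t
next_pow2(size_t v)
{
    size_t p = 1;

    while (p < v)
        p <<= 1;
    return p;
}

/*
 * Estimate one hash entry: a fixed header, a bump-allocated tuple chunk
 * (aligned but not rounded to a power of two), the pergroup array, and,
 * if any, an AllocSet chunk for the transition value (header plus
 * power-of-two rounding).
 */
static size_t
sketch_entry_size(size_t header_sz, size_t tuple_sz, size_t pergroup_sz,
                  size_t transition_sz)
{
    size_t trans_chunk = (transition_sz > 0) ?
        SKETCH_CHUNKHDRSZ + next_pow2(transition_sz) : 0;

    return header_sz + SKETCH_ALIGN(tuple_sz) + pergroup_sz + trans_chunk;
}
```

The asymmetry is the point: bump chunks cost only alignment padding, while AllocSet chunks pay both a header and power-of-two rounding, which is why transition space can dominate the estimate.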
1731
1732/*
1733 * hashagg_recompile_expressions()
1734 *
1735 * Identifies the right phase, compiles the right expression given the
1736 * arguments, and then sets phase->evalfunc to that expression.
1737 *
1738 * Different versions of the compiled expression are needed depending on
1739 * whether hash aggregation has spilled or not, and whether it's reading from
1740 * the outer plan or a tape. Before spilling to disk, the expression reads
1741 * from the outer plan and does not need to perform a NULL check. After
1742 * HashAgg begins to spill, new groups will not be created in the hash table,
1743 * and the AggStatePerGroup array may be NULL; therefore we need to add a null
1744 * pointer check to the expression. Then, when reading spilled data from a
1745 * tape, we change the outer slot type to be a fixed minimal tuple slot.
1746 *
1747 * It would be wasteful to recompile every time, so cache the compiled
1748 * expressions in the AggStatePerPhase, and reuse when appropriate.
1749 */
1750static void
1751hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1752{
1753 AggStatePerPhase phase;
1754 int i = minslot ? 1 : 0;
1755 int j = nullcheck ? 1 : 0;
1756
1757 Assert(aggstate->aggstrategy == AGG_HASHED ||
1758 aggstate->aggstrategy == AGG_MIXED);
1759
1760 if (aggstate->aggstrategy == AGG_HASHED)
1761 phase = &aggstate->phases[0];
1762 else /* AGG_MIXED */
1763 phase = &aggstate->phases[1];
1764
1765 if (phase->evaltrans_cache[i][j] == NULL)
1766 {
1767 const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
1768 bool outerfixed = aggstate->ss.ps.outeropsfixed;
1769 bool dohash = true;
1770 bool dosort = false;
1771
1772 /*
1773 * If minslot is true, that means we are processing a spilled batch
1774 * (inside agg_refill_hash_table()), and we must not advance the
1775 * sorted grouping sets.
1776 */
1777 if (aggstate->aggstrategy == AGG_MIXED && !minslot)
1778 dosort = true;
1779
1780 /* temporarily change the outerops while compiling the expression */
1781 if (minslot)
1782 {
1783 aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
1784 aggstate->ss.ps.outeropsfixed = true;
1785 }
1786
1787 phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
1788 dosort, dohash,
1789 nullcheck);
1790
1791 /* change back */
1792 aggstate->ss.ps.outerops = outerops;
1793 aggstate->ss.ps.outeropsfixed = outerfixed;
1794 }
1795
1796 phase->evaltrans = phase->evaltrans_cache[i][j];
1797}
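The evaltrans_cache[2][2] scheme above (lazily build one compiled variant per (minslot, nullcheck) pair, then reuse it) can be sketched as follows. This is hypothetical: an int stands in for the compiled ExprState, and a build counter makes the reuse observable:

```c
#include <assert.h>
#include <stdbool.h>

/* Four expression variants, indexed by two boolean flags. */
typedef struct
{
    int  cache[2][2];
    bool built[2][2];
    int  nbuilds;
} VariantCache;

/* Return the variant for (minslot, nullcheck), compiling it at most once. */
static int
get_variant(VariantCache *vc, bool minslot, bool nullcheck)
{
    int i = minslot ? 1 : 0;
    int j = nullcheck ? 1 : 0;

    if (!vc->built[i][j])
    {
        /* "compile" the variant once; encode the flags in the value */
        vc->cache[i][j] = i * 2 + j;
        vc->built[i][j] = true;
        vc->nbuilds++;
    }
    return vc->cache[i][j];
}
```

The real function does the same thing with ExecBuildAggTrans(), temporarily swapping the outer slot ops to TTSOpsMinimalTuple while compiling the spilled-batch (minslot) variants.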
1798
1799/*
1800 * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
1801 * number of partitions we expect to create (if we do spill).
1802 *
1803 * There are two limits: a memory limit, and also an ngroups limit. The
1804 * ngroups limit becomes important when we expect transition values to grow
1805 * substantially larger than the initial value.
1806 */
1807void
1808hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
1809 Size *mem_limit, uint64 *ngroups_limit,
1810 int *num_partitions)
1811{
1812 int npartitions;
1813 Size partition_mem;
1814 Size hash_mem_limit = get_hash_memory_limit();
1815
1816 /* if not expected to spill, use all of hash_mem */
1817 if (input_groups * hashentrysize <= hash_mem_limit)
1818 {
1819 if (num_partitions != NULL)
1820 *num_partitions = 0;
1821 *mem_limit = hash_mem_limit;
1822 *ngroups_limit = hash_mem_limit / hashentrysize;
1823 return;
1824 }
1825
1826 /*
1827 * Calculate expected memory requirements for spilling, which is the size
1828 * of the buffers needed for all the tapes that need to be open at once.
1829 * Then, subtract that from the memory available for holding hash tables.
1830 */
1831 npartitions = hash_choose_num_partitions(input_groups,
1832 hashentrysize,
1833 used_bits,
1834 NULL);
1835 if (num_partitions != NULL)
1836 *num_partitions = npartitions;
1837
1838 partition_mem =
1839 HASHAGG_READ_BUFFER_SIZE +
1840 HASHAGG_WRITE_BUFFER_SIZE * npartitions;
1841
1842 /*
1843 * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
1844 * minimum number of partitions, so we aren't going to dramatically exceed
1845 * work mem anyway.
1846 */
1847 if (hash_mem_limit > 4 * partition_mem)
1848 *mem_limit = hash_mem_limit - partition_mem;
1849 else
1850 *mem_limit = hash_mem_limit * 0.75;
1851
1852 if (*mem_limit > hashentrysize)
1853 *ngroups_limit = *mem_limit / hashentrysize;
1854 else
1855 *ngroups_limit = 1;
1856}
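The limit-setting policy above can be re-derived in a self-contained sketch. This is hypothetical: partition_mem is taken as a caller-supplied estimate instead of being derived from hash_choose_num_partitions(), and plain C types replace Size/uint64:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Mirror of the hash_agg_set_limits() policy: if the expected groups fit,
 * use all of hash_mem; otherwise reserve the spill-buffer memory, but
 * never set the limit below 3/4 of hash_mem, and always allow at least
 * one group.
 */
static void
sketch_set_limits(double entrysize, double input_groups, size_t hash_mem,
                  size_t partition_mem, size_t *mem_limit,
                  uint64_t *ngroups_limit)
{
    /* not expected to spill: use all of hash_mem */
    if (input_groups * entrysize <= (double) hash_mem)
    {
        *mem_limit = hash_mem;
        *ngroups_limit = (uint64_t) ((double) hash_mem / entrysize);
        return;
    }

    /* reserve tape-buffer memory, bounded below by 3/4 of hash_mem */
    if (hash_mem > 4 * partition_mem)
        *mem_limit = hash_mem - partition_mem;
    else
        *mem_limit = (size_t) (hash_mem * 0.75);

    *ngroups_limit = (*mem_limit > entrysize) ?
        (uint64_t) ((double) *mem_limit / entrysize) : 1;
}
```

The ngroups limit matters when transition values are expected to grow far beyond their initial size, so memory alone would trigger spilling too late.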
1857
1858/*
1859 * hash_agg_check_limits
1860 *
1861 * After adding a new group to the hash table, check whether we need to enter
1862 * spill mode. Allocations may happen without adding new groups (for instance,
1863 * if the transition state size grows), so this check is imperfect.
1864 */
1865static void
1866hash_agg_check_limits(AggState *aggstate)
1867{
1868 uint64 ngroups = aggstate->hash_ngroups_current;
1869 Size meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
1870 true);
1871 Size entry_mem = MemoryContextMemAllocated(aggstate->hash_tuplescxt,
1872 true);
1873 Size tval_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
1874 true);
1875 Size total_mem = meta_mem + entry_mem + tval_mem;
1876 bool do_spill = false;
1877
1878#ifdef USE_INJECTION_POINTS
1879 if (ngroups >= 1000)
1880 {
1881 if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000"))
1882 {
1883 do_spill = true;
1884 INJECTION_POINT_CACHED("hash-aggregate-spill-1000", NULL);
1885 }
1886 }
1887#endif
1888
1889 /*
1890 * Don't spill unless there's at least one group in the hash table so we
1891 * can be sure to make progress even in edge cases.
1892 */
1893 if (aggstate->hash_ngroups_current > 0 &&
1894 (total_mem > aggstate->hash_mem_limit ||
1895 ngroups > aggstate->hash_ngroups_limit))
1896 {
1897 do_spill = true;
1898 }
1899
1900 if (do_spill)
1901 hash_agg_enter_spill_mode(aggstate);
1902}
1903
1904/*
1905 * Enter "spill mode", meaning that no new groups are added to any of the hash
1906 * tables. Tuples that would create a new group are instead spilled, and
1907 * processed later.
1908 */
1909static void
1910hash_agg_enter_spill_mode(AggState *aggstate)
1911{
1912 INJECTION_POINT("hash-aggregate-enter-spill-mode", NULL);
1913 aggstate->hash_spill_mode = true;
1914 hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
1915
1916 if (!aggstate->hash_ever_spilled)
1917 {
1918 Assert(aggstate->hash_tapeset == NULL);
1919 Assert(aggstate->hash_spills == NULL);
1920
1921 aggstate->hash_ever_spilled = true;
1922
1923 aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
1924
1925 aggstate->hash_spills = palloc_array(HashAggSpill, aggstate->num_hashes);
1926
1927 for (int setno = 0; setno < aggstate->num_hashes; setno++)
1928 {
1929 AggStatePerHash perhash = &aggstate->perhash[setno];
1930 HashAggSpill *spill = &aggstate->hash_spills[setno];
1931
1932 hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
1933 perhash->aggnode->numGroups,
1934 aggstate->hashentrysize);
1935 }
1936 }
1937}
1938
1939/*
1940 * Update metrics after filling the hash table.
1941 *
1942 * If reading from the outer plan, from_tape should be false; if reading from
1943 * another tape, from_tape should be true.
1944 */
1945static void
1946hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
1947{
1948 Size meta_mem;
1949 Size entry_mem;
1950 Size hashkey_mem;
1951 Size buffer_mem;
1952 Size total_mem;
1953
1954 if (aggstate->aggstrategy != AGG_MIXED &&
1955 aggstate->aggstrategy != AGG_HASHED)
1956 return;
1957
1958 /* memory for the hash table itself */
1959 meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
1960
1961 /* memory for hash entries */
1962 entry_mem = MemoryContextMemAllocated(aggstate->hash_tuplescxt, true);
1963
1964 /* memory for byref transition states */
1965 hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
1966
1967 /* memory for read/write tape buffers, if spilled */
1968 buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
1969 if (from_tape)
1970 buffer_mem += HASHAGG_READ_BUFFER_SIZE;
1971
1972 /* update peak mem */
1973 total_mem = meta_mem + entry_mem + hashkey_mem + buffer_mem;
1974 if (total_mem > aggstate->hash_mem_peak)
1975 aggstate->hash_mem_peak = total_mem;
1976
1977 /* update disk usage */
1978 if (aggstate->hash_tapeset != NULL)
1979 {
1980 uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
1981
1982 if (aggstate->hash_disk_used < disk_used)
1983 aggstate->hash_disk_used = disk_used;
1984 }
1985
1986 /* update hashentrysize estimate based on contents */
1987 if (aggstate->hash_ngroups_current > 0)
1988 {
1989 aggstate->hashentrysize =
1990 TupleHashEntrySize() +
1991 (hashkey_mem / (double) aggstate->hash_ngroups_current);
1992 }
1993}
1994
1995/*
1996 * Create memory contexts used for hash aggregation.
1997 */
1998static void
1999hash_create_memory(AggState *aggstate)
2000{
2001 Size maxBlockSize = ALLOCSET_DEFAULT_MAXSIZE;
2002
2003 /*
2004 * The hashcontext's per-tuple memory will be used for byref transition
2005 * values and returned by AggCheckCallContext().
2006 */
2007 aggstate->hashcontext = CreateWorkExprContext(aggstate->ss.ps.state);
2008
2009 /*
2010 * The meta context will be used for the bucket array of
2011 * TupleHashEntryData (or arrays, in the case of grouping sets). As the
2012 * hash table grows, the bucket array will double in size and the old one
2013 * will be freed, so an AllocSet is appropriate. For large bucket arrays,
2014 * the large allocation path will be used, so it's not worth worrying
2015 * about wasting space due to power-of-two allocations.
2016 */
2017 aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
2018 "HashAgg meta context",
2019 ALLOCSET_DEFAULT_SIZES);
2020
2021 /*
2022 * The hash entries themselves, which include the grouping key
2023 * (firstTuple) and pergroup data, are stored in the table context. The
2024 * bump allocator can be used because the entries are not freed until the
2025 * entire hash table is reset. The bump allocator is faster for
2026 * allocations and avoids wasting space on the chunk header or
2027 * power-of-two allocations.
2028 *
2029 * Like CreateWorkExprContext(), use smaller sizings for smaller work_mem,
2030 * to avoid large jumps in memory usage.
2031 */
2032
2037 maxBlockSize = pg_prevpower2_size_t(work_mem * (Size) 1024 / 16);
2038
2039 /* But no bigger than ALLOCSET_DEFAULT_MAXSIZE */
2040 maxBlockSize = Min(maxBlockSize, ALLOCSET_DEFAULT_MAXSIZE);
2041
2042 /* and no smaller than ALLOCSET_DEFAULT_INITSIZE */
2043 maxBlockSize = Max(maxBlockSize, ALLOCSET_DEFAULT_INITSIZE);
2044
2045 aggstate->hash_tuplescxt = BumpContextCreate(aggstate->ss.ps.state->es_query_cxt,
2046 "HashAgg hashed tuples",
2047 ALLOCSET_DEFAULT_MINSIZE,
2048 ALLOCSET_DEFAULT_INITSIZE,
2049 maxBlockSize);
2050
2051}
2052
2053/*
2054 * Choose a reasonable number of buckets for the initial hash table size.
2055 */
2056static double
2057hash_choose_num_buckets(double hashentrysize, double ngroups, Size memory)
2058{
2059 double max_nbuckets;
2060 double nbuckets = ngroups;
2061
2062 max_nbuckets = memory / hashentrysize;
2063
2064 /*
2065 * Underestimating is better than overestimating. Too many buckets crowd
2066 * out space for group keys and transition state values.
2067 */
2068 max_nbuckets /= 2;
2069
2070 if (nbuckets > max_nbuckets)
2071 nbuckets = max_nbuckets;
2072
2073 /*
2074 * BuildTupleHashTable will clamp any obviously-insane result, so we don't
2075 * need to be too careful here.
2076 */
2077 return nbuckets;
2078}
2079
2080/*
2081 * Determine the number of partitions to create when spilling, which will
2082 * always be a power of two. If log2_npartitions is non-NULL, set
2083 * *log2_npartitions to the log2() of the number of partitions.
2084 */
2085static int
2086hash_choose_num_partitions(double input_groups, double hashentrysize,
2087 int used_bits, int *log2_npartitions)
2088{
2089 Size hash_mem_limit = get_hash_memory_limit();
2090 double partition_limit;
2091 double mem_wanted;
2092 double dpartitions;
2093 int npartitions;
2094 int partition_bits;
2095
2096 /*
2097 * Avoid creating so many partitions that the memory requirements of the
2098 * open partition files are greater than 1/4 of hash_mem.
2099 */
2100 partition_limit =
2101 (hash_mem_limit * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
2102 HASHAGG_WRITE_BUFFER_SIZE;
2103
2104 mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
2105
2106 /* make enough partitions so that each one is likely to fit in memory */
2107 dpartitions = 1 + (mem_wanted / hash_mem_limit);
2108
2109 if (dpartitions > partition_limit)
2110 dpartitions = partition_limit;
2111
2112 if (dpartitions < HASHAGG_MIN_PARTITIONS)
2113 dpartitions = HASHAGG_MIN_PARTITIONS;
2114 if (dpartitions > HASHAGG_MAX_PARTITIONS)
2115 dpartitions = HASHAGG_MAX_PARTITIONS;
2116
2117 /* HASHAGG_MAX_PARTITIONS limit makes this safe */
2118 npartitions = (int) dpartitions;
2119
2120 /* ceil(log2(npartitions)) */
2121 partition_bits = pg_ceil_log2_32(npartitions);
2122
2123 /* make sure that we don't exhaust the hash bits */
2124 if (partition_bits + used_bits >= 32)
2125 partition_bits = 32 - used_bits;
2126
2127 if (log2_npartitions != NULL)
2128 *log2_npartitions = partition_bits;
2129
2130 /* number of partitions will be a power of two */
2131 npartitions = 1 << partition_bits;
2132
2133 return npartitions;
2134}
2135
2136/*
2137 * Initialize a freshly-created TupleHashEntry.
2138 */
2139static void
2140initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable,
2141 TupleHashEntry entry)
2142{
2143 AggStatePerGroup pergroup;
2144 int transno;
2145
2146 aggstate->hash_ngroups_current++;
2147 hash_agg_check_limits(aggstate);
2148
2149 /* no need to allocate or initialize per-group state */
2150 if (aggstate->numtrans == 0)
2151 return;
2152
2153 pergroup = TupleHashEntryGetAdditional(hashtable, entry);
2154
2155 /*
2156 * Initialize aggregates for new tuple group, lookup_hash_entries()
2157 * already has selected the relevant grouping set.
2158 */
2159 for (transno = 0; transno < aggstate->numtrans; transno++)
2160 {
2161 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
2162 AggStatePerGroup pergroupstate = &pergroup[transno];
2163
2164 initialize_aggregate(aggstate, pertrans, pergroupstate);
2165 }
2166}
2167
2168/*
2169 * Look up hash entries for the current tuple in all hashed grouping sets.
2170 *
2171 * Some entries may be left NULL if we are in "spill mode". The same tuple
2172 * will belong to different groups for each grouping set, so may match a group
2173 * already in memory for one set and match a group not in memory for another
2174 * set. When in "spill mode", the tuple will be spilled for each grouping set
2175 * where it doesn't match a group in memory.
2176 *
2177 * NB: It's possible to spill the same tuple for several different grouping
2178 * sets. This may seem wasteful, but it's actually a trade-off: if we spill
2179 * the tuple multiple times for multiple grouping sets, it can be partitioned
2180 * for each grouping set, making the refilling of the hash table very
2181 * efficient.
2182 */
2183static void
2184lookup_hash_entries(AggState *aggstate)
2185{
2186 AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2187 TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
2188 int setno;
2189
2190 for (setno = 0; setno < aggstate->num_hashes; setno++)
2191 {
2192 AggStatePerHash perhash = &aggstate->perhash[setno];
2193 TupleHashTable hashtable = perhash->hashtable;
2194 TupleTableSlot *hashslot = perhash->hashslot;
2195 TupleHashEntry entry;
2196 uint32 hash;
2197 bool isnew = false;
2198 bool *p_isnew;
2199
2200 /* if hash table already spilled, don't create new entries */
2201 p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2202
2203 select_current_set(aggstate, setno, true);
2204 prepare_hash_slot(perhash,
2205 outerslot,
2206 hashslot);
2207
2208 entry = LookupTupleHashEntry(hashtable, hashslot,
2209 p_isnew, &hash);
2210
2211 if (entry != NULL)
2212 {
2213 if (isnew)
2214 initialize_hash_entry(aggstate, hashtable, entry);
2215 pergroup[setno] = TupleHashEntryGetAdditional(hashtable, entry);
2216 }
2217 else
2218 {
2219 HashAggSpill *spill = &aggstate->hash_spills[setno];
2220 TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
2221
2222 if (spill->partitions == NULL)
2223 hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
2224 perhash->aggnode->numGroups,
2225 aggstate->hashentrysize);
2226
2227 hashagg_spill_tuple(aggstate, spill, slot, hash);
2228 pergroup[setno] = NULL;
2229 }
2230 }
2231}
2232
2233/*
2234 * ExecAgg -
2235 *
2236 * ExecAgg receives tuples from its outer subplan and aggregates over
2237 * the appropriate attribute for each aggregate function use (Aggref
2238 * node) appearing in the targetlist or qual of the node. The number
2239 * of tuples to aggregate over depends on whether grouped or plain
2240 * aggregation is selected. In grouped aggregation, we produce a result
2241 * row for each group; in plain aggregation there's a single result row
2242 * for the whole query. In either case, the value of each aggregate is
2243 * stored in the expression context to be used when ExecProject evaluates
2244 * the result tuple.
2245 */
2246static TupleTableSlot *
2247ExecAgg(PlanState *pstate)
2248{
2249 AggState *node = castNode(AggState, pstate);
2250 TupleTableSlot *result = NULL;
2251
2252 CHECK_FOR_INTERRUPTS();
2253
2254 if (!node->agg_done)
2255 {
2256 /* Dispatch based on strategy */
2257 switch (node->phase->aggstrategy)
2258 {
2259 case AGG_HASHED:
2260 if (!node->table_filled)
2261 agg_fill_hash_table(node);
2262 /* FALLTHROUGH */
2263 case AGG_MIXED:
2264 result = agg_retrieve_hash_table(node);
2265 break;
2266 case AGG_PLAIN:
2267 case AGG_SORTED:
2268 result = agg_retrieve_direct(node);
2269 break;
2270 }
2271
2272 if (!TupIsNull(result))
2273 return result;
2274 }
2275
2276 return NULL;
2277}
2278
2279/*
2280 * ExecAgg for non-hashed case
2281 */
2282static TupleTableSlot *
2283agg_retrieve_direct(AggState *aggstate)
2284{
2285 Agg *node = aggstate->phase->aggnode;
2286 ExprContext *econtext;
2287 ExprContext *tmpcontext;
2288 AggStatePerAgg peragg;
2289 AggStatePerGroup *pergroups;
2290 TupleTableSlot *outerslot;
2291 TupleTableSlot *firstSlot;
2292 TupleTableSlot *result;
2293 bool hasGroupingSets = aggstate->phase->numsets > 0;
2294 int numGroupingSets = Max(aggstate->phase->numsets, 1);
2295 int currentSet;
2296 int nextSetSize;
2297 int numReset;
2298 int i;
2299
2300 /*
2301 * get state info from node
2302 *
2303 * econtext is the per-output-tuple expression context
2304 *
2305 * tmpcontext is the per-input-tuple expression context
2306 */
2307 econtext = aggstate->ss.ps.ps_ExprContext;
2308 tmpcontext = aggstate->tmpcontext;
2309
2310 peragg = aggstate->peragg;
2311 pergroups = aggstate->pergroups;
2312 firstSlot = aggstate->ss.ss_ScanTupleSlot;
2313
2314 /*
2315 * We loop retrieving groups until we find one matching
2316 * aggstate->ss.ps.qual
2317 *
2318 * For grouping sets, we have the invariant that aggstate->projected_set
2319 * is either -1 (initial call) or the index (starting from 0) in
2320 * gset_lengths for the group we just completed (either by projecting a
2321 * row or by discarding it in the qual).
2322 */
2323 while (!aggstate->agg_done)
2324 {
2325 /*
2326 * Clear the per-output-tuple context for each group, as well as
2327 * aggcontext (which contains any pass-by-ref transvalues of the old
2328 * group). Some aggregate functions store working state in child
2329 * contexts; those now get reset automatically without us needing to
2330 * do anything special.
2331 *
2332 * We use ReScanExprContext not just ResetExprContext because we want
2333 * any registered shutdown callbacks to be called. That allows
2334 * aggregate functions to ensure they've cleaned up any non-memory
2335 * resources.
2336 */
2337 ReScanExprContext(econtext);
2338
2339 /*
2340 * Determine how many grouping sets need to be reset at this boundary.
2341 */
2342 if (aggstate->projected_set >= 0 &&
2343 aggstate->projected_set < numGroupingSets)
2344 numReset = aggstate->projected_set + 1;
2345 else
2346 numReset = numGroupingSets;
2347
2348 /*
2349 * numReset can change on a phase boundary, but that's OK; we want to
2350 * reset the contexts used in _this_ phase, and later, after possibly
2351 * changing phase, initialize the right number of aggregates for the
2352 * _new_ phase.
2353 */
2354
2355 for (i = 0; i < numReset; i++)
2356 {
2357 ReScanExprContext(aggstate->aggcontexts[i]);
2358 }
2359
2360 /*
2361 * Check if input is complete and there are no more groups to project
2362 * in this phase; move to next phase or mark as done.
2363 */
2364 if (aggstate->input_done == true &&
2365 aggstate->projected_set >= (numGroupingSets - 1))
2366 {
2367 if (aggstate->current_phase < aggstate->numphases - 1)
2368 {
2369 initialize_phase(aggstate, aggstate->current_phase + 1);
2370 aggstate->input_done = false;
2371 aggstate->projected_set = -1;
2372 numGroupingSets = Max(aggstate->phase->numsets, 1);
2373 node = aggstate->phase->aggnode;
2374 numReset = numGroupingSets;
2375 }
2376 else if (aggstate->aggstrategy == AGG_MIXED)
2377 {
2378 /*
2379 * Mixed mode; we've output all the grouped stuff and have
2380 * full hashtables, so switch to outputting those.
2381 */
2382 initialize_phase(aggstate, 0);
2383 aggstate->table_filled = true;
2384 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2385 &aggstate->perhash[0].hashiter);
2386 select_current_set(aggstate, 0, true);
2387 return agg_retrieve_hash_table(aggstate);
2388 }
2389 else
2390 {
2391 aggstate->agg_done = true;
2392 break;
2393 }
2394 }
2395
2396 /*
2397 * Get the number of columns in the next grouping set after the last
2398 * projected one (if any). This is the number of columns to compare to
2399 * see if we reached the boundary of that set too.
2400 */
2401 if (aggstate->projected_set >= 0 &&
2402 aggstate->projected_set < (numGroupingSets - 1))
2403 nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2404 else
2405 nextSetSize = 0;
2406
2407 /*----------
2408 * If a subgroup for the current grouping set is present, project it.
2409 *
2410 * We have a new group if:
2411 * - we're out of input but haven't projected all grouping sets
2412 * (checked above)
2413 * OR
2414 * - we already projected a row that wasn't from the last grouping
2415 * set
2416 * AND
2417 * - the next grouping set has at least one grouping column (since
2418 * empty grouping sets project only once input is exhausted)
2419 * AND
2420 * - the previous and pending rows differ on the grouping columns
2421 * of the next grouping set
2422 *----------
2423 */
2424 tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
2425 if (aggstate->input_done ||
2426 (node->aggstrategy != AGG_PLAIN &&
2427 aggstate->projected_set != -1 &&
2428 aggstate->projected_set < (numGroupingSets - 1) &&
2429 nextSetSize > 0 &&
2430 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
2431 tmpcontext)))
2432 {
2433 aggstate->projected_set += 1;
2434
2435 Assert(aggstate->projected_set < numGroupingSets);
2436 Assert(nextSetSize > 0 || aggstate->input_done);
2437 }
2438 else
2439 {
2440 /*
2441 * We no longer care what group we just projected, the next
2442 * projection will always be the first (or only) grouping set
2443 * (unless the input proves to be empty).
2444 */
2445 aggstate->projected_set = 0;
2446
2447 /*
2448 * If we don't already have the first tuple of the new group,
2449 * fetch it from the outer plan.
2450 */
2451 if (aggstate->grp_firstTuple == NULL)
2452 {
2454 if (!TupIsNull(outerslot))
2455 {
2456 /*
2457 * Make a copy of the first input tuple; we will use this
2458 * for comparisons (in group mode) and for projection.
2459 */
2460 aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2461 }
2462 else
2463 {
2464 /* outer plan produced no tuples at all */
2465 if (hasGroupingSets)
2466 {
2467 /*
2468 * If there was no input at all, we need to project
2469 * rows only if there are grouping sets of size 0.
2470 * Note that this implies that there can't be any
2471 * references to ungrouped Vars, which would otherwise
2472 * cause issues with the empty output slot.
2473 *
2474 * XXX: This is no longer true, we currently deal with
2475 * this in finalize_aggregates().
2476 */
2477 aggstate->input_done = true;
2478
2479 while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2480 {
2481 aggstate->projected_set += 1;
2482 if (aggstate->projected_set >= numGroupingSets)
2483 {
2484 /*
2485 * We can't set agg_done here because we might
2486 * have more phases to do, even though the
2487 * input is empty. So we need to restart the
2488 * whole outer loop.
2489 */
2490 break;
2491 }
2492 }
2493
2494 if (aggstate->projected_set >= numGroupingSets)
2495 continue;
2496 }
2497 else
2498 {
2499 aggstate->agg_done = true;
2500 /* If we are grouping, we should produce no tuples too */
2501 if (node->aggstrategy != AGG_PLAIN)
2502 return NULL;
2503 }
2504 }
2505 }
2506
2507 /*
2508 * Initialize working state for a new input tuple group.
2509 */
2510 initialize_aggregates(aggstate, pergroups, numReset);
2511
2512 if (aggstate->grp_firstTuple != NULL)
2513 {
2514 /*
2515 * Store the copied first input tuple in the tuple table slot
2516 * reserved for it. The tuple will be deleted when it is
2517 * cleared from the slot.
2518 */
2519 ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
2520 firstSlot, true);
2521 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
2522
2523 /* set up for first advance_aggregates call */
2524 tmpcontext->ecxt_outertuple = firstSlot;
2525
2526 /*
2527 * Process each outer-plan tuple, and then fetch the next one,
2528 * until we exhaust the outer plan or cross a group boundary.
2529 */
2530 for (;;)
2531 {
2532 /*
2533 * During phase 1 only of a mixed agg, we need to update
2534 * hashtables as well in advance_aggregates.
2535 */
2536 if (aggstate->aggstrategy == AGG_MIXED &&
2537 aggstate->current_phase == 1)
2538 {
2539 lookup_hash_entries(aggstate);
2540 }
2541
2542 /* Advance the aggregates (or combine functions) */
2543 advance_aggregates(aggstate);
2544
2545 /* Reset per-input-tuple context after each tuple */
2546 ResetExprContext(tmpcontext);
2547
2548 outerslot = fetch_input_tuple(aggstate);
2549 if (TupIsNull(outerslot))
2550 {
2551 /* no more outer-plan tuples available */
2552
2553 /* if we built hash tables, finalize any spills */
2554 if (aggstate->aggstrategy == AGG_MIXED &&
2555 aggstate->current_phase == 1)
2556 hashagg_finish_initial_spills(aggstate);
2557
2558 if (hasGroupingSets)
2559 {
2560 aggstate->input_done = true;
2561 break;
2562 }
2563 else
2564 {
2565 aggstate->agg_done = true;
2566 break;
2567 }
2568 }
2569 /* set up for next advance_aggregates call */
2570 tmpcontext->ecxt_outertuple = outerslot;
2571
2572 /*
2573 * If we are grouping, check whether we've crossed a group
2574 * boundary.
2575 */
2576 if (node->aggstrategy != AGG_PLAIN && node->numCols > 0)
2577 {
2578 tmpcontext->ecxt_innertuple = firstSlot;
2579 if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2580 tmpcontext))
2581 {
2582 aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2583 break;
2584 }
2585 }
2586 }
2587 }
2588
2589 /*
2590 * Use the representative input tuple for any references to
2591 * non-aggregated input columns in aggregate direct args, the node
2592 * qual, and the tlist. (If we are not grouping, and there are no
2593 * input rows at all, we will come here with an empty firstSlot
2594 * ... but if not grouping, there can't be any references to
2595 * non-aggregated input columns, so no problem.)
2596 */
2597 econtext->ecxt_outertuple = firstSlot;
2598 }
2599
2600 Assert(aggstate->projected_set >= 0);
2601
2602 currentSet = aggstate->projected_set;
2603
2604 prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2605
2606 select_current_set(aggstate, currentSet, false);
2607
2608 finalize_aggregates(aggstate,
2609 peragg,
2610 pergroups[currentSet]);
2611
2612 /*
2613 * If there's no row to project right now, we must continue rather
2614 * than returning a null since there might be more groups.
2615 */
2616 result = project_aggregates(aggstate);
2617 if (result)
2618 return result;
2619 }
2620
2621 /* No more groups */
2622 return NULL;
2623}
2624
2625/*
2626 * ExecAgg for hashed case: read input and build hash table
2627 */
2628static void
2629agg_fill_hash_table(AggState *aggstate)
2630{
2631 TupleTableSlot *outerslot;
2632 ExprContext *tmpcontext = aggstate->tmpcontext;
2633
2634 /*
2635 * Process each outer-plan tuple, and then fetch the next one, until we
2636 * exhaust the outer plan.
2637 */
2638 for (;;)
2639 {
2640 outerslot = fetch_input_tuple(aggstate);
2641 if (TupIsNull(outerslot))
2642 break;
2643
2644 /* set up for lookup_hash_entries and advance_aggregates */
2645 tmpcontext->ecxt_outertuple = outerslot;
2646
2647 /* Find or build hashtable entries */
2648 lookup_hash_entries(aggstate);
2649
2650 /* Advance the aggregates (or combine functions) */
2651 advance_aggregates(aggstate);
2652
2653 /*
2654 * Reset per-input-tuple context after each tuple, but note that the
2655 * hash lookups do this too
2656 */
2657 ResetExprContext(aggstate->tmpcontext);
2658 }
2659
2660 /* finalize spills, if any */
2661 hashagg_finish_initial_spills(aggstate);
2662
2663 aggstate->table_filled = true;
2664 /* Initialize to walk the first hash table */
2665 select_current_set(aggstate, 0, true);
2666 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2667 &aggstate->perhash[0].hashiter);
2668}
2669
2670/*
2671 * If any data was spilled during hash aggregation, reset the hash table and
2672 * reprocess one batch of spilled data. After reprocessing a batch, the hash
2673 * table will again contain data, ready to be consumed by
2674 * agg_retrieve_hash_table_in_memory().
2675 *
2676 * Should only be called after all in memory hash table entries have been
2677 * finalized and emitted.
2678 *
2679 * Return false when input is exhausted and there's no more work to be done;
2680 * otherwise return true.
2681 */
2682static bool
2683agg_refill_hash_table(AggState *aggstate)
2684{
2685 HashAggBatch *batch;
2686 AggStatePerHash perhash;
2687 HashAggSpill spill;
2688 LogicalTapeSet *tapeset = aggstate->hash_tapeset;
2689 bool spill_initialized = false;
2690
2691 if (aggstate->hash_batches == NIL)
2692 return false;
2693
2694 /* hash_batches is a stack, with the top item at the end of the list */
2695 batch = llast(aggstate->hash_batches);
2696 aggstate->hash_batches = list_delete_last(aggstate->hash_batches);
2697
2698 hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
2699 batch->used_bits, &aggstate->hash_mem_limit,
2700 &aggstate->hash_ngroups_limit, NULL);
2701
2702 /*
2703 * Each batch only processes one grouping set; set the rest to NULL so
2704 * that advance_aggregates() knows to ignore them. We don't touch
2705 * pergroups for sorted grouping sets here, because they will be needed if
2706 * we rescan later. The expressions for sorted grouping sets will not be
2707 * evaluated after we recompile anyway.
2708 */
2709 MemSet(aggstate->hash_pergroup, 0,
2710 sizeof(AggStatePerGroup) * aggstate->num_hashes);
2711
2712 /* free memory and reset hash tables */
2713 ReScanExprContext(aggstate->hashcontext);
2714 for (int setno = 0; setno < aggstate->num_hashes; setno++)
2715 ResetTupleHashTable(aggstate->perhash[setno].hashtable);
2716
2717 aggstate->hash_ngroups_current = 0;
2718
2719 /*
2720 * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
2721 * happens in phase 0. So, we switch to phase 1 when processing a batch,
2722 * and back to phase 0 after the batch is done.
2723 */
2724 Assert(aggstate->current_phase == 0);
2725 if (aggstate->phase->aggstrategy == AGG_MIXED)
2726 {
2727 aggstate->current_phase = 1;
2728 aggstate->phase = &aggstate->phases[aggstate->current_phase];
2729 }
2730
2731 select_current_set(aggstate, batch->setno, true);
2732
2733 perhash = &aggstate->perhash[aggstate->current_set];
2734
2735 /*
2736 * Spilled tuples are always read back as MinimalTuples, which may be
2737 * different from the outer plan, so recompile the aggregate expressions.
2738 *
2739 * We still need the NULL check, because we are only processing one
2740 * grouping set at a time and the rest will be NULL.
2741 */
2742 hashagg_recompile_expressions(aggstate, true, true);
2743
2744 INJECTION_POINT("hash-aggregate-process-batch", NULL);
2745 for (;;)
2746 {
2747 TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
2748 TupleTableSlot *hashslot = perhash->hashslot;
2749 TupleHashTable hashtable = perhash->hashtable;
2750 TupleHashEntry entry;
2751 MinimalTuple tuple;
2752 uint32 hash;
2753 bool isnew = false;
2754 bool *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2755
2756 CHECK_FOR_INTERRUPTS();
2757
2758 tuple = hashagg_batch_read(batch, &hash);
2759 if (tuple == NULL)
2760 break;
2761
2762 ExecStoreMinimalTuple(tuple, spillslot, true);
2763 aggstate->tmpcontext->ecxt_outertuple = spillslot;
2764
2765 prepare_hash_slot(perhash,
2766 aggstate->tmpcontext->ecxt_outertuple,
2767 hashslot);
2768 entry = LookupTupleHashEntryHash(hashtable, hashslot,
2769 p_isnew, hash);
2770
2771 if (entry != NULL)
2772 {
2773 if (isnew)
2774 initialize_hash_entry(aggstate, hashtable, entry);
2775 aggstate->hash_pergroup[batch->setno] = TupleHashEntryGetAdditional(hashtable, entry);
2776 advance_aggregates(aggstate);
2777 }
2778 else
2779 {
2780 if (!spill_initialized)
2781 {
2782 /*
2783 * Avoid initializing the spill until we actually need it so
2784 * that we don't assign tapes that will never be used.
2785 */
2786 spill_initialized = true;
2787 hashagg_spill_init(&spill, tapeset, batch->used_bits,
2788 batch->input_card, aggstate->hashentrysize);
2789 }
2790 /* no memory for a new group, spill */
2791 hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
2792
2793 aggstate->hash_pergroup[batch->setno] = NULL;
2794 }
2795
2796 /*
2797 * Reset per-input-tuple context after each tuple, but note that the
2798 * hash lookups do this too
2799 */
2800 ResetExprContext(aggstate->tmpcontext);
2801 }
2802
2803 LogicalTapeClose(batch->input_tape);
2804
2805 /* change back to phase 0 */
2806 aggstate->current_phase = 0;
2807 aggstate->phase = &aggstate->phases[aggstate->current_phase];
2808
2809 if (spill_initialized)
2810 {
2811 hashagg_spill_finish(aggstate, &spill, batch->setno);
2812 hash_agg_update_metrics(aggstate, true, spill.npartitions);
2813 }
2814 else
2815 hash_agg_update_metrics(aggstate, true, 0);
2816
2817 aggstate->hash_spill_mode = false;
2818
2819 /* prepare to walk the first hash table */
2820 select_current_set(aggstate, batch->setno, true);
2821 ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
2822 &aggstate->perhash[batch->setno].hashiter);
2823
2824 pfree(batch);
2825
2826 return true;
2827}
2828
2829/*
2830 * ExecAgg for hashed case: retrieving groups from hash table
2831 *
2832 * After exhausting in-memory tuples, also try refilling the hash table using
2833 * previously-spilled tuples. Only returns NULL after all in-memory and
2834 * spilled tuples are exhausted.
2835 */
2836static TupleTableSlot *
2837agg_retrieve_hash_table(AggState *aggstate)
2838{
2839 TupleTableSlot *result = NULL;
2840
2841 while (result == NULL)
2842 {
2843 result = agg_retrieve_hash_table_in_memory(aggstate);
2844 if (result == NULL)
2845 {
2846 if (!agg_refill_hash_table(aggstate))
2847 {
2848 aggstate->agg_done = true;
2849 break;
2850 }
2851 }
2852 }
2853
2854 return result;
2855}
2856
2857/*
2858 * Retrieve the groups from the in-memory hash tables without considering any
2859 * spilled tuples.
2860 */
2861static TupleTableSlot *
2862agg_retrieve_hash_table_in_memory(AggState *aggstate)
2863{
2864 ExprContext *econtext;
2865 AggStatePerAgg peragg;
2866 AggStatePerGroup pergroup;
2867 TupleHashEntry entry;
2868 TupleTableSlot *firstSlot;
2869 TupleTableSlot *result;
2870 AggStatePerHash perhash;
2871
2872 /*
2873 * get state info from node.
2874 *
2875 * econtext is the per-output-tuple expression context.
2876 */
2877 econtext = aggstate->ss.ps.ps_ExprContext;
2878 peragg = aggstate->peragg;
2879 firstSlot = aggstate->ss.ss_ScanTupleSlot;
2880
2881 /*
2882 * Note that perhash (and therefore anything accessed through it) can
2883 * change inside the loop, as we change between grouping sets.
2884 */
2885 perhash = &aggstate->perhash[aggstate->current_set];
2886
2887 /*
2888 * We loop retrieving groups until we find one satisfying
2889 * aggstate->ss.ps.qual
2890 */
2891 for (;;)
2892 {
2893 TupleTableSlot *hashslot = perhash->hashslot;
2894 TupleHashTable hashtable = perhash->hashtable;
2895 int i;
2896
2897 CHECK_FOR_INTERRUPTS();
2898
2899 /*
2900 * Find the next entry in the hash table
2901 */
2902 entry = ScanTupleHashTable(hashtable, &perhash->hashiter);
2903 if (entry == NULL)
2904 {
2905 int nextset = aggstate->current_set + 1;
2906
2907 if (nextset < aggstate->num_hashes)
2908 {
2909 /*
2910 * Switch to next grouping set, reinitialize, and restart the
2911 * loop.
2912 */
2913 select_current_set(aggstate, nextset, true);
2914
2915 perhash = &aggstate->perhash[aggstate->current_set];
2916
2917 ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2918
2919 continue;
2920 }
2921 else
2922 {
2923 return NULL;
2924 }
2925 }
2926
2927 /*
2928 * Clear the per-output-tuple context for each group
2929 *
2930 * We intentionally don't use ReScanExprContext here; if any aggs have
2931 * registered shutdown callbacks, they mustn't be called yet, since we
2932 * might not be done with that agg.
2933 */
2934 ResetExprContext(econtext);
2935
2936 /*
2937 * Transform representative tuple back into one with the right
2938 * columns.
2939 */
2940 ExecStoreMinimalTuple(TupleHashEntryGetTuple(entry), hashslot, false);
2941 slot_getallattrs(hashslot);
2942
2944 memset(firstSlot->tts_isnull, true,
2945 firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2946
2947 for (i = 0; i < perhash->numhashGrpCols; i++)
2948 {
2949 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2950
2951 firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2952 firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2953 }
2954 ExecStoreVirtualTuple(firstSlot);
2955
2956 pergroup = TupleHashEntryGetAdditional(hashtable, entry);
2957
2958 /*
2959 * Use the representative input tuple for any references to
2960 * non-aggregated input columns in the qual and tlist.
2961 */
2962 econtext->ecxt_outertuple = firstSlot;
2963
2964 prepare_projection_slot(aggstate,
2965 econtext->ecxt_outertuple,
2966 aggstate->current_set);
2967
2968 finalize_aggregates(aggstate, peragg, pergroup);
2969
2970 result = project_aggregates(aggstate);
2971 if (result)
2972 return result;
2973 }
2974
2975 /* No more groups */
2976 return NULL;
2977}
2978
2979/*
2980 * hashagg_spill_init
2981 *
2982 * Called after we determined that spilling is necessary. Chooses the number
2983 * of partitions to create, and initializes them.
2984 */
2985static void
2986hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
2987 double input_groups, double hashentrysize)
2988{
2989 int npartitions;
2990 int partition_bits;
2991
2992 npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
2993 used_bits, &partition_bits);
2994
2995#ifdef USE_INJECTION_POINTS
2996 if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
2997 {
2998 npartitions = 1;
2999 partition_bits = 0;
3000 INJECTION_POINT_CACHED("hash-aggregate-single-partition", NULL);
3001 }
3002#endif
3003
3004 spill->partitions = palloc0_array(LogicalTape *, npartitions);
3005 spill->ntuples = palloc0_array(int64, npartitions);
3006 spill->hll_card = palloc0_array(hyperLogLogState, npartitions);
3007
3008 for (int i = 0; i < npartitions; i++)
3009 spill->partitions[i] = LogicalTapeCreate(tapeset);
3010
3011 spill->shift = 32 - used_bits - partition_bits;
3012 if (spill->shift < 32)
3013 spill->mask = (npartitions - 1) << spill->shift;
3014 else
3015 spill->mask = 0;
3016 spill->npartitions = npartitions;
3017
3018 for (int i = 0; i < npartitions; i++)
3019 initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
3020}
3021
3022/*
3023 * hashagg_spill_tuple
3024 *
3025 * No room for new groups in the hash table. Save for later in the appropriate
3026 * partition.
3027 */
3028static Size
3029hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
3030 TupleTableSlot *inputslot, uint32 hash)
3031{
3032 TupleTableSlot *spillslot;
3033 int partition;
3034 MinimalTuple tuple;
3035 LogicalTape *tape;
3036 int total_written = 0;
3037 bool shouldFree;
3038
3039 Assert(spill->partitions != NULL);
3040
3041 /* spill only attributes that we actually need */
3042 if (!aggstate->all_cols_needed)
3043 {
3044 spillslot = aggstate->hash_spill_wslot;
3045 slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
3046 ExecClearTuple(spillslot);
3047 for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
3048 {
3049 if (bms_is_member(i + 1, aggstate->colnos_needed))
3050 {
3051 spillslot->tts_values[i] = inputslot->tts_values[i];
3052 spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
3053 }
3054 else
3055 spillslot->tts_isnull[i] = true;
3056 }
3057 ExecStoreVirtualTuple(spillslot);
3058 }
3059 else
3060 spillslot = inputslot;
3061
3062 tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
3063
3064 if (spill->shift < 32)
3065 partition = (hash & spill->mask) >> spill->shift;
3066 else
3067 partition = 0;
3068
3069 spill->ntuples[partition]++;
3070
3071 /*
3072 * All hash values destined for a given partition have some bits in
3073 * common, which causes bad HLL cardinality estimates. Hash the hash to
3074 * get a more uniform distribution.
3075 */
3076 addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
3077
3078 tape = spill->partitions[partition];
3079
3080 LogicalTapeWrite(tape, &hash, sizeof(uint32));
3081 total_written += sizeof(uint32);
3082
3083 LogicalTapeWrite(tape, tuple, tuple->t_len);
3084 total_written += tuple->t_len;
3085
3086 if (shouldFree)
3087 pfree(tuple);
3088
3089 return total_written;
3090}
3091
3092/*
3093 * hashagg_batch_new
3094 *
3095 * Construct a HashAggBatch item, which represents one iteration of HashAgg to
3096 * be done.
3097 */
3098static HashAggBatch *
3099hashagg_batch_new(LogicalTape *input_tape, int setno,
3100 int64 input_tuples, double input_card, int used_bits)
3101{
3102 HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
3103
3104 batch->setno = setno;
3105 batch->used_bits = used_bits;
3106 batch->input_tape = input_tape;
3107 batch->input_tuples = input_tuples;
3108 batch->input_card = input_card;
3109
3110 return batch;
3111}
3112
3113/*
3114 * hashagg_batch_read
3115 * read the next tuple from a batch's tape. Return NULL if no more.
3116 */
3117static MinimalTuple
3118hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
3119{
3120 LogicalTape *tape = batch->input_tape;
3121 MinimalTuple tuple;
3122 uint32 t_len;
3123 size_t nread;
3124 uint32 hash;
3125
3126 nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
3127 if (nread == 0)
3128 return NULL;
3129 if (nread != sizeof(uint32))
3130 ereport(ERROR,
3131 (errcode_for_file_access(),
3132 errmsg_internal("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
3133 tape, sizeof(uint32), nread)));
3134 if (hashp != NULL)
3135 *hashp = hash;
3136
3137 nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
3138 if (nread != sizeof(uint32))
3139 ereport(ERROR,
3140 (errcode_for_file_access(),
3141 errmsg_internal("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
3142 tape, sizeof(uint32), nread)));
3143
3144 tuple = (MinimalTuple) palloc(t_len);
3145 tuple->t_len = t_len;
3146
3147 nread = LogicalTapeRead(tape,
3148 (char *) tuple + sizeof(uint32),
3149 t_len - sizeof(uint32));
3150 if (nread != t_len - sizeof(uint32))
3151 ereport(ERROR,
3152 (errcode_for_file_access(),
3153 errmsg_internal("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
3154 tape, t_len - sizeof(uint32), nread)));
3155
3156 return tuple;
3157}
3158
3159/*
3160 * hashagg_finish_initial_spills
3161 *
3162 * After a HashAggBatch has been processed, it may have spilled tuples to
3163 * disk. If so, turn the spilled partitions into new batches that must later
3164 * be executed.
3165 */
3166static void
3167hashagg_finish_initial_spills(AggState *aggstate)
3168{
3169 int setno;
3170 int total_npartitions = 0;
3171
3172 if (aggstate->hash_spills != NULL)
3173 {
3174 for (setno = 0; setno < aggstate->num_hashes; setno++)
3175 {
3176 HashAggSpill *spill = &aggstate->hash_spills[setno];
3177
3178 total_npartitions += spill->npartitions;
3179 hashagg_spill_finish(aggstate, spill, setno);
3180 }
3181
3182 /*
3183 * We're not processing tuples from outer plan any more; only
3184 * processing batches of spilled tuples. The initial spill structures
3185 * are no longer needed.
3186 */
3187 pfree(aggstate->hash_spills);
3188 aggstate->hash_spills = NULL;
3189 }
3190
3191 hash_agg_update_metrics(aggstate, false, total_npartitions);
3192 aggstate->hash_spill_mode = false;
3193}
3194
3195/*
3196 * hashagg_spill_finish
3197 *
3198 * Transform spill partitions into new batches.
3199 */
3200static void
3201hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3202{
3203 int i;
3204 int used_bits = 32 - spill->shift;
3205
3206 if (spill->npartitions == 0)
3207 return; /* didn't spill */
3208
3209 for (i = 0; i < spill->npartitions; i++)
3210 {
3211 LogicalTape *tape = spill->partitions[i];
3212 HashAggBatch *new_batch;
3213 double cardinality;
3214
3215 /* if the partition is empty, don't create a new batch of work */
3216 if (spill->ntuples[i] == 0)
3217 continue;
3218
3219 cardinality = estimateHyperLogLog(&spill->hll_card[i]);
3220 freeHyperLogLog(&spill->hll_card[i]);
3221
3222 /* rewinding frees the buffer while not in use */
3223 LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
3224
3225 new_batch = hashagg_batch_new(tape, setno,
3226 spill->ntuples[i], cardinality,
3227 used_bits);
3228 aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch);
3229 aggstate->hash_batches_used++;
3230 }
3231
3232 pfree(spill->ntuples);
3233 pfree(spill->hll_card);
3234 pfree(spill->partitions);
3235}
3236
3237/*
3238 * Free resources related to a spilled HashAgg.
3239 */
3240static void
3241hashagg_reset_spill_state(AggState *aggstate)
3242{
3243 /* free spills from initial pass */
3244 if (aggstate->hash_spills != NULL)
3245 {
3246 int setno;
3247
3248 for (setno = 0; setno < aggstate->num_hashes; setno++)
3249 {
3250 HashAggSpill *spill = &aggstate->hash_spills[setno];
3251
3252 pfree(spill->ntuples);
3253 pfree(spill->partitions);
3254 }
3255 pfree(aggstate->hash_spills);
3256 aggstate->hash_spills = NULL;
3257 }
3258
3259 /* free batches */
3260 list_free_deep(aggstate->hash_batches);
3261 aggstate->hash_batches = NIL;
3262
3263 /* close tape set */
3264 if (aggstate->hash_tapeset != NULL)
3265 {
3266 LogicalTapeSetClose(aggstate->hash_tapeset);
3267 aggstate->hash_tapeset = NULL;
3268 }
3269}
3270
3271
3272/* -----------------
3273 * ExecInitAgg
3274 *
3275 * Creates the run-time information for the agg node produced by the
3276 * planner and initializes its outer subtree.
3277 *
3278 * -----------------
3279 */
3280AggState *
3281ExecInitAgg(Agg *node, EState *estate, int eflags)
3282{
3283 AggState *aggstate;
3284 AggStatePerAgg peraggs;
3285 AggStatePerTrans pertransstates;
3286 AggStatePerGroup *pergroups;
3287 Plan *outerPlan;
3288 ExprContext *econtext;
3289 TupleDesc scanDesc;
3290 int max_aggno;
3291 int max_transno;
3292 int numaggrefs;
3293 int numaggs;
3294 int numtrans;
3295 int phase;
3296 int phaseidx;
3297 ListCell *l;
3298 Bitmapset *all_grouped_cols = NULL;
3299 int numGroupingSets = 1;
3300 int numPhases;
3301 int numHashes;
3302 int i = 0;
3303 int j = 0;
3304 bool use_hashing = (node->aggstrategy == AGG_HASHED ||
3305 node->aggstrategy == AGG_MIXED);
3306
3307 /* check for unsupported flags */
3308 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
3309
3310 /*
3311 * create state structure
3312 */
3313 aggstate = makeNode(AggState);
3314 aggstate->ss.ps.plan = (Plan *) node;
3315 aggstate->ss.ps.state = estate;
3316 aggstate->ss.ps.ExecProcNode = ExecAgg;
3317
3318 aggstate->aggs = NIL;
3319 aggstate->numaggs = 0;
3320 aggstate->numtrans = 0;
3321 aggstate->aggstrategy = node->aggstrategy;
3322 aggstate->aggsplit = node->aggsplit;
3323 aggstate->maxsets = 0;
3324 aggstate->projected_set = -1;
3325 aggstate->current_set = 0;
3326 aggstate->peragg = NULL;
3327 aggstate->pertrans = NULL;
3328 aggstate->curperagg = NULL;
3329 aggstate->curpertrans = NULL;
3330 aggstate->input_done = false;
3331 aggstate->agg_done = false;
3332 aggstate->pergroups = NULL;
3333 aggstate->grp_firstTuple = NULL;
3334 aggstate->sort_in = NULL;
3335 aggstate->sort_out = NULL;
3336
3337 /*
3338 * phases[0] always exists, but is dummy in sorted/plain mode
3339 */
3340 numPhases = (use_hashing ? 1 : 2);
3341 numHashes = (use_hashing ? 1 : 0);
3342
3343 /*
3344 * Calculate the maximum number of grouping sets in any phase; this
3345 * determines the size of some allocations. Also calculate the number of
3346 * phases, since all hashed/mixed nodes contribute to only a single phase.
3347 */
3348 if (node->groupingSets)
3349 {
3350 numGroupingSets = list_length(node->groupingSets);
3351
3352 foreach(l, node->chain)
3353 {
3354 Agg *agg = lfirst(l);
3355
3356 numGroupingSets = Max(numGroupingSets,
3357 list_length(agg->groupingSets));
3358
3359 /*
3360 * additional AGG_HASHED aggs become part of phase 0, but all
3361 * others add an extra phase.
3362 */
3363 if (agg->aggstrategy != AGG_HASHED)
3364 ++numPhases;
3365 else
3366 ++numHashes;
3367 }
3368 }
3369
3370 aggstate->maxsets = numGroupingSets;
3371 aggstate->numphases = numPhases;
3372
3373 aggstate->aggcontexts = palloc0_array(ExprContext *, numGroupingSets);
3374
3375 /*
3376 * Create expression contexts. We need three or more, one for
3377 * per-input-tuple processing, one for per-output-tuple processing, one
3378 * for all the hashtables, and one for each grouping set. The per-tuple
3379 * memory context of the per-grouping-set ExprContexts (aggcontexts)
3380 * replaces the standalone memory context formerly used to hold transition
3381 * values. We cheat a little by using ExecAssignExprContext() to build
3382 * all of them.
3383 *
3384 * NOTE: the details of what is stored in aggcontexts and what is stored
3385 * in the regular per-query memory context are driven by a simple
3386 * decision: we want to reset the aggcontext at group boundaries (if not
3387 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
3388 */
3389 ExecAssignExprContext(estate, &aggstate->ss.ps);
3390 aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
3391
3392 for (i = 0; i < numGroupingSets; ++i)
3393 {
3394 ExecAssignExprContext(estate, &aggstate->ss.ps);
3395 aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
3396 }
3397
3398 if (use_hashing)
3399 aggstate->hashcontext = CreateWorkExprContext(estate);
3400
3401 ExecAssignExprContext(estate, &aggstate->ss.ps);
3402
3403 /*
3404 * Initialize child nodes.
3405 *
3406 * If we are doing a hashed aggregation then the child plan does not need
3407 * to handle REWIND efficiently; see ExecReScanAgg.
3408 */
3409 if (node->aggstrategy == AGG_HASHED)
3410 eflags &= ~EXEC_FLAG_REWIND;
3411 outerPlan = outerPlan(node);
3412 outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
3413
3414 /*
3415 * initialize source tuple type.
3416 */
3417 aggstate->ss.ps.outerops =
3418 ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
3419 &aggstate->ss.ps.outeropsfixed);
3420 aggstate->ss.ps.outeropsset = true;
3421
3422 ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
3423 aggstate->ss.ps.outerops);
3424 scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
3425
3426 /*
3427 * If there are more than two phases (including a potential dummy phase
3428 * 0), input will be resorted using tuplesort. Need a slot for that.
3429 */
3430 if (numPhases > 2)
3431 {
3432 aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
3433 &TTSOpsMinimalTuple);
3434
3435 /*
3436 * The output of the tuplesort, and the output from the outer child
3437 * might not use the same type of slot. In most cases the child will
3438 * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
3439 * input can also be presorted due to an index, in which case it could be
3440 * a different type of slot.
3441 *
3442 * XXX: For efficiency it would be good to instead/additionally
3443 * generate expressions with corresponding settings of outerops* for
3444 * the individual phases - deforming is often a bottleneck for
3445 * aggregations with lots of rows per group. If there's multiple
3446 * sorts, we know that all but the first use TTSOpsMinimalTuple (via
3447 * the nodeAgg.c internal tuplesort).
3448 */
3449 if (aggstate->ss.ps.outeropsfixed &&
3450 aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
3451 aggstate->ss.ps.outeropsfixed = false;
3452 }
3453
3454 /*
3455 * Initialize result type, slot and projection.
3456 */
3457 ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
3458 ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
3459
3460 /*
3461 * initialize child expressions
3462 *
3463 * We expect the parser to have checked that no aggs contain other agg
3464 * calls in their arguments (and just to be sure, we verify it again while
3465 * initializing the plan node). This would make no sense under SQL
3466 * semantics, and it's forbidden by the spec. Because it is true, we
3467 * don't need to worry about evaluating the aggs in any particular order.
3468 *
3469 * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
3470 * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
3471 * during ExecAssignProjectionInfo, above.
3472 */
3473 aggstate->ss.ps.qual =
3474 ExecInitQual(node->plan.qual, (PlanState *) aggstate);
3475
3476 /*
3477 * We should now have found all Aggrefs in the targetlist and quals.
3478 */
3479 numaggrefs = list_length(aggstate->aggs);
3480 max_aggno = -1;
3481 max_transno = -1;
3482 foreach(l, aggstate->aggs)
3483 {
3484 Aggref *aggref = (Aggref *) lfirst(l);
3485
3486 max_aggno = Max(max_aggno, aggref->aggno);
3487 max_transno = Max(max_transno, aggref->aggtransno);
3488 }
3489 aggstate->numaggs = numaggs = max_aggno + 1;
3490 aggstate->numtrans = numtrans = max_transno + 1;
3491
3492 /*
3493 * For each phase, prepare grouping set data and fmgr lookup data for
3494 * compare functions. Accumulate all_grouped_cols in passing.
3495 */
3496 aggstate->phases = palloc0_array(AggStatePerPhaseData, numPhases);
3497
3498 aggstate->num_hashes = numHashes;
3499 if (numHashes)
3500 {
3501 aggstate->perhash = palloc0_array(AggStatePerHashData, numHashes);
3502 aggstate->phases[0].numsets = 0;
3503 aggstate->phases[0].gset_lengths = palloc_array(int, numHashes);
3504 aggstate->phases[0].grouped_cols = palloc_array(Bitmapset *, numHashes);
3505 }
3506
3507 phase = 0;
3508 for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3509 {
3510 Agg *aggnode;
3511 Sort *sortnode;
3512
3513 if (phaseidx > 0)
3514 {
3515 aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
3516 sortnode = castNode(Sort, outerPlan(aggnode));
3517 }
3518 else
3519 {
3520 aggnode = node;
3521 sortnode = NULL;
3522 }
3523
3524 Assert(phase <= 1 || sortnode);
3525
3526 if (aggnode->aggstrategy == AGG_HASHED
3527 || aggnode->aggstrategy == AGG_MIXED)
3528 {
3529 AggStatePerPhase phasedata = &aggstate->phases[0];
3530 AggStatePerHash perhash;
3531 Bitmapset *cols = NULL;
3532
3533 Assert(phase == 0);
3534 i = phasedata->numsets++;
3535 perhash = &aggstate->perhash[i];
3536
3537 /* phase 0 always points to the "real" Agg in the hash case */
3538 phasedata->aggnode = node;
3539 phasedata->aggstrategy = node->aggstrategy;
3540
3541 /* but the actual Agg node representing this hash is saved here */
3542 perhash->aggnode = aggnode;
3543
3544 phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
3545
3546 for (j = 0; j < aggnode->numCols; ++j)
3547 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3548
3549 phasedata->grouped_cols[i] = cols;
3550
3551 all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3552 continue;
3553 }
3554 else
3555 {
3556 AggStatePerPhase phasedata = &aggstate->phases[++phase];
3557 int num_sets;
3558
3559 phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
3560
3561 if (num_sets)
3562 {
3563 phasedata->gset_lengths = palloc(num_sets * sizeof(int));
3564 phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
3565
3566 i = 0;
3567 foreach(l, aggnode->groupingSets)
3568 {
3569 int current_length = list_length(lfirst(l));
3570 Bitmapset *cols = NULL;
3571
3572 /* planner forces this to be correct */
3573 for (j = 0; j < current_length; ++j)
3574 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3575
3576 phasedata->grouped_cols[i] = cols;
3577 phasedata->gset_lengths[i] = current_length;
3578
3579 ++i;
3580 }
3581
3582 all_grouped_cols = bms_add_members(all_grouped_cols,
3583 phasedata->grouped_cols[0]);
3584 }
3585 else
3586 {
3587 Assert(phaseidx == 0);
3588
3589 phasedata->gset_lengths = NULL;
3590 phasedata->grouped_cols = NULL;
3591 }
3592
3593 /*
3594 * If we are grouping, precompute fmgr lookup data for inner loop.
3595 */
3596 if (aggnode->aggstrategy == AGG_SORTED)
3597 {
3598 /*
3599 * Build a separate function for each subset of columns that
3600 * need to be compared.
3601 */
3602 phasedata->eqfunctions = palloc0_array(ExprState *, aggnode->numCols);
3603
3604 /* for each grouping set */
3605 for (int k = 0; k < phasedata->numsets; k++)
3606 {
3607 int length = phasedata->gset_lengths[k];
3608
3609 /* nothing to do for empty grouping set */
3610 if (length == 0)
3611 continue;
3612
3613 /* if we already had one of this length, it'll do */
3614 if (phasedata->eqfunctions[length - 1] != NULL)
3615 continue;
3616
3617 phasedata->eqfunctions[length - 1] =
3618 execTuplesMatchPrepare(scanDesc,
3619 length,
3620 aggnode->grpColIdx,
3621 aggnode->grpOperators,
3622 aggnode->grpCollations,
3623 (PlanState *) aggstate);
3624 }
3625
3626 /* and for all grouped columns, unless already computed */
3627 if (aggnode->numCols > 0 &&
3628 phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
3629 {
3630 phasedata->eqfunctions[aggnode->numCols - 1] =
3631 execTuplesMatchPrepare(scanDesc,
3632 aggnode->numCols,
3633 aggnode->grpColIdx,
3634 aggnode->grpOperators,
3635 aggnode->grpCollations,
3636 (PlanState *) aggstate);
3637 }
3638 }
3639
3640 phasedata->aggnode = aggnode;
3641 phasedata->aggstrategy = aggnode->aggstrategy;
3642 phasedata->sortnode = sortnode;
3643 }
3644 }
3645
3646 /*
3647 * Convert all_grouped_cols to a descending-order list.
3648 */
3649 i = -1;
3650 while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3651 aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3652
3653 /*
3654 * Set up aggregate-result storage in the output expr context, and also
3655 * allocate my private per-agg working storage
3656 */
3657 econtext = aggstate->ss.ps.ps_ExprContext;
3658 econtext->ecxt_aggvalues = palloc0_array(Datum, numaggs);
3659 econtext->ecxt_aggnulls = palloc0_array(bool, numaggs);
3660
3661 peraggs = palloc0_array(AggStatePerAggData, numaggs);
3662 pertransstates = palloc0_array(AggStatePerTransData, numtrans);
3663
3664 aggstate->peragg = peraggs;
3665 aggstate->pertrans = pertransstates;
3666
3667
3668 aggstate->all_pergroups = palloc0_array(AggStatePerGroup, numGroupingSets + numHashes);
3669 pergroups = aggstate->all_pergroups;
3670
3671 if (node->aggstrategy != AGG_HASHED)
3672 {
3673 for (i = 0; i < numGroupingSets; i++)
3674 {
3675 pergroups[i] = palloc0_array(AggStatePerGroupData, numaggs);
3676 }
3677
3678 aggstate->pergroups = pergroups;
3679 pergroups += numGroupingSets;
3680 }
3681
3682 /*
3683 * Hashing can only appear in the initial phase.
3684 */
3685 if (use_hashing)
3686 {
3687 Plan *outerplan = outerPlan(node);
3688 double totalGroups = 0;
3689
3690 aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
3691 &TTSOpsMinimalTuple);
3692 aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
3693 &TTSOpsVirtual);
3694
3695 /* this is an array of pointers, not structures */
3696 aggstate->hash_pergroup = pergroups;
3697
3698 aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
3699 outerplan->plan_width,
3700 node->transitionSpace);
3701
3702 /*
3703 * Consider all of the grouping sets together when setting the limits
3704 * and estimating the number of partitions. This can be inaccurate
3705 * when there is more than one grouping set, but should still be
3706 * reasonable.
3707 */
3708 for (int k = 0; k < aggstate->num_hashes; k++)
3709 totalGroups += aggstate->perhash[k].aggnode->numGroups;
3710
3711 hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
3712 &aggstate->hash_mem_limit,
3713 &aggstate->hash_ngroups_limit,
3714 &aggstate->hash_planned_partitions);
3715 find_hash_columns(aggstate);
3716
3717 /* Skip massive memory allocation if we are just doing EXPLAIN */
3718 if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
3719 build_hash_tables(aggstate);
3720
3721 aggstate->table_filled = false;
3722
3723 /* Initialize this to 1, meaning nothing spilled, yet */
3724 aggstate->hash_batches_used = 1;
3725 }
3726
3727 /*
3728 * Initialize current phase-dependent values to initial phase. The initial
3729 * phase is 1 (first sort pass) for all strategies that use sorting (if
3730 * hashing is being done too, then phase 0 is processed last); but if only
3731 * hashing is being done, then phase 0 is all there is.
3732 */
3733 if (node->aggstrategy == AGG_HASHED)
3734 {
3735 aggstate->current_phase = 0;
3736 initialize_phase(aggstate, 0);
3737 select_current_set(aggstate, 0, true);
3738 }
3739 else
3740 {
3741 aggstate->current_phase = 1;
3742 initialize_phase(aggstate, 1);
3743 select_current_set(aggstate, 0, false);
3744 }
3745
3746 /*
3747 * Perform lookups of aggregate function info, and initialize the
3748 * unchanging fields of the per-agg and per-trans data.
3749 */
3750 foreach(l, aggstate->aggs)
3751 {
3752 Aggref *aggref = lfirst(l);
3753 AggStatePerAgg peragg;
3754 AggStatePerTrans pertrans;
3755 Oid aggTransFnInputTypes[FUNC_MAX_ARGS];
3756 int numAggTransFnArgs;
3757 int numDirectArgs;
3758 HeapTuple aggTuple;
3759 Form_pg_aggregate aggform;
3760 AclResult aclresult;
3761 Oid finalfn_oid;
3762 Oid serialfn_oid,
3763 deserialfn_oid;
3764 Oid aggOwner;
3765 Expr *finalfnexpr;
3766 Oid aggtranstype;
3767
3768 /* Planner should have assigned aggregate to correct level */
3769 Assert(aggref->agglevelsup == 0);
3770 /* ... and the split mode should match */
3771 Assert(aggref->aggsplit == aggstate->aggsplit);
3772
3773 peragg = &peraggs[aggref->aggno];
3774
3775 /* Check if we initialized the state for this aggregate already. */
3776 if (peragg->aggref != NULL)
3777 continue;
3778
3779 peragg->aggref = aggref;
3780 peragg->transno = aggref->aggtransno;
3781
3782 /* Fetch the pg_aggregate row */
3783 aggTuple = SearchSysCache1(AGGFNOID,
3784 ObjectIdGetDatum(aggref->aggfnoid));
3785 if (!HeapTupleIsValid(aggTuple))
3786 elog(ERROR, "cache lookup failed for aggregate %u",
3787 aggref->aggfnoid);
3788 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3789
3790 /* Check permission to call aggregate function */
3791 aclresult = object_aclcheck(ProcedureRelationId, aggref->aggfnoid, GetUserId(),
3792 ACL_EXECUTE);
3793 if (aclresult != ACLCHECK_OK)
3794 aclcheck_error(aclresult, OBJECT_AGGREGATE,
3795 get_func_name(aggref->aggfnoid));
3796 InvokeFunctionExecuteHook(aggref->aggfnoid);
3797
3798 /* planner recorded transition state type in the Aggref itself */
3799 aggtranstype = aggref->aggtranstype;
3800 Assert(OidIsValid(aggtranstype));
3801
3802 /* Final function only required if we're finalizing the aggregates */
3803 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3804 peragg->finalfn_oid = finalfn_oid = InvalidOid;
3805 else
3806 peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3807
3808 serialfn_oid = InvalidOid;
3809 deserialfn_oid = InvalidOid;
3810
3811 /*
3812 * Check if serialization/deserialization is required. We only do it
3813 * for aggregates that have transtype INTERNAL.
3814 */
3815 if (aggtranstype == INTERNALOID)
3816 {
3817 /*
3818 * The planner should only have generated a serialize agg node if
3819 * every aggregate with an INTERNAL state has a serialization
3820 * function. Verify that.
3821 */
3822 if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3823 {
3824 /* serialization only valid when not running finalfn */
3825 Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3826
3827 if (!OidIsValid(aggform->aggserialfn))
3828 elog(ERROR, "serialfunc not provided for serialization aggregation");
3829 serialfn_oid = aggform->aggserialfn;
3830 }
3831
3832 /* Likewise for deserialization functions */
3833 if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3834 {
3835 /* deserialization only valid when combining states */
3836 Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3837
3838 if (!OidIsValid(aggform->aggdeserialfn))
3839 elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3840 deserialfn_oid = aggform->aggdeserialfn;
3841 }
3842 }
3843
3844 /* Check that aggregate owner has permission to call component fns */
3845 {
3846 HeapTuple procTuple;
3847
3848 procTuple = SearchSysCache1(PROCOID,
3849 ObjectIdGetDatum(aggref->aggfnoid));
3850 if (!HeapTupleIsValid(procTuple))
3851 elog(ERROR, "cache lookup failed for function %u",
3852 aggref->aggfnoid);
3853 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3854 ReleaseSysCache(procTuple);
3855
3856 if (OidIsValid(finalfn_oid))
3857 {
3858 aclresult = object_aclcheck(ProcedureRelationId, finalfn_oid, aggOwner,
3859 ACL_EXECUTE);
3860 if (aclresult != ACLCHECK_OK)
3861 aclcheck_error(aclresult, OBJECT_FUNCTION,
3862 get_func_name(finalfn_oid));
3863 InvokeFunctionExecuteHook(finalfn_oid);
3864 }
3865 if (OidIsValid(serialfn_oid))
3866 {
3867 aclresult = object_aclcheck(ProcedureRelationId, serialfn_oid, aggOwner,
3868 ACL_EXECUTE);
3869 if (aclresult != ACLCHECK_OK)
3870 aclcheck_error(aclresult, OBJECT_FUNCTION,
3871 get_func_name(serialfn_oid));
3872 InvokeFunctionExecuteHook(serialfn_oid);
3873 }
3874 if (OidIsValid(deserialfn_oid))
3875 {
3876 aclresult = object_aclcheck(ProcedureRelationId, deserialfn_oid, aggOwner,
3877 ACL_EXECUTE);
3878 if (aclresult != ACLCHECK_OK)
3879 aclcheck_error(aclresult, OBJECT_FUNCTION,
3880 get_func_name(deserialfn_oid));
3881 InvokeFunctionExecuteHook(deserialfn_oid);
3882 }
3883 }
3884
3885 /*
3886 * Get actual datatypes of the (nominal) aggregate inputs. These
3887 * could be different from the agg's declared input types, when the
3888 * agg accepts ANY or a polymorphic type.
3889 */
3890 numAggTransFnArgs = get_aggregate_argtypes(aggref,
3891 aggTransFnInputTypes);
3892
3893 /* Count the "direct" arguments, if any */
3894 numDirectArgs = list_length(aggref->aggdirectargs);
3895
3896 /* Detect how many arguments to pass to the finalfn */
3897 if (aggform->aggfinalextra)
3898 peragg->numFinalArgs = numAggTransFnArgs + 1;
3899 else
3900 peragg->numFinalArgs = numDirectArgs + 1;
3901
3902 /* Initialize any direct-argument expressions */
3903 peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3904 (PlanState *) aggstate);
3905
3906 /*
3907 * build expression trees using actual argument & result types for the
3908 * finalfn, if it exists and is required.
3909 */
3910 if (OidIsValid(finalfn_oid))
3911 {
3912 build_aggregate_finalfn_expr(aggTransFnInputTypes,
3913 peragg->numFinalArgs,
3914 aggtranstype,
3915 aggref->aggtype,
3916 aggref->inputcollid,
3917 finalfn_oid,
3918 &finalfnexpr);
3919 fmgr_info(finalfn_oid, &peragg->finalfn);
3920 fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3921 }
3922
3923 /* get info about the output value's datatype */
3924 get_typlenbyval(aggref->aggtype,
3925 &peragg->resulttypeLen,
3926 &peragg->resulttypeByVal);
3927
3928 /*
3929 * Build working state for invoking the transition function, if we
3930 * haven't done it already.
3931 */
3932 pertrans = &pertransstates[aggref->aggtransno];
3933 if (pertrans->aggref == NULL)
3934 {
3935 Datum textInitVal;
3936 Datum initValue;
3937 bool initValueIsNull;
3938 Oid transfn_oid;
3939
3940 /*
3941 * If this aggregation is performing state combines, then instead
3942 * of using the transition function, we'll use the combine
3943 * function.
3944 */
3945 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3946 {
3947 transfn_oid = aggform->aggcombinefn;
3948
3949 /* If not set then the planner messed up */
3950 if (!OidIsValid(transfn_oid))
3951 elog(ERROR, "combinefn not set for aggregate function");
3952 }
3953 else
3954 transfn_oid = aggform->aggtransfn;
3955
3956 aclresult = object_aclcheck(ProcedureRelationId, transfn_oid, aggOwner, ACL_EXECUTE);
3957 if (aclresult != ACLCHECK_OK)
3958 aclcheck_error(aclresult, OBJECT_FUNCTION,
3959 get_func_name(transfn_oid));
3960 InvokeFunctionExecuteHook(transfn_oid);
3961
3962 /*
3963 * initval is potentially null, so don't try to access it as a
3964 * struct field. Must do it the hard way with SysCacheGetAttr.
3965 */
3966 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3967 Anum_pg_aggregate_agginitval,
3968 &initValueIsNull);
3969 if (initValueIsNull)
3970 initValue = (Datum) 0;
3971 else
3972 initValue = GetAggInitVal(textInitVal, aggtranstype);
3973
3974 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3975 {
3976 Oid combineFnInputTypes[] = {aggtranstype,
3977 aggtranstype};
3978
3979 /*
3980 * When combining there's only one input, the to-be-combined
3981 * transition value. The transition value is not counted
3982 * here.
3983 */
3984 pertrans->numTransInputs = 1;
3985
3986 /* aggcombinefn always has two arguments of aggtranstype */
3987 build_pertrans_for_aggref(pertrans, aggstate, estate,
3988 aggref, transfn_oid, aggtranstype,
3989 serialfn_oid, deserialfn_oid,
3990 initValue, initValueIsNull,
3991 combineFnInputTypes, 2);
3992
3993 /*
3994 * Ensure that a combine function to combine INTERNAL states
3995 * is not strict. This should have been checked during CREATE
3996 * AGGREGATE, but the strict property could have been changed
3997 * since then.
3998 */
3999 if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
4000 ereport(ERROR,
4001 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4002 errmsg("combine function with transition type %s must not be declared STRICT",
4003 format_type_be(aggtranstype))));
4004 }
4005 else
4006 {
4007 /* Detect how many arguments to pass to the transfn */
4008 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4009 pertrans->numTransInputs = list_length(aggref->args);
4010 else
4011 pertrans->numTransInputs = numAggTransFnArgs;
4012
4013 build_pertrans_for_aggref(pertrans, aggstate, estate,
4014 aggref, transfn_oid, aggtranstype,
4015 serialfn_oid, deserialfn_oid,
4016 initValue, initValueIsNull,
4017 aggTransFnInputTypes,
4018 numAggTransFnArgs);
4019
4020 /*
4021 * If the transfn is strict and the initval is NULL, make sure
4022 * input type and transtype are the same (or at least
4023 * binary-compatible), so that it's OK to use the first
4024 * aggregated input value as the initial transValue. This
4025 * should have been checked at agg definition time, but we
4026 * must check again in case the transfn's strictness property
4027 * has been changed.
4028 */
4029 if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
4030 {
4031 if (numAggTransFnArgs <= numDirectArgs ||
4032 !IsBinaryCoercible(aggTransFnInputTypes[numDirectArgs],
4033 aggtranstype))
4034 ereport(ERROR,
4035 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4036 errmsg("aggregate %u needs to have compatible input type and transition type",
4037 aggref->aggfnoid)));
4038 }
4039 }
4040 }
4041 else
4042 pertrans->aggshared = true;
4043 ReleaseSysCache(aggTuple);
4044 }
4045
4046 /*
4047 * Last, check whether any more aggregates got added onto the node while
4048 * we processed the expressions for the aggregate arguments (including not
4049 * only the regular arguments and FILTER expressions handled immediately
4050 * above, but any direct arguments we might've handled earlier). If so,
4051 * we have nested aggregate functions, which is semantically nonsensical,
4052 * so complain. (This should have been caught by the parser, so we don't
4053 * need to work hard on a helpful error message; but we defend against it
4054 * here anyway, just to be sure.)
4055 */
4056 if (numaggrefs != list_length(aggstate->aggs))
4057 ereport(ERROR,
4058 (errcode(ERRCODE_GROUPING_ERROR),
4059 errmsg("aggregate function calls cannot be nested")));
4060
4061 /*
4062 * Build expressions doing all the transition work at once. We build a
4063 * different one for each phase, as the number of transition function
4064 * invocations can differ between phases. Note this'll work both for
4065 * transition and combination functions (although there'll only be one
4066 * phase in the latter case).
4067 */
4068 for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
4069 {
4070 AggStatePerPhase phase = &aggstate->phases[phaseidx];
4071 bool dohash = false;
4072 bool dosort = false;
4073
4074 /* phase 0 doesn't necessarily exist */
4075 if (!phase->aggnode)
4076 continue;
4077
4078 if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
4079 {
4080 /*
4081 * Phase one, and only phase one, in a mixed agg performs both
4082 * sorting and aggregation.
4083 */
4084 dohash = true;
4085 dosort = true;
4086 }
4087 else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
4088 {
4089 /*
4090 * No need to compute a transition function for an AGG_MIXED phase
4091 * 0 - the contents of the hashtables will have been computed
4092 * during phase 1.
4093 */
4094 continue;
4095 }
4096 else if (phase->aggstrategy == AGG_PLAIN ||
4097 phase->aggstrategy == AGG_SORTED)
4098 {
4099 dohash = false;
4100 dosort = true;
4101 }
4102 else if (phase->aggstrategy == AGG_HASHED)
4103 {
4104 dohash = true;
4105 dosort = false;
4106 }
4107 else
4108 Assert(false);
4109
4110 phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
4111 false);
4112
4113 /* cache compiled expression for outer slot without NULL check */
4114 phase->evaltrans_cache[0][0] = phase->evaltrans;
4115 }
4116
4117 return aggstate;
4118}
4119
4120/*
4121 * Build the state needed to calculate a state value for an aggregate.
4122 *
4123 * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
4124 * to initialize the state for. 'transfn_oid', 'aggtranstype', and the rest
4125 * of the arguments could be calculated from 'aggref', but the caller has
4126 * calculated them already, so might as well pass them.
4127 *
4128 * 'transfn_oid' may be either the Oid of the aggtransfn or the aggcombinefn.
4129 */
4130static void
4131build_pertrans_for_aggref(AggStatePerTrans pertrans,
4132 AggState *aggstate, EState *estate,
4133 Aggref *aggref,
4134 Oid transfn_oid, Oid aggtranstype,
4135 Oid aggserialfn, Oid aggdeserialfn,
4136 Datum initValue, bool initValueIsNull,
4137 Oid *inputTypes, int numArguments)
4138{
4139 int numGroupingSets = Max(aggstate->maxsets, 1);
4140 Expr *transfnexpr;
4141 int numTransArgs;
4142 Expr *serialfnexpr = NULL;
4143 Expr *deserialfnexpr = NULL;
4144 ListCell *lc;
4145 int numInputs;
4146 int numDirectArgs;
4147 List *sortlist;
4148 int numSortCols;
4149 int numDistinctCols;
4150 int i;
4151
4152 /* Begin filling in the pertrans data */
4153 pertrans->aggref = aggref;
4154 pertrans->aggshared = false;
4155 pertrans->aggCollation = aggref->inputcollid;
4156 pertrans->transfn_oid = transfn_oid;
4157 pertrans->serialfn_oid = aggserialfn;
4158 pertrans->deserialfn_oid = aggdeserialfn;
4159 pertrans->initValue = initValue;
4160 pertrans->initValueIsNull = initValueIsNull;
4161
4162 /* Count the "direct" arguments, if any */
4163 numDirectArgs = list_length(aggref->aggdirectargs);
4164
4165 /* Count the number of aggregated input columns */
4166 pertrans->numInputs = numInputs = list_length(aggref->args);
4167
4168 pertrans->aggtranstype = aggtranstype;
4169
4170 /* account for the current transition state */
4171 numTransArgs = pertrans->numTransInputs + 1;
4172
4173 /*
4174 * Set up infrastructure for calling the transfn. Note that invtransfn is
4175 * not needed here.
4176 */
4177 build_aggregate_transfn_expr(inputTypes,
4178 numArguments,
4179 numDirectArgs,
4180 aggref->aggvariadic,
4181 aggtranstype,
4182 aggref->inputcollid,
4183 transfn_oid,
4184 InvalidOid,
4185 &transfnexpr,
4186 NULL);
4187
4188 fmgr_info(transfn_oid, &pertrans->transfn);
4189 fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
4190
4191 pertrans->transfn_fcinfo =
4192 (FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
4193 InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
4194 &pertrans->transfn,
4195 numTransArgs,
4196 pertrans->aggCollation,
4197 (Node *) aggstate, NULL);
4198
4199 /* get info about the state value's datatype */
4200 get_typlenbyval(aggtranstype,
4201 &pertrans->transtypeLen,
4202 &pertrans->transtypeByVal);
4203
4204 if (OidIsValid(aggserialfn))
4205 {
4206 build_aggregate_serialfn_expr(aggserialfn,
4207 &serialfnexpr);
4208 fmgr_info(aggserialfn, &pertrans->serialfn);
4209 fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
4210
4211 pertrans->serialfn_fcinfo =
4212 (FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
4213 InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
4214 &pertrans->serialfn,
4215 1,
4216 InvalidOid,
4217 (Node *) aggstate, NULL);
4218 }
4219
4220 if (OidIsValid(aggdeserialfn))
4221 {
4222 build_aggregate_deserialfn_expr(aggdeserialfn,
4223 &deserialfnexpr);
4224 fmgr_info(aggdeserialfn, &pertrans->deserialfn);
4225 fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
4226
4227 pertrans->deserialfn_fcinfo =
4228 (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
4229 InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
4230 &pertrans->deserialfn,
4231 2,
4232 InvalidOid,
4233 (Node *) aggstate, NULL);
4234 }
4235
4236 /*
4237 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
4238 * have a list of SortGroupClause nodes; fish out the data in them and
4239 * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
4240 * however; the agg's transfn and finalfn are responsible for that.
4241 *
4242 * When the planner has set the aggpresorted flag, the input to the
4243 * aggregate is already correctly sorted. For ORDER BY aggregates we can
4244 * simply treat these as normal aggregates. For presorted DISTINCT
4245 * aggregates an extra step must be added to remove duplicate consecutive
4246 * inputs.
4247 *
4248 * Note that by construction, if there is a DISTINCT clause then the ORDER
4249 * BY clause is a prefix of it (see transformDistinctClause).
4250 */
4251 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4252 {
4253 sortlist = NIL;
4254 numSortCols = numDistinctCols = 0;
4255 pertrans->aggsortrequired = false;
4256 }
4257 else if (aggref->aggpresorted && aggref->aggdistinct == NIL)
4258 {
4259 sortlist = NIL;
4260 numSortCols = numDistinctCols = 0;
4261 pertrans->aggsortrequired = false;
4262 }
4263 else if (aggref->aggdistinct)
4264 {
4265 sortlist = aggref->aggdistinct;
4266 numSortCols = numDistinctCols = list_length(sortlist);
4267 Assert(numSortCols >= list_length(aggref->aggorder));
4268 pertrans->aggsortrequired = !aggref->aggpresorted;
4269 }
4270 else
4271 {
4272 sortlist = aggref->aggorder;
4273 numSortCols = list_length(sortlist);
4274 numDistinctCols = 0;
4275 pertrans->aggsortrequired = (numSortCols > 0);
4276 }
4277
4278 pertrans->numSortCols = numSortCols;
4279 pertrans->numDistinctCols = numDistinctCols;
4280
4281 /*
4282 * If we have either sorting or filtering to do, create a tupledesc and
4283 * slot corresponding to the aggregated inputs (including sort
4284 * expressions) of the agg.
4285 */
4286 if (numSortCols > 0 || aggref->aggfilter)
4287 {
4288 pertrans->sortdesc = ExecTypeFromTL(aggref->args);
4289 pertrans->sortslot =
4290 ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4291 &TTSOpsMinimalTuple);
4292 }
4293
4294 if (numSortCols > 0)
4295 {
4296 /*
4297 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
4298 * (yet)
4299 */
4300 Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
4301
4302 /* ORDER BY aggregates are not supported with partial aggregation */
4303 Assert(!DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
4304
4305 /* If we have only one input, we need its len/byval info. */
4306 if (numInputs == 1)
4307 {
4308 get_typlenbyval(linitial_oid(aggref->aggargtypes),
4309 &pertrans->inputtypeLen,
4310 &pertrans->inputtypeByVal);
4311 }
4312 else if (numDistinctCols > 0)
4313 {
4314 /* we will need an extra slot to store prior values */
4315 pertrans->uniqslot =
4316 ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4317 &TTSOpsMinimalTuple);
4317 &TTSOpsMinimalTuple);
4318 }
4319
4320 /* Extract the sort information for use later */
4321 pertrans->sortColIdx =
4322 (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
4323 pertrans->sortOperators =
4324 (Oid *) palloc(numSortCols * sizeof(Oid));
4325 pertrans->sortCollations =
4326 (Oid *) palloc(numSortCols * sizeof(Oid));
4327 pertrans->sortNullsFirst =
4328 (bool *) palloc(numSortCols * sizeof(bool));
4329
4330 i = 0;
4331 foreach(lc, sortlist)
4332 {
4333 SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
4334 TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
4335
4336 /* the parser should have made sure of this */
4337 Assert(OidIsValid(sortcl->sortop));
4338
4339 pertrans->sortColIdx[i] = tle->resno;
4340 pertrans->sortOperators[i] = sortcl->sortop;
4341 pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
4342 pertrans->sortNullsFirst[i] = sortcl->nulls_first;
4343 i++;
4344 }
4345 Assert(i == numSortCols);
4346 }
4347
4348 if (aggref->aggdistinct)
4349 {
4350 Oid *ops;
4351
4352 Assert(numArguments > 0);
4353 Assert(list_length(aggref->aggdistinct) == numDistinctCols);
4354
4355 ops = palloc(numDistinctCols * sizeof(Oid));
4356
4357 i = 0;
4358 foreach(lc, aggref->aggdistinct)
4359 ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
4360
4361 /* lookup / build the necessary comparators */
4362 if (numDistinctCols == 1)
4363 fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
4364 else
4365 pertrans->equalfnMulti =
4366 execTuplesMatchPrepare(pertrans->sortdesc,
4367 numDistinctCols,
4368 pertrans->sortColIdx,
4369 ops,
4370 pertrans->sortCollations,
4371 &aggstate->ss.ps);
4372 pfree(ops);
4373 }
4374
4375 pertrans->sortstates = (Tuplesortstate **) palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
4376 }
4377
4378
4379static Datum
4380 GetAggInitVal(Datum textInitVal, Oid transtype)
4381 {
4382 Oid typinput,
4383 typioparam;
4384 char *strInitVal;
4385 Datum initVal;
4386
4387 getTypeInputInfo(transtype, &typinput, &typioparam);
4388 strInitVal = TextDatumGetCString(textInitVal);
4389 initVal = OidInputFunctionCall(typinput, strInitVal,
4390 typioparam, -1);
4391 pfree(strInitVal);
4392 return initVal;
4393}
4394
4395void
4396 ExecEndAgg(AggState *node)
4397 {
4398 PlanState *outerPlan;
4399 int transno;
4400 int numGroupingSets = Max(node->maxsets, 1);
4401 int setno;
4402
4403 /*
4404 * When ending a parallel worker, copy the statistics gathered by the
4405 * worker back into shared memory so that it can be picked up by the main
4406 * process to report in EXPLAIN ANALYZE.
4407 */
4408 if (node->shared_info && IsParallelWorker())
4409 {
4410 AggregateInstrumentation *si;
4411
4412 Assert(ParallelWorkerNumber < node->shared_info->num_workers);
4413 si = &node->shared_info->sinstrument[ParallelWorkerNumber];
4414 si->hash_batches_used = node->hash_batches_used;
4415 si->hash_disk_used = node->hash_disk_used;
4416 si->hash_mem_peak = node->hash_mem_peak;
4417 }
4418
4419 /* Make sure we have closed any open tuplesorts */
4420
4421 if (node->sort_in)
4422 tuplesort_end(node->sort_in);
4423 if (node->sort_out)
4424 tuplesort_end(node->sort_out);
4425
4426 hashagg_reset_spill_state(node);
4427
4428 /* Release hash tables too */
4429 if (node->hash_metacxt != NULL)
4430 {
4431 MemoryContextDelete(node->hash_metacxt);
4432 node->hash_metacxt = NULL;
4433 }
4434 if (node->hash_tuplescxt != NULL)
4435 {
4436 MemoryContextDelete(node->hash_tuplescxt);
4437 node->hash_tuplescxt = NULL;
4438 }
4439
4440 for (transno = 0; transno < node->numtrans; transno++)
4441 {
4442 AggStatePerTrans pertrans = &node->pertrans[transno];
4443
4444 for (setno = 0; setno < numGroupingSets; setno++)
4445 {
4446 if (pertrans->sortstates[setno])
4447 tuplesort_end(pertrans->sortstates[setno]);
4448 }
4449 }
4450
4451 /* And ensure any agg shutdown callbacks have been called */
4452 for (setno = 0; setno < numGroupingSets; setno++)
4453 ReScanExprContext(node->aggcontexts[setno]);
4454 if (node->hashcontext)
4455 ReScanExprContext(node->hashcontext);
4456
4457 outerPlan = outerPlanState(node);
4458 ExecEndNode(outerPlan);
4459 }
4460
4461void
4462 ExecReScanAgg(AggState *node)
4463 {
4464 ExprContext *econtext = node->ss.ps.ps_ExprContext;
4465 PlanState *outerPlan = outerPlanState(node);
4466 Agg *aggnode = (Agg *) node->ss.ps.plan;
4467 int transno;
4468 int numGroupingSets = Max(node->maxsets, 1);
4469 int setno;
4470
4471 node->agg_done = false;
4472
4473 if (node->aggstrategy == AGG_HASHED)
4474 {
4475 /*
4476 * In the hashed case, if we haven't yet built the hash table then we
4477 * can just return; nothing done yet, so nothing to undo. If subnode's
4478 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
4479 * else no reason to re-scan it at all.
4480 */
4481 if (!node->table_filled)
4482 return;
4483
4484 /*
4485 * If we do have the hash table, and it never spilled, and the subplan
4486 * does not have any parameter changes, and none of our own parameter
4487 * changes affect input expressions of the aggregated functions, then
4488 * we can just rescan the existing hash table; no need to build it
4489 * again.
4490 */
4491 if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
4492 !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
4493 {
4494 ResetTupleHashIterator(node->perhash[0].hashtable,
4495 &node->perhash[0].hashiter);
4496 select_current_set(node, 0, true);
4497 return;
4498 }
4499 }
4500
4501 /* Make sure we have closed any open tuplesorts */
4502 for (transno = 0; transno < node->numtrans; transno++)
4503 {
4504 for (setno = 0; setno < numGroupingSets; setno++)
4505 {
4506 AggStatePerTrans pertrans = &node->pertrans[transno];
4507
4508 if (pertrans->sortstates[setno])
4509 {
4510 tuplesort_end(pertrans->sortstates[setno]);
4511 pertrans->sortstates[setno] = NULL;
4512 }
4513 }
4514 }
4515
4516 /*
4517 * We don't need to ReScanExprContext the output tuple context here;
4518 * ExecReScan already did it. But we do need to reset our per-grouping-set
4519 * contexts, which may have transvalues stored in them. (We use rescan
4520 * rather than just reset because transfns may have registered callbacks
4521 * that need to be run now.) For the AGG_HASHED case, see below.
4522 */
4523
4524 for (setno = 0; setno < numGroupingSets; setno++)
4525 {
4526 ReScanExprContext(node->aggcontexts[setno]);
4527 }
4528
4529 /* Release first tuple of group, if we have made a copy */
4530 if (node->grp_firstTuple != NULL)
4531 {
4533 node->grp_firstTuple = NULL;
4534 }
4536
4537 /* Forget current agg values */
4538 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
4539 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
4540
4541 /*
4542 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
4543 * the hashcontext. This used to be an issue, but now, resetting a context
4544 * automatically deletes sub-contexts too.
4545 */
4546 if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
4547 {
4548 hashagg_reset_spill_state(node);
4549
4550 node->hash_ever_spilled = false;
4551 node->hash_spill_mode = false;
4552 node->hash_ngroups_current = 0;
4553
4554 ReScanExprContext(node->hashcontext);
4555 /* Rebuild empty hash table(s) */
4556 build_hash_tables(node);
4557 node->table_filled = false;
4558 /* iterator will be reset when the table is filled */
4559
4560 hashagg_recompile_expressions(node, false, false);
4561 }
4562
4563 if (node->aggstrategy != AGG_HASHED)
4564 {
4565 /*
4566 * Reset the per-group state (in particular, mark transvalues null)
4567 */
4568 for (setno = 0; setno < numGroupingSets; setno++)
4569 {
4570 MemSet(node->pergroups[setno], 0,
4571 sizeof(AggStatePerGroupData) * node->numaggs);
4572 }
4573
4574 /* reset to phase 1 */
4575 initialize_phase(node, 1);
4576
4577 node->input_done = false;
4578 node->projected_set = -1;
4579 }
4580
4581 if (outerPlan->chgParam == NULL)
4582 ExecReScan(outerPlan);
4583 }
4584
4585
4586/***********************************************************************
4587 * API exposed to aggregate functions
4588 ***********************************************************************/
4589
4590
4591/*
4592 * AggCheckCallContext - test if a SQL function is being called as an aggregate
4593 *
4594 * The transition and/or final functions of an aggregate may want to verify
4595 * that they are being called as aggregates, rather than as plain SQL
4596 * functions. They should use this function to do so. The return value
4597 * is nonzero if being called as an aggregate, or zero if not. (Specific
4598 * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4599 * values could conceivably appear in future.)
4600 *
4601 * If aggcontext isn't NULL, the function also stores at *aggcontext the
4602 * identity of the memory context that aggregate transition values are being
4603 * stored in. Note that the same aggregate call site (flinfo) may be called
4604 * interleaved on different transition values in different contexts, so it's
4605 * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4606 * cache it in the transvalue itself (for internal-type transvalues).
4607 */
4608int
4609 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
4610 {
4611 if (fcinfo->context && IsA(fcinfo->context, AggState))
4612 {
4613 if (aggcontext)
4614 {
4615 AggState *aggstate = ((AggState *) fcinfo->context);
4616 ExprContext *cxt = aggstate->curaggcontext;
4617
4618 *aggcontext = cxt->ecxt_per_tuple_memory;
4619 }
4620 return AGG_CONTEXT_AGGREGATE;
4621 }
4622 if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4623 {
4624 if (aggcontext)
4625 *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4626 return AGG_CONTEXT_WINDOW;
4627 }
4628
4629 /* this is just to prevent "uninitialized variable" warnings */
4630 if (aggcontext)
4631 *aggcontext = NULL;
4632 return 0;
4633}
4634
4635/*
4636 * AggGetAggref - allow an aggregate support function to get its Aggref
4637 *
4638 * If the function is being called as an aggregate support function,
4639 * return the Aggref node for the aggregate call. Otherwise, return NULL.
4640 *
4641 * Aggregates sharing the same inputs and transition functions can get
4642 * merged into a single transition calculation. If the transition function
4643 * calls AggGetAggref, it will get an arbitrary one of the Aggrefs for which it is
4644 * executing. It must therefore not pay attention to the Aggref fields that
4645 * relate to the final function, as those are indeterminate. But if a final
4646 * function calls AggGetAggref, it will get a precise result.
4647 *
4648 * Note that if an aggregate is being used as a window function, this will
4649 * return NULL. We could provide a similar function to return the relevant
4650 * WindowFunc node in such cases, but it's not needed yet.
4651 */
4652Aggref *
4653 AggGetAggref(FunctionCallInfo fcinfo)
4654 {
4655 if (fcinfo->context && IsA(fcinfo->context, AggState))
4656 {
4657 AggState *aggstate = (AggState *) fcinfo->context;
4658 AggStatePerAgg curperagg;
4659 AggStatePerTrans curpertrans;
4660
4661 /* check curperagg (valid when in a final function) */
4662 curperagg = aggstate->curperagg;
4663
4664 if (curperagg)
4665 return curperagg->aggref;
4666
4667 /* check curpertrans (valid when in a transition function) */
4668 curpertrans = aggstate->curpertrans;
4669
4670 if (curpertrans)
4671 return curpertrans->aggref;
4672 }
4673 return NULL;
4674}
4675
4676/*
4677 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4678 *
4679 * This is useful in agg final functions; the context returned is one that
4680 * the final function can safely reset as desired. This isn't useful for
4681 * transition functions, since the context returned MAY (we don't promise)
4682 * be the same as the context those are called in.
4683 *
4684 * As above, this is currently not useful for aggs called as window functions.
4685 */
4686 MemoryContext
4687 AggGetTempMemoryContext(FunctionCallInfo fcinfo)
4688 {
4689 if (fcinfo->context && IsA(fcinfo->context, AggState))
4690 {
4691 AggState *aggstate = (AggState *) fcinfo->context;
4692
4693 return aggstate->tmpcontext->ecxt_per_tuple_memory;
4694 }
4695 return NULL;
4696}
4697
4698/*
4699 * AggStateIsShared - find out whether transition state is shared
4700 *
4701 * If the function is being called as an aggregate support function,
4702 * return true if the aggregate's transition state is shared across
4703 * multiple aggregates, false if it is not.
4704 *
4705 * Returns true if not called as an aggregate support function.
4706 * This is intended as a conservative answer, ie "no you'd better not
4707 * scribble on your input". In particular, will return true if the
4708 * aggregate is being used as a window function, which is a scenario
4709 * in which changing the transition state is a bad idea. We might
4710 * want to refine the behavior for the window case in future.
4711 */
4712bool
4713 AggStateIsShared(FunctionCallInfo fcinfo)
4714 {
4715 if (fcinfo->context && IsA(fcinfo->context, AggState))
4716 {
4717 AggState *aggstate = (AggState *) fcinfo->context;
4718 AggStatePerAgg curperagg;
4719 AggStatePerTrans curpertrans;
4720
4721 /* check curperagg (valid when in a final function) */
4722 curperagg = aggstate->curperagg;
4723
4724 if (curperagg)
4725 return aggstate->pertrans[curperagg->transno].aggshared;
4726
4727 /* check curpertrans (valid when in a transition function) */
4728 curpertrans = aggstate->curpertrans;
4729
4730 if (curpertrans)
4731 return curpertrans->aggshared;
4732 }
4733 return true;
4734}
4735
4736/*
4737 * AggRegisterCallback - register a cleanup callback for an aggregate
4738 *
4739 * This is useful for aggs to register shutdown callbacks, which will ensure
4740 * that non-memory resources are freed. The callback will occur just before
4741 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4742 * either between groups or as a result of rescanning the query. The callback
4743 * will NOT be called on error paths. The typical use-case is for freeing of
4744 * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4745 * created by the agg functions. (The callback will not be called until after
4746 * the result of the finalfn is no longer needed, so it's safe for the finalfn
4747 * to return data that will be freed by the callback.)
4748 *
4749 * As above, this is currently not useful for aggs called as window functions.
4750 */
4751void
4752 AggRegisterCallback(FunctionCallInfo fcinfo,
4753 ExprContextCallbackFunction func,
4754 Datum arg)
4755{
4756 if (fcinfo->context && IsA(fcinfo->context, AggState))
4757 {
4758 AggState *aggstate = (AggState *) fcinfo->context;
4759 ExprContext *cxt = aggstate->curaggcontext;
4760
4761 RegisterExprContextCallback(cxt, func, arg);
4762
4763 return;
4764 }
4765 elog(ERROR, "aggregate function cannot register a callback in this context");
4766}
4767
4768
4769/* ----------------------------------------------------------------
4770 * Parallel Query Support
4771 * ----------------------------------------------------------------
4772 */
4773
4774 /* ----------------------------------------------------------------
4775 * ExecAggEstimate
4776 *
4777 * Estimate space required to propagate aggregate statistics.
4778 * ----------------------------------------------------------------
4779 */
4780void
4781 ExecAggEstimate(AggState *node, ParallelContext *pcxt)
4782 {
4783 Size size;
4784
4785 /* don't need this if not instrumenting or no workers */
4786 if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4787 return;
4788
4789 size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
4790 size = add_size(size, offsetof(SharedAggInfo, sinstrument));
4791 shm_toc_estimate_chunk(&pcxt->estimator, size);
4792 shm_toc_estimate_keys(&pcxt->estimator, 1);
4793 }
4794
4795/* ----------------------------------------------------------------
4796 * ExecAggInitializeDSM
4797 *
4798 * Initialize DSM space for aggregate statistics.
4799 * ----------------------------------------------------------------
4800 */
4801void
4802 ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
4803 {
4804 Size size;
4805
4806 /* don't need this if not instrumenting or no workers */
4807 if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4808 return;
4809
4810 size = offsetof(SharedAggInfo, sinstrument)
4811 + pcxt->nworkers * sizeof(AggregateInstrumentation);
4812 node->shared_info = shm_toc_allocate(pcxt->toc, size);
4813 /* ensure any unfilled slots will contain zeroes */
4814 memset(node->shared_info, 0, size);
4815 node->shared_info->num_workers = pcxt->nworkers;
4816 shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
4817 node->shared_info);
4818}
4819
4820/* ----------------------------------------------------------------
4821 * ExecAggInitializeWorker
4822 *
4823 * Attach worker to DSM space for aggregate statistics.
4824 * ----------------------------------------------------------------
4825 */
4826void
4827 ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
4828 {
4829 node->shared_info =
4830 shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
4831 }
4832
4833/* ----------------------------------------------------------------
4834 * ExecAggRetrieveInstrumentation
4835 *
4836 * Transfer aggregate statistics from DSM to private memory.
4837 * ----------------------------------------------------------------
4838 */
4839void
4840 ExecAggRetrieveInstrumentation(AggState *node)
4841 {
4842 Size size;
4843 SharedAggInfo *si;
4844
4845 if (node->shared_info == NULL)
4846 return;
4847
4848 size = offsetof(SharedAggInfo, sinstrument)
4849 + node->shared_info->num_workers * sizeof(AggregateInstrumentation);
4850 si = palloc(size);
4851 memcpy(si, node->shared_info, size);
4852 node->shared_info = si;
4853}
Definition nodes.h:161
#define castNode(_type_, nodeptr)
Definition nodes.h:182
static char * errmsg
#define InvokeFunctionExecuteHook(objectId)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition parse_agg.c:2349
void build_aggregate_deserialfn_expr(Oid deserialfn_oid, Expr **deserialfnexpr)
Definition parse_agg.c:2325
void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, Oid transfn_oid, Oid invtransfn_oid, Expr **transfnexpr, Expr **invtransfnexpr)
Definition parse_agg.c:2241
int get_aggregate_argtypes(Aggref *aggref, Oid *inputTypes)
Definition parse_agg.c:2112
void build_aggregate_serialfn_expr(Oid serialfn_oid, Expr **serialfnexpr)
Definition parse_agg.c:2302
bool IsBinaryCoercible(Oid srctype, Oid targettype)
@ OBJECT_AGGREGATE
@ OBJECT_FUNCTION
#define ACL_EXECUTE
Definition parsenodes.h:83
typedef FormData_pg_aggregate * Form_pg_aggregate
int16 attnum
FormData_pg_attribute * Form_pg_attribute
#define pg_nextpower2_size_t
static uint32 pg_ceil_log2_32(uint32 num)
#define pg_prevpower2_size_t
#define FUNC_MAX_ARGS
#define lfirst(lc)
Definition pg_list.h:172
#define llast(l)
Definition pg_list.h:198
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define lfirst_int(lc)
Definition pg_list.h:173
#define linitial_int(l)
Definition pg_list.h:179
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
#define list_nth_node(type, list, n)
Definition pg_list.h:327
typedef FormData_pg_proc * Form_pg_proc
Definition pg_proc.h:140
#define outerPlan(node)
Definition plannodes.h:265
static bool DatumGetBool(Datum X)
Definition postgres.h:100
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
#define InvalidOid
unsigned int Oid
static int fb(int x)
#define OUTER_VAR
Definition primnodes.h:244
static unsigned hash(unsigned *uv, int n)
Definition rege_dfa.c:715
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition shm_toc.c:171
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:232
#define shm_toc_estimate_chunk(e, sz)
Definition shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500
FmgrInfo finalfn
Definition nodeAgg.h:207
List * aggdirectargs
Definition nodeAgg.h:218
Aggref * aggref
Definition nodeAgg.h:195
FmgrInfo * hashfunctions
Definition nodeAgg.h:314
TupleHashTable hashtable
Definition nodeAgg.h:311
TupleTableSlot * hashslot
Definition nodeAgg.h:313
TupleHashIterator hashiter
Definition nodeAgg.h:312
AttrNumber * hashGrpColIdxHash
Definition nodeAgg.h:320
AttrNumber * hashGrpColIdxInput
Definition nodeAgg.h:319
ExprState * evaltrans
Definition nodeAgg.h:291
ExprState * evaltrans_cache[2][2]
Definition nodeAgg.h:299
AggStrategy aggstrategy
Definition nodeAgg.h:282
FmgrInfo serialfn
Definition nodeAgg.h:89
FmgrInfo equalfnOne
Definition nodeAgg.h:115
TupleDesc sortdesc
Definition nodeAgg.h:143
TupleTableSlot * sortslot
Definition nodeAgg.h:141
ExprState * equalfnMulti
Definition nodeAgg.h:116
Tuplesortstate ** sortstates
Definition nodeAgg.h:162
TupleTableSlot * uniqslot
Definition nodeAgg.h:142
FmgrInfo deserialfn
Definition nodeAgg.h:92
FunctionCallInfo deserialfn_fcinfo
Definition nodeAgg.h:175
AttrNumber * sortColIdx
Definition nodeAgg.h:105
FunctionCallInfo serialfn_fcinfo
Definition nodeAgg.h:173
FunctionCallInfo transfn_fcinfo
Definition nodeAgg.h:170
MemoryContext hash_metacxt
Definition execnodes.h:2446
ScanState ss
Definition execnodes.h:2404
Tuplesortstate * sort_out
Definition execnodes.h:2437
uint64 hash_disk_used
Definition execnodes.h:2465
AggStatePerPhase phase
Definition execnodes.h:2410
MemoryContext hash_tuplescxt
Definition execnodes.h:2447
HeapTuple grp_firstTuple
Definition execnodes.h:2442
bool table_filled
Definition execnodes.h:2444
AggStatePerTrans pertrans
Definition execnodes.h:2414
AggStrategy aggstrategy
Definition execnodes.h:2408
int numtrans
Definition execnodes.h:2407
ExprContext * hashcontext
Definition execnodes.h:2415
int projected_set
Definition execnodes.h:2425
SharedAggInfo * shared_info
Definition execnodes.h:2476
bool input_done
Definition execnodes.h:2423
bool hash_spill_mode
Definition execnodes.h:2455
AggStatePerGroup * pergroups
Definition execnodes.h:2440
AggStatePerHash perhash
Definition execnodes.h:2468
Size hash_mem_peak
Definition execnodes.h:2462
uint64 hash_ngroups_current
Definition execnodes.h:2463
int hash_batches_used
Definition execnodes.h:2466
Tuplesortstate * sort_in
Definition execnodes.h:2436
bool hash_ever_spilled
Definition execnodes.h:2454
int numaggs
Definition execnodes.h:2406
int maxsets
Definition execnodes.h:2434
ExprContext ** aggcontexts
Definition execnodes.h:2416
bool agg_done
Definition execnodes.h:2424
AggSplit aggsplit
Definition plannodes.h:1213
List * chain
Definition plannodes.h:1240
List * groupingSets
Definition plannodes.h:1237
Bitmapset * aggParams
Definition plannodes.h:1232
Cardinality numGroups
Definition plannodes.h:1226
Plan plan
Definition plannodes.h:1207
int numCols
Definition plannodes.h:1216
uint64 transitionSpace
Definition plannodes.h:1229
AggStrategy aggstrategy
Definition plannodes.h:1210
Oid aggfnoid
Definition primnodes.h:464
List * aggdistinct
Definition primnodes.h:494
List * aggdirectargs
Definition primnodes.h:485
List * args
Definition primnodes.h:488
Expr * aggfilter
Definition primnodes.h:497
List * aggorder
Definition primnodes.h:491
List * es_tupleTable
Definition execnodes.h:724
MemoryContext ecxt_per_tuple_memory
Definition execnodes.h:292
TupleTableSlot * ecxt_innertuple
Definition execnodes.h:286
Datum * ecxt_aggvalues
Definition execnodes.h:303
bool * ecxt_aggnulls
Definition execnodes.h:305
TupleTableSlot * ecxt_outertuple
Definition execnodes.h:288
Bitmapset * aggregated
Definition nodeAgg.c:365
Bitmapset * unaggregated
Definition nodeAgg.c:366
bool fn_strict
Definition fmgr.h:61
NullableDatum args[FLEXIBLE_ARRAY_MEMBER]
Definition fmgr.h:95
int used_bits
Definition nodeAgg.c:355
int64 input_tuples
Definition nodeAgg.c:357
double input_card
Definition nodeAgg.c:358
LogicalTape * input_tape
Definition nodeAgg.c:356
hyperLogLogState * hll_card
Definition nodeAgg.c:340
int64 * ntuples
Definition nodeAgg.c:337
LogicalTape ** partitions
Definition nodeAgg.c:336
int npartitions
Definition nodeAgg.c:335
uint32 mask
Definition nodeAgg.c:338
Definition pg_list.h:54
Definition nodes.h:135
Datum value
Definition postgres.h:87
shm_toc_estimator estimator
Definition parallel.h:43
shm_toc * toc
Definition parallel.h:46
Instrumentation * instrument
Definition execnodes.h:1187
Plan * plan
Definition execnodes.h:1177
Bitmapset * chgParam
Definition execnodes.h:1209
ExprContext * ps_ExprContext
Definition execnodes.h:1216
List * qual
Definition plannodes.h:235
int plan_node_id
Definition plannodes.h:231
TupleTableSlot * ss_ScanTupleSlot
Definition execnodes.h:1636
PlanState ps
Definition execnodes.h:1633
AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER]
int numCols
Definition plannodes.h:1144
TupleDesc tts_tupleDescriptor
Definition tuptable.h:129
const TupleTableSlotOps *const tts_ops
Definition tuptable.h:127
bool * tts_isnull
Definition tuptable.h:133
Datum * tts_values
Definition tuptable.h:131
AttrNumber varattno
Definition primnodes.h:275
int varno
Definition primnodes.h:270
Index varlevelsup
Definition primnodes.h:295
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:220
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:595
TargetEntry * get_sortgroupclause_tle(SortGroupClause *sgClause, List *targetList)
Definition tlist.c:376
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:178
void tuplesort_performsort(Tuplesortstate *state)
Definition tuplesort.c:1259
void tuplesort_end(Tuplesortstate *state)
Definition tuplesort.c:847
#define TUPLESORT_NONE
Definition tuplesort.h:67
void tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, int sortopt)
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, int workMem, SortCoordinate coordinate, int sortopt)
bool tuplesort_getdatum(Tuplesortstate *state, bool forward, bool copy, Datum *val, bool *isNull, Datum *abbrev)
#define TTS_EMPTY(slot)
Definition tuptable.h:92
static void slot_getsomeattrs(TupleTableSlot *slot, int attnum)
Definition tuptable.h:376
static HeapTuple ExecCopySlotHeapTuple(TupleTableSlot *slot)
Definition tuptable.h:503
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
#define TupIsNull(slot)
Definition tuptable.h:325
static void slot_getallattrs(TupleTableSlot *slot)
Definition tuptable.h:390