PostgreSQL Source Code  git master
nodeAgg.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeAgg.c
4  * Routines to handle aggregate nodes.
5  *
6  * ExecAgg normally evaluates each aggregate in the following steps:
7  *
8  * transvalue = initcond
9  * foreach input_tuple do
10  * transvalue = transfunc(transvalue, input_value(s))
11  * result = finalfunc(transvalue, direct_argument(s))
12  *
13  * If a finalfunc is not supplied then the result is just the ending
14  * value of transvalue.
15  *
16  * Other behaviors can be selected by the "aggsplit" mode, which exists
17  * to support partial aggregation. It is possible to:
18  * * Skip running the finalfunc, so that the output is always the
19  * final transvalue state.
20  * * Substitute the combinefunc for the transfunc, so that transvalue
21  * states (propagated up from a child partial-aggregation step) are merged
22  * rather than processing raw input rows. (The statements below about
23  * the transfunc apply equally to the combinefunc, when it's selected.)
24  * * Apply the serializefunc to the output values (this only makes sense
25  * when skipping the finalfunc, since the serializefunc works on the
26  * transvalue data type).
27  * * Apply the deserializefunc to the input values (this only makes sense
28  * when using the combinefunc, for similar reasons).
29  * It is the planner's responsibility to connect up Agg nodes using these
30  * alternate behaviors in a way that makes sense, with partial aggregation
31  * results being fed to nodes that expect them.
32  *
33  * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34  * input tuples and eliminate duplicates (if required) before performing
35  * the above-depicted process. (However, we don't do that for ordered-set
36  * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37  * so far as this module is concerned.) Note that partial aggregation
38  * is not supported in these cases, since we couldn't ensure global
39  * ordering or distinctness of the inputs.
40  *
41  * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42  * then the first non-NULL input_value is assigned directly to transvalue,
43  * and transfunc isn't applied until the second non-NULL input_value.
44  * The agg's first input type and transtype must be the same in this case!
45  *
46  * If transfunc is marked "strict" then NULL input_values are skipped,
47  * keeping the previous transvalue. If transfunc is not strict then it
48  * is called for every input tuple and must deal with NULL initcond
49  * or NULL input_values for itself.
50  *
51  * If finalfunc is marked "strict" then it is not called when the
52  * ending transvalue is NULL, instead a NULL result is created
53  * automatically (this is just the usual handling of strict functions,
54  * of course). A non-strict finalfunc can make its own choice of
55  * what to return for a NULL ending transvalue.
56  *
57  * Ordered-set aggregates are treated specially in one other way: we
58  * evaluate any "direct" arguments and pass them to the finalfunc along
59  * with the transition value.
60  *
61  * A finalfunc can have additional arguments beyond the transvalue and
62  * any "direct" arguments, corresponding to the input arguments of the
63  * aggregate. These are always just passed as NULL. Such arguments may be
64  * needed to allow resolution of a polymorphic aggregate's result type.
65  *
66  * We compute aggregate input expressions and run the transition functions
67  * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68  * once per input tuple, so when the transvalue datatype is
69  * pass-by-reference, we have to be careful to copy it into a longer-lived
70  * memory context, and free the prior value to avoid memory leakage. We
71  * store transvalues in another set of econtexts, aggstate->aggcontexts
72  * (one per grouping set, see below), which are also used for the hashtable
73  * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74  * reset, at group boundaries so that aggregate transition functions can
75  * register shutdown callbacks via AggRegisterCallback.
76  *
77  * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78  * run finalize functions and compute the output tuple; this context can be
79  * reset once per output tuple.
80  *
81  * The executor's AggState node is passed as the fmgr "context" value in
82  * all transfunc and finalfunc calls. It is not recommended that the
83  * transition functions look at the AggState node directly, but they can
84  * use AggCheckCallContext() to verify that they are being called by
85  * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86  * transition function might want to know this is so that it can avoid
87  * palloc'ing a fixed-size pass-by-ref transition value on every call:
88  * it can instead just scribble on and return its left input. Ordinarily
89  * it is completely forbidden for functions to modify pass-by-ref inputs,
90  * but in the aggregate case we know the left input is either the initial
91  * transition value or a previous function result, and in either case its
92  * value need not be preserved. See int8inc() for an example. Notice that
93  * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94  * the previous transition value pointer is returned. It is also possible
95  * to avoid repeated data copying when the transition value is an expanded
96  * object: to do that, the transition function must take care to return
97  * an expanded object that is in a child context of the memory context
98  * returned by AggCheckCallContext(). Also, some transition functions want
99  * to store working state in addition to the nominal transition value; they
100  * can use the memory context returned by AggCheckCallContext() to do that.
101  *
102  * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103  * AggState is available as context in earlier releases (back to 8.1),
104  * but direct examination of the node is needed to use it before 9.0.
105  *
106  * As of 9.4, aggregate transition functions can also use AggGetAggref()
107  * to get hold of the Aggref expression node for their aggregate call.
108  * This is mainly intended for ordered-set aggregates, which are not
109  * supported as window functions. (A regular aggregate function would
110  * need some fallback logic to use this, since there's no Aggref node
111  * for a window function.)
112  *
113  * Grouping sets:
114  *
115  * A list of grouping sets which is structurally equivalent to a ROLLUP
116  * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117  * ordered data. We do this by keeping a separate set of transition values
118  * for each grouping set being concurrently processed; for each input tuple
119  * we update them all, and on group boundaries we reset those states
120  * (starting at the front of the list) whose grouping values have changed
121  * (the list of grouping sets is ordered from most specific to least
122  * specific).
123  *
124  * Where more complex grouping sets are used, we break them down into
125  * "phases", where each phase has a different sort order (except phase 0
126  * which is reserved for hashing). During each phase but the last, the
127  * input tuples are additionally stored in a tuplesort which is keyed to the
128  * next phase's sort order; during each phase but the first, the input
129  * tuples are drawn from the previously sorted data. (The sorting of the
130  * data for the first phase is handled by the planner, as it might be
131  * satisfied by underlying nodes.)
132  *
133  * Hashing can be mixed with sorted grouping. To do this, we have an
134  * AGG_MIXED strategy that populates the hashtables during the first sorted
135  * phase, and switches to reading them out after completing all sort phases.
136  * We can also support AGG_HASHED with multiple hash tables and no sorting
137  * at all.
138  *
139  * From the perspective of aggregate transition and final functions, the
140  * only issue regarding grouping sets is this: a single call site (flinfo)
141  * of an aggregate function may be used for updating several different
142  * transition values in turn. So the function must not cache in the flinfo
143  * anything which logically belongs as part of the transition value (most
144  * importantly, the memory context in which the transition value exists).
145  * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146  * sensitive to the grouping set for which the aggregate function is
147  * currently being called.
148  *
149  * Plan structure:
150  *
151  * What we get from the planner is actually one "real" Agg node which is
152  * part of the plan tree proper, but which optionally has an additional list
153  * of Agg nodes hung off the side via the "chain" field. This is because an
154  * Agg node happens to be a convenient representation of all the data we
155  * need for grouping sets.
156  *
157  * For many purposes, we treat the "real" node as if it were just the first
158  * node in the chain. The chain must be ordered such that hashed entries
159  * come before sorted/plain entries; the real node is marked AGG_MIXED if
160  * there are both types present (in which case the real node describes one
161  * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162  * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163  * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164  * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165  * chained nodes.
166  *
167  * We collect all hashed nodes into a single "phase", numbered 0, and create
168  * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169  * Phase 0 is allocated even if there are no hashes, but remains unused in
170  * that case.
171  *
172  * AGG_HASHED nodes actually refer to only a single grouping set each,
173  * because for each hashed grouping we need a separate grpColIdx and
174  * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175  * grouping sets that share a sort order. Each AGG_SORTED node other than
176  * the first one has an associated Sort node which describes the sort order
177  * to be used; the first sorted node takes its input from the outer subtree,
178  * which the planner has already arranged to provide ordered data.
179  *
180  * Memory and ExprContext usage:
181  *
182  * Because we're accumulating aggregate values across input rows, we need to
183  * use more memory contexts than just simple input/output tuple contexts.
184  * In fact, for a rollup, we need a separate context for each grouping set
185  * so that we can reset the inner (finer-grained) aggregates on their group
186  * boundaries while continuing to accumulate values for outer
187  * (coarser-grained) groupings. On top of this, we might be simultaneously
188  * populating hashtables; however, we only need one context for all the
189  * hashtables.
190  *
191  * So we create an array, aggcontexts, with an ExprContext for each grouping
192  * set in the largest rollup that we're going to process, and use the
193  * per-tuple memory context of those ExprContexts to store the aggregate
194  * transition values. hashcontext is the single context created to support
195  * all hash tables.
196  *
197  * Spilling To Disk
198  *
199  * When performing hash aggregation, if the hash table memory exceeds the
200  * limit (see hash_agg_check_limits()), we enter "spill mode". In spill
201  * mode, we advance the transition states only for groups already in the
202  * hash table. For tuples that would need to create new hash table
203  * entries (and initialize new transition states), we instead spill them to
204  * disk to be processed later. The tuples are spilled in a partitioned
205  * manner, so that subsequent batches are smaller and less likely to exceed
206  * work_mem (if a batch does exceed work_mem, it must be spilled
207  * recursively).
208  *
209  * Spilled data is written to logical tapes. These provide better control
210  * over memory usage, disk space, and the number of files than if we were
211  * to use a BufFile for each spill.
212  *
213  * Note that it's possible for transition states to start small but then
214  * grow very large; for instance in the case of ARRAY_AGG. In such cases,
215  * it's still possible to significantly exceed work_mem. We try to avoid
216  * this situation by estimating what will fit in the available memory, and
217  * imposing a limit on the number of groups separately from the amount of
218  * memory consumed.
219  *
220  * Transition / Combine function invocation:
221  *
222  * For performance reasons transition functions, including combine
223  * functions, aren't invoked one-by-one from nodeAgg.c after computing
224  * arguments using the expression evaluation engine. Instead
225  * ExecBuildAggTrans() builds one large expression that does both argument
226  * evaluation and transition function invocation. That avoids performance
227  * issues due to repeated uses of expression evaluation, complications due
228  * to filter expressions having to be evaluated early, and allows JITing
229  * the entire expression into one native function.
230  *
231  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
232  * Portions Copyright (c) 1994, Regents of the University of California
233  *
234  * IDENTIFICATION
235  * src/backend/executor/nodeAgg.c
236  *
237  *-------------------------------------------------------------------------
238  */
239 
240 #include "postgres.h"
241 
242 #include "access/htup_details.h"
243 #include "access/parallel.h"
244 #include "catalog/objectaccess.h"
245 #include "catalog/pg_aggregate.h"
246 #include "catalog/pg_proc.h"
247 #include "catalog/pg_type.h"
248 #include "executor/execExpr.h"
249 #include "executor/executor.h"
250 #include "executor/nodeAgg.h"
251 #include "miscadmin.h"
252 #include "nodes/makefuncs.h"
253 #include "nodes/nodeFuncs.h"
254 #include "optimizer/optimizer.h"
255 #include "parser/parse_agg.h"
256 #include "parser/parse_coerce.h"
257 #include "utils/acl.h"
258 #include "utils/builtins.h"
259 #include "utils/datum.h"
260 #include "utils/dynahash.h"
261 #include "utils/expandeddatum.h"
262 #include "utils/logtape.h"
263 #include "utils/lsyscache.h"
264 #include "utils/memutils.h"
265 #include "utils/syscache.h"
266 #include "utils/tuplesort.h"
267 
268 /*
269  * Control how many partitions are created when spilling HashAgg to
270  * disk.
271  *
272  * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
273  * partitions needed such that each partition will fit in memory. The factor
274  * is set higher than one because there's not a high cost to having a few too
275  * many partitions, and it makes it less likely that a partition will need to
276  * be spilled recursively. Another benefit of having more, smaller partitions
277  * is that small hash tables may perform better than large ones due to memory
278  * caching effects.
279  *
280  * We also specify a min and max number of partitions per spill. Too few might
281  * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
282  * many will result in lots of memory wasted buffering the spill files (which
283  * could instead be spent on a larger hash table).
284  */
285 #define HASHAGG_PARTITION_FACTOR 1.50
286 #define HASHAGG_MIN_PARTITIONS 4
287 #define HASHAGG_MAX_PARTITIONS 1024
288 
289 /*
290  * For reading from tapes, the buffer size must be a multiple of
291  * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
292  * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
293  * tape always uses a buffer of size BLCKSZ.
294  */
295 #define HASHAGG_READ_BUFFER_SIZE BLCKSZ
296 #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
297 
298 /*
299  * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
300  * improved?
301  */
302 #define CHUNKHDRSZ 16
303 
304 /*
305  * Track all tapes needed for a HashAgg that spills. We don't know the maximum
306  * number of tapes needed at the start of the algorithm (because it can
307  * recurse), so one tape set is allocated and extended as needed for new
308  * tapes. When a particular tape is already read, rewind it for write mode and
309  * put it in the free list.
310  *
311  * Tapes' buffers can take up substantial memory when many tapes are open at
312  * once. We only need one tape open at a time in read mode (using a buffer
313  * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
314  * requiring a buffer of size BLCKSZ) for each partition.
315  */
316 typedef struct HashTapeInfo
317 {
319  int ntapes;
320  int *freetapes;
323 } HashTapeInfo;
324 
325 /*
326  * Represents partitioned spill data for a single hashtable. Contains the
327  * necessary information to route tuples to the correct partition, and to
328  * transform the spilled data into new batches.
329  *
330  * The high bits are used for partition selection (when recursing, we ignore
331  * the bits that have already been used for partition selection at an earlier
332  * level).
333  */
typedef struct HashAggSpill
{
	LogicalTapeSet *tapeset;	/* borrowed reference to tape set (owned by
								 * the caller; not released with the spill) */
	int			npartitions;	/* number of partitions */
	int		   *partitions;		/* spill partition tape numbers (one entry
								 * per partition) */
	int64	   *ntuples;		/* number of tuples in each partition */
	uint32		mask;			/* mask to find partition from hash value */
	int			shift;			/* after masking, shift by this amount */
} HashAggSpill;
343 
344 /*
345  * Represents work to be done for one pass of hash aggregation (with only one
346  * grouping set).
347  *
348  * Also tracks the bits of the hash already used for partition selection by
349  * earlier iterations, so that this batch can use new bits. If all bits have
350  * already been used, no partitioning will be done (any spilled data will go
351  * to a single output tape).
352  */
typedef struct HashAggBatch
{
	int			setno;			/* grouping set this batch belongs to */
	int			used_bits;		/* number of bits of hash already used for
								 * partition selection by earlier passes */
	LogicalTapeSet *tapeset;	/* borrowed reference to tape set (owned by
								 * the caller; not released with the batch) */
	int			input_tapenum;	/* input partition tape */
	int64		input_tuples;	/* number of tuples in this batch */
} HashAggBatch;
361 
362 /* used to find referenced colnos */
363 typedef struct FindColsContext
364 {
365  bool is_aggref; /* is under an aggref */
366  Bitmapset *aggregated; /* column references under an aggref */
367  Bitmapset *unaggregated; /* other column references */
369 
370 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
371 static void initialize_phase(AggState *aggstate, int newphase);
372 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
373 static void initialize_aggregates(AggState *aggstate,
374  AggStatePerGroup *pergroups,
375  int numReset);
376 static void advance_transition_function(AggState *aggstate,
377  AggStatePerTrans pertrans,
378  AggStatePerGroup pergroupstate);
379 static void advance_aggregates(AggState *aggstate);
380 static void process_ordered_aggregate_single(AggState *aggstate,
381  AggStatePerTrans pertrans,
382  AggStatePerGroup pergroupstate);
383 static void process_ordered_aggregate_multi(AggState *aggstate,
384  AggStatePerTrans pertrans,
385  AggStatePerGroup pergroupstate);
386 static void finalize_aggregate(AggState *aggstate,
387  AggStatePerAgg peragg,
388  AggStatePerGroup pergroupstate,
389  Datum *resultVal, bool *resultIsNull);
390 static void finalize_partialaggregate(AggState *aggstate,
391  AggStatePerAgg peragg,
392  AggStatePerGroup pergroupstate,
393  Datum *resultVal, bool *resultIsNull);
394 static void prepare_hash_slot(AggState *aggstate);
395 static void prepare_projection_slot(AggState *aggstate,
396  TupleTableSlot *slot,
397  int currentSet);
398 static void finalize_aggregates(AggState *aggstate,
399  AggStatePerAgg peragg,
400  AggStatePerGroup pergroup);
401 static TupleTableSlot *project_aggregates(AggState *aggstate);
402 static void find_cols(AggState *aggstate, Bitmapset **aggregated,
403  Bitmapset **unaggregated);
404 static bool find_cols_walker(Node *node, FindColsContext *context);
405 static void build_hash_tables(AggState *aggstate);
406 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
407 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
408  bool nullcheck);
409 static long hash_choose_num_buckets(double hashentrysize,
410  long estimated_nbuckets,
411  Size memory);
412 static int hash_choose_num_partitions(uint64 input_groups,
413  double hashentrysize,
414  int used_bits,
415  int *log2_npartittions);
417  bool *in_hash_table);
418 static void lookup_hash_entries(AggState *aggstate);
419 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
420 static void agg_fill_hash_table(AggState *aggstate);
421 static bool agg_refill_hash_table(AggState *aggstate);
424 static void hash_agg_check_limits(AggState *aggstate);
425 static void hash_agg_enter_spill_mode(AggState *aggstate);
426 static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
427  int npartitions);
428 static void hashagg_finish_initial_spills(AggState *aggstate);
429 static void hashagg_reset_spill_state(AggState *aggstate);
431  int input_tapenum, int setno,
432  int64 input_tuples, int used_bits);
433 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
434 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
435  int used_bits, uint64 input_tuples,
436  double hashentrysize);
437 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
438  TupleTableSlot *slot, uint32 hash);
439 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
440  int setno);
441 static void hashagg_tapeinfo_init(AggState *aggstate);
442 static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
443  int ndest);
444 static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
445 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
446 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
447  AggState *aggstate, EState *estate,
448  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
449  Oid aggserialfn, Oid aggdeserialfn,
450  Datum initValue, bool initValueIsNull,
451  Oid *inputTypes, int numArguments);
452 static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
453  int lastaggno, List **same_input_transnos);
454 static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
455  bool shareable,
456  Oid aggtransfn, Oid aggtranstype,
457  Oid aggserialfn, Oid aggdeserialfn,
458  Datum initValue, bool initValueIsNull,
459  List *transnos);
460 
461 
462 /*
463  * Select the current grouping set; affects current_set and
464  * curaggcontext.
465  */
466 static void
467 select_current_set(AggState *aggstate, int setno, bool is_hash)
468 {
469  /*
470  * When changing this, also adapt ExecAggPlainTransByVal() and
471  * ExecAggPlainTransByRef().
472  */
473  if (is_hash)
474  aggstate->curaggcontext = aggstate->hashcontext;
475  else
476  aggstate->curaggcontext = aggstate->aggcontexts[setno];
477 
478  aggstate->current_set = setno;
479 }
480 
481 /*
482  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
483  * current_phase + 1. Juggle the tuplesorts accordingly.
484  *
485  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
486  * case, so when entering phase 0, all we need to do is drop open sorts.
487  */
static void
initialize_phase(AggState *aggstate, int newphase)
{
	/* Phases must advance one at a time, except for a reset to 0 or 1. */
	Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);

	/*
	 * Whatever the previous state, we're now done with whatever input
	 * tuplesort was in use.
	 */
	if (aggstate->sort_in)
	{
		tuplesort_end(aggstate->sort_in);
		aggstate->sort_in = NULL;
	}

	if (newphase <= 1)
	{
		/*
		 * Discard any existing output tuplesort.  (On a reset we start over
		 * from the outer plan, so a partially-filled sorter is useless.)
		 */
		if (aggstate->sort_out)
		{
			tuplesort_end(aggstate->sort_out);
			aggstate->sort_out = NULL;
		}
	}
	else
	{
		/*
		 * The old output tuplesort becomes the new input one, and this is the
		 * right time to actually sort it.
		 */
		aggstate->sort_in = aggstate->sort_out;
		aggstate->sort_out = NULL;
		Assert(aggstate->sort_in);
		tuplesort_performsort(aggstate->sort_in);
	}

	/*
	 * If this isn't the last phase, we need to sort appropriately for the
	 * next phase in sequence.  (The last sorted phase, and phase 0 hashing,
	 * need no output sorter.)
	 */
	if (newphase > 0 && newphase < aggstate->numphases - 1)
	{
		Sort	   *sortnode = aggstate->phases[newphase + 1].sortnode;
		PlanState  *outerNode = outerPlanState(aggstate);
		TupleDesc	tupDesc = ExecGetResultType(outerNode);

		aggstate->sort_out = tuplesort_begin_heap(tupDesc,
												  sortnode->numCols,
												  sortnode->sortColIdx,
												  sortnode->sortOperators,
												  sortnode->collations,
												  sortnode->nullsFirst,
												  work_mem,
												  NULL, false);
	}

	aggstate->current_phase = newphase;
	aggstate->phase = &aggstate->phases[newphase];
}
549 
550 /*
551  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
552  * populated by the previous phase. Copy it to the sorter for the next phase
553  * if any.
554  *
555  * Callers cannot rely on memory for tuple in returned slot remaining valid
556  * past any subsequently fetched tuple.
557  */
558 static TupleTableSlot *
560 {
561  TupleTableSlot *slot;
562 
563  if (aggstate->sort_in)
564  {
565  /* make sure we check for interrupts in either path through here */
567  if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
568  aggstate->sort_slot, NULL))
569  return NULL;
570  slot = aggstate->sort_slot;
571  }
572  else
573  slot = ExecProcNode(outerPlanState(aggstate));
574 
575  if (!TupIsNull(slot) && aggstate->sort_out)
576  tuplesort_puttupleslot(aggstate->sort_out, slot);
577 
578  return slot;
579 }
580 
581 /*
582  * (Re)Initialize an individual aggregate.
583  *
584  * This function handles only one grouping set, already set in
585  * aggstate->current_set.
586  *
587  * When called, CurrentMemoryContext should be the per-query context.
588  */
589 static void
591  AggStatePerGroup pergroupstate)
592 {
593  /*
594  * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
595  */
596  if (pertrans->numSortCols > 0)
597  {
598  /*
599  * In case of rescan, maybe there could be an uncompleted sort
600  * operation? Clean it up if so.
601  */
602  if (pertrans->sortstates[aggstate->current_set])
603  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
604 
605 
606  /*
607  * We use a plain Datum sorter when there's a single input column;
608  * otherwise sort the full tuple. (See comments for
609  * process_ordered_aggregate_single.)
610  */
611  if (pertrans->numInputs == 1)
612  {
613  Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
614 
615  pertrans->sortstates[aggstate->current_set] =
616  tuplesort_begin_datum(attr->atttypid,
617  pertrans->sortOperators[0],
618  pertrans->sortCollations[0],
619  pertrans->sortNullsFirst[0],
620  work_mem, NULL, false);
621  }
622  else
623  pertrans->sortstates[aggstate->current_set] =
624  tuplesort_begin_heap(pertrans->sortdesc,
625  pertrans->numSortCols,
626  pertrans->sortColIdx,
627  pertrans->sortOperators,
628  pertrans->sortCollations,
629  pertrans->sortNullsFirst,
630  work_mem, NULL, false);
631  }
632 
633  /*
634  * (Re)set transValue to the initial value.
635  *
636  * Note that when the initial value is pass-by-ref, we must copy it (into
637  * the aggcontext) since we will pfree the transValue later.
638  */
639  if (pertrans->initValueIsNull)
640  pergroupstate->transValue = pertrans->initValue;
641  else
642  {
643  MemoryContext oldContext;
644 
646  pergroupstate->transValue = datumCopy(pertrans->initValue,
647  pertrans->transtypeByVal,
648  pertrans->transtypeLen);
649  MemoryContextSwitchTo(oldContext);
650  }
651  pergroupstate->transValueIsNull = pertrans->initValueIsNull;
652 
653  /*
654  * If the initial value for the transition state doesn't exist in the
655  * pg_aggregate table then we will let the first non-NULL value returned
656  * from the outer procNode become the initial value. (This is useful for
657  * aggregates like max() and min().) The noTransValue flag signals that we
658  * still need to do this.
659  */
660  pergroupstate->noTransValue = pertrans->initValueIsNull;
661 }
662 
663 /*
664  * Initialize all aggregate transition states for a new group of input values.
665  *
666  * If there are multiple grouping sets, we initialize only the first numReset
667  * of them (the grouping sets are ordered so that the most specific one, which
668  * is reset most often, is first). As a convenience, if numReset is 0, we
669  * reinitialize all sets.
670  *
671  * NB: This cannot be used for hash aggregates, as for those the grouping set
672  * number has to be specified from further up.
673  *
674  * When called, CurrentMemoryContext should be the per-query context.
675  */
676 static void
678  AggStatePerGroup *pergroups,
679  int numReset)
680 {
681  int transno;
682  int numGroupingSets = Max(aggstate->phase->numsets, 1);
683  int setno = 0;
684  int numTrans = aggstate->numtrans;
685  AggStatePerTrans transstates = aggstate->pertrans;
686 
687  if (numReset == 0)
688  numReset = numGroupingSets;
689 
690  for (setno = 0; setno < numReset; setno++)
691  {
692  AggStatePerGroup pergroup = pergroups[setno];
693 
694  select_current_set(aggstate, setno, false);
695 
696  for (transno = 0; transno < numTrans; transno++)
697  {
698  AggStatePerTrans pertrans = &transstates[transno];
699  AggStatePerGroup pergroupstate = &pergroup[transno];
700 
701  initialize_aggregate(aggstate, pertrans, pergroupstate);
702  }
703  }
704 }
705 
706 /*
707  * Given new input value(s), advance the transition function of one aggregate
708  * state within one grouping set only (already set in aggstate->current_set)
709  *
710  * The new values (and null flags) have been preloaded into argument positions
711  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
712  * pass to the transition function. We also expect that the static fields of
713  * the fcinfo are already initialized; that was done by ExecInitAgg().
714  *
715  * It doesn't matter which memory context this is called in.
716  */
717 static void
719  AggStatePerTrans pertrans,
720  AggStatePerGroup pergroupstate)
721 {
722  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
723  MemoryContext oldContext;
724  Datum newVal;
725 
726  if (pertrans->transfn.fn_strict)
727  {
728  /*
729  * For a strict transfn, nothing happens when there's a NULL input; we
730  * just keep the prior transValue.
731  */
732  int numTransInputs = pertrans->numTransInputs;
733  int i;
734 
735  for (i = 1; i <= numTransInputs; i++)
736  {
737  if (fcinfo->args[i].isnull)
738  return;
739  }
740  if (pergroupstate->noTransValue)
741  {
742  /*
743  * transValue has not been initialized. This is the first non-NULL
744  * input value. We use it as the initial value for transValue. (We
745  * already checked that the agg's input type is binary-compatible
746  * with its transtype, so straight copy here is OK.)
747  *
748  * We must copy the datum into aggcontext if it is pass-by-ref. We
749  * do not need to pfree the old transValue, since it's NULL.
750  */
752  pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
753  pertrans->transtypeByVal,
754  pertrans->transtypeLen);
755  pergroupstate->transValueIsNull = false;
756  pergroupstate->noTransValue = false;
757  MemoryContextSwitchTo(oldContext);
758  return;
759  }
760  if (pergroupstate->transValueIsNull)
761  {
762  /*
763  * Don't call a strict function with NULL inputs. Note it is
764  * possible to get here despite the above tests, if the transfn is
765  * strict *and* returned a NULL on a prior cycle. If that happens
766  * we will propagate the NULL all the way to the end.
767  */
768  return;
769  }
770  }
771 
772  /* We run the transition functions in per-input-tuple memory context */
773  oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
774 
775  /* set up aggstate->curpertrans for AggGetAggref() */
776  aggstate->curpertrans = pertrans;
777 
778  /*
779  * OK to call the transition function
780  */
781  fcinfo->args[0].value = pergroupstate->transValue;
782  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
783  fcinfo->isnull = false; /* just in case transfn doesn't set it */
784 
785  newVal = FunctionCallInvoke(fcinfo);
786 
787  aggstate->curpertrans = NULL;
788 
789  /*
790  * If pass-by-ref datatype, must copy the new value into aggcontext and
791  * free the prior transValue. But if transfn returned a pointer to its
792  * first input, we don't need to do anything. Also, if transfn returned a
793  * pointer to a R/W expanded object that is already a child of the
794  * aggcontext, assume we can adopt that value without copying it.
795  *
796  * It's safe to compare newVal with pergroup->transValue without regard
797  * for either being NULL, because ExecAggTransReparent() takes care to set
798  * transValue to 0 when NULL. Otherwise we could end up accidentally not
799  * reparenting, when the transValue has the same numerical value as
800  * newValue, despite being NULL. This is a somewhat hot path, making it
801  * undesirable to instead solve this with another branch for the common
802  * case of the transition function returning its (modified) input
803  * argument.
804  */
805  if (!pertrans->transtypeByVal &&
806  DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
807  newVal = ExecAggTransReparent(aggstate, pertrans,
808  newVal, fcinfo->isnull,
809  pergroupstate->transValue,
810  pergroupstate->transValueIsNull);
811 
812  pergroupstate->transValue = newVal;
813  pergroupstate->transValueIsNull = fcinfo->isnull;
814 
815  MemoryContextSwitchTo(oldContext);
816 }
817 
818 /*
819  * Advance each aggregate transition state for one input tuple. The input
820  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
821  * accessible to ExecEvalExpr.
822  *
823  * We have two sets of transition states to handle: one for sorted aggregation
824  * and one for hashed; we do them both here, to avoid multiple evaluation of
825  * the inputs.
826  *
827  * When called, CurrentMemoryContext should be the per-query context.
828  */
829 static void
831 {
832  bool dummynull;
833 
835  aggstate->tmpcontext,
836  &dummynull);
837 }
838 
839 /*
840  * Run the transition function for a DISTINCT or ORDER BY aggregate
841  * with only one input. This is called after we have completed
842  * entering all the input values into the sort object. We complete the
843  * sort, read out the values in sorted order, and run the transition
844  * function on each value (applying DISTINCT if appropriate).
845  *
846  * Note that the strictness of the transition function was checked when
847  * entering the values into the sort, so we don't check it again here;
848  * we just apply standard SQL DISTINCT logic.
849  *
850  * The one-input case is handled separately from the multi-input case
851  * for performance reasons: for single by-value inputs, such as the
852  * common case of count(distinct id), the tuplesort_getdatum code path
853  * is around 300% faster. (The speedup for by-reference types is less
854  * but still noticeable.)
855  *
856  * This function handles only one grouping set (already set in
857  * aggstate->current_set).
858  *
859  * When called, CurrentMemoryContext should be the per-query context.
860  */
861 static void
863  AggStatePerTrans pertrans,
864  AggStatePerGroup pergroupstate)
865 {
866  Datum oldVal = (Datum) 0;
867  bool oldIsNull = true;
868  bool haveOldVal = false;
869  MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
870  MemoryContext oldContext;
871  bool isDistinct = (pertrans->numDistinctCols > 0);
872  Datum newAbbrevVal = (Datum) 0;
873  Datum oldAbbrevVal = (Datum) 0;
874  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
875  Datum *newVal;
876  bool *isNull;
877 
878  Assert(pertrans->numDistinctCols < 2);
879 
880  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
881 
882  /* Load the column into argument 1 (arg 0 will be transition value) */
883  newVal = &fcinfo->args[1].value;
884  isNull = &fcinfo->args[1].isnull;
885 
886  /*
887  * Note: if input type is pass-by-ref, the datums returned by the sort are
888  * freshly palloc'd in the per-query context, so we must be careful to
889  * pfree them when they are no longer needed.
890  */
891 
892  while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
893  true, newVal, isNull, &newAbbrevVal))
894  {
895  /*
896  * Clear and select the working context for evaluation of the equality
897  * function and transition function.
898  */
899  MemoryContextReset(workcontext);
900  oldContext = MemoryContextSwitchTo(workcontext);
901 
902  /*
903  * If DISTINCT mode, and not distinct from prior, skip it.
904  */
905  if (isDistinct &&
906  haveOldVal &&
907  ((oldIsNull && *isNull) ||
908  (!oldIsNull && !*isNull &&
909  oldAbbrevVal == newAbbrevVal &&
911  pertrans->aggCollation,
912  oldVal, *newVal)))))
913  {
914  /* equal to prior, so forget this one */
915  if (!pertrans->inputtypeByVal && !*isNull)
916  pfree(DatumGetPointer(*newVal));
917  }
918  else
919  {
920  advance_transition_function(aggstate, pertrans, pergroupstate);
921  /* forget the old value, if any */
922  if (!oldIsNull && !pertrans->inputtypeByVal)
923  pfree(DatumGetPointer(oldVal));
924  /* and remember the new one for subsequent equality checks */
925  oldVal = *newVal;
926  oldAbbrevVal = newAbbrevVal;
927  oldIsNull = *isNull;
928  haveOldVal = true;
929  }
930 
931  MemoryContextSwitchTo(oldContext);
932  }
933 
934  if (!oldIsNull && !pertrans->inputtypeByVal)
935  pfree(DatumGetPointer(oldVal));
936 
937  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
938  pertrans->sortstates[aggstate->current_set] = NULL;
939 }
940 
941 /*
942  * Run the transition function for a DISTINCT or ORDER BY aggregate
943  * with more than one input. This is called after we have completed
944  * entering all the input values into the sort object. We complete the
945  * sort, read out the values in sorted order, and run the transition
946  * function on each value (applying DISTINCT if appropriate).
947  *
948  * This function handles only one grouping set (already set in
949  * aggstate->current_set).
950  *
951  * When called, CurrentMemoryContext should be the per-query context.
952  */
953 static void
955  AggStatePerTrans pertrans,
956  AggStatePerGroup pergroupstate)
957 {
958  ExprContext *tmpcontext = aggstate->tmpcontext;
959  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
960  TupleTableSlot *slot1 = pertrans->sortslot;
961  TupleTableSlot *slot2 = pertrans->uniqslot;
962  int numTransInputs = pertrans->numTransInputs;
963  int numDistinctCols = pertrans->numDistinctCols;
964  Datum newAbbrevVal = (Datum) 0;
965  Datum oldAbbrevVal = (Datum) 0;
966  bool haveOldValue = false;
967  TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
968  int i;
969 
970  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
971 
972  ExecClearTuple(slot1);
973  if (slot2)
974  ExecClearTuple(slot2);
975 
976  while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
977  true, true, slot1, &newAbbrevVal))
978  {
980 
981  tmpcontext->ecxt_outertuple = slot1;
982  tmpcontext->ecxt_innertuple = slot2;
983 
984  if (numDistinctCols == 0 ||
985  !haveOldValue ||
986  newAbbrevVal != oldAbbrevVal ||
987  !ExecQual(pertrans->equalfnMulti, tmpcontext))
988  {
989  /*
990  * Extract the first numTransInputs columns as datums to pass to
991  * the transfn.
992  */
993  slot_getsomeattrs(slot1, numTransInputs);
994 
995  /* Load values into fcinfo */
996  /* Start from 1, since the 0th arg will be the transition value */
997  for (i = 0; i < numTransInputs; i++)
998  {
999  fcinfo->args[i + 1].value = slot1->tts_values[i];
1000  fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
1001  }
1002 
1003  advance_transition_function(aggstate, pertrans, pergroupstate);
1004 
1005  if (numDistinctCols > 0)
1006  {
1007  /* swap the slot pointers to retain the current tuple */
1008  TupleTableSlot *tmpslot = slot2;
1009 
1010  slot2 = slot1;
1011  slot1 = tmpslot;
1012  /* avoid ExecQual() calls by reusing abbreviated keys */
1013  oldAbbrevVal = newAbbrevVal;
1014  haveOldValue = true;
1015  }
1016  }
1017 
1018  /* Reset context each time */
1019  ResetExprContext(tmpcontext);
1020 
1021  ExecClearTuple(slot1);
1022  }
1023 
1024  if (slot2)
1025  ExecClearTuple(slot2);
1026 
1027  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1028  pertrans->sortstates[aggstate->current_set] = NULL;
1029 
1030  /* restore previous slot, potentially in use for grouping sets */
1031  tmpcontext->ecxt_outertuple = save;
1032 }
1033 
1034 /*
1035  * Compute the final value of one aggregate.
1036  *
1037  * This function handles only one grouping set (already set in
1038  * aggstate->current_set).
1039  *
1040  * The finalfn will be run, and the result delivered, in the
1041  * output-tuple context; caller's CurrentMemoryContext does not matter.
1042  *
1043  * The finalfn uses the state as set in the transno. This also might be
1044  * being used by another aggregate function, so it's important that we do
1045  * nothing destructive here.
1046  */
1047 static void
1049  AggStatePerAgg peragg,
1050  AggStatePerGroup pergroupstate,
1051  Datum *resultVal, bool *resultIsNull)
1052 {
1053  LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1054  bool anynull = false;
1055  MemoryContext oldContext;
1056  int i;
1057  ListCell *lc;
1058  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1059 
1061 
1062  /*
1063  * Evaluate any direct arguments. We do this even if there's no finalfn
1064  * (which is unlikely anyway), so that side-effects happen as expected.
1065  * The direct arguments go into arg positions 1 and up, leaving position 0
1066  * for the transition state value.
1067  */
1068  i = 1;
1069  foreach(lc, peragg->aggdirectargs)
1070  {
1071  ExprState *expr = (ExprState *) lfirst(lc);
1072 
1073  fcinfo->args[i].value = ExecEvalExpr(expr,
1074  aggstate->ss.ps.ps_ExprContext,
1075  &fcinfo->args[i].isnull);
1076  anynull |= fcinfo->args[i].isnull;
1077  i++;
1078  }
1079 
1080  /*
1081  * Apply the agg's finalfn if one is provided, else return transValue.
1082  */
1083  if (OidIsValid(peragg->finalfn_oid))
1084  {
1085  int numFinalArgs = peragg->numFinalArgs;
1086 
1087  /* set up aggstate->curperagg for AggGetAggref() */
1088  aggstate->curperagg = peragg;
1089 
1090  InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
1091  numFinalArgs,
1092  pertrans->aggCollation,
1093  (void *) aggstate, NULL);
1094 
1095  /* Fill in the transition state value */
1096  fcinfo->args[0].value =
1097  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1098  pergroupstate->transValueIsNull,
1099  pertrans->transtypeLen);
1100  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1101  anynull |= pergroupstate->transValueIsNull;
1102 
1103  /* Fill any remaining argument positions with nulls */
1104  for (; i < numFinalArgs; i++)
1105  {
1106  fcinfo->args[i].value = (Datum) 0;
1107  fcinfo->args[i].isnull = true;
1108  anynull = true;
1109  }
1110 
1111  if (fcinfo->flinfo->fn_strict && anynull)
1112  {
1113  /* don't call a strict function with NULL inputs */
1114  *resultVal = (Datum) 0;
1115  *resultIsNull = true;
1116  }
1117  else
1118  {
1119  *resultVal = FunctionCallInvoke(fcinfo);
1120  *resultIsNull = fcinfo->isnull;
1121  }
1122  aggstate->curperagg = NULL;
1123  }
1124  else
1125  {
1126  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1127  *resultVal = pergroupstate->transValue;
1128  *resultIsNull = pergroupstate->transValueIsNull;
1129  }
1130 
1131  /*
1132  * If result is pass-by-ref, make sure it is in the right context.
1133  */
1134  if (!peragg->resulttypeByVal && !*resultIsNull &&
1136  DatumGetPointer(*resultVal)))
1137  *resultVal = datumCopy(*resultVal,
1138  peragg->resulttypeByVal,
1139  peragg->resulttypeLen);
1140 
1141  MemoryContextSwitchTo(oldContext);
1142 }
1143 
1144 /*
1145  * Compute the output value of one partial aggregate.
1146  *
1147  * The serialization function will be run, and the result delivered, in the
1148  * output-tuple context; caller's CurrentMemoryContext does not matter.
1149  */
1150 static void
1152  AggStatePerAgg peragg,
1153  AggStatePerGroup pergroupstate,
1154  Datum *resultVal, bool *resultIsNull)
1155 {
1156  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1157  MemoryContext oldContext;
1158 
1160 
1161  /*
1162  * serialfn_oid will be set if we must serialize the transvalue before
1163  * returning it
1164  */
1165  if (OidIsValid(pertrans->serialfn_oid))
1166  {
1167  /* Don't call a strict serialization function with NULL input. */
1168  if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1169  {
1170  *resultVal = (Datum) 0;
1171  *resultIsNull = true;
1172  }
1173  else
1174  {
1175  FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1176 
1177  fcinfo->args[0].value =
1178  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1179  pergroupstate->transValueIsNull,
1180  pertrans->transtypeLen);
1181  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1182  fcinfo->isnull = false;
1183 
1184  *resultVal = FunctionCallInvoke(fcinfo);
1185  *resultIsNull = fcinfo->isnull;
1186  }
1187  }
1188  else
1189  {
1190  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1191  *resultVal = pergroupstate->transValue;
1192  *resultIsNull = pergroupstate->transValueIsNull;
1193  }
1194 
1195  /* If result is pass-by-ref, make sure it is in the right context. */
1196  if (!peragg->resulttypeByVal && !*resultIsNull &&
1198  DatumGetPointer(*resultVal)))
1199  *resultVal = datumCopy(*resultVal,
1200  peragg->resulttypeByVal,
1201  peragg->resulttypeLen);
1202 
1203  MemoryContextSwitchTo(oldContext);
1204 }
1205 
1206 /*
1207  * Extract the attributes that make up the grouping key into the
1208  * hashslot. This is necessary to compute the hash or perform a lookup.
1209  */
1210 static void
1212 {
1213  TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
1214  AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
1215  TupleTableSlot *hashslot = perhash->hashslot;
1216  int i;
1217 
1218  /* transfer just the needed columns into hashslot */
1219  slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1220  ExecClearTuple(hashslot);
1221 
1222  for (i = 0; i < perhash->numhashGrpCols; i++)
1223  {
1224  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1225 
1226  hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1227  hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1228  }
1229  ExecStoreVirtualTuple(hashslot);
1230 }
1231 
1232 /*
1233  * Prepare to finalize and project based on the specified representative tuple
1234  * slot and grouping set.
1235  *
1236  * In the specified tuple slot, force to null all attributes that should be
1237  * read as null in the context of the current grouping set. Also stash the
1238  * current group bitmap where GroupingExpr can get at it.
1239  *
1240  * This relies on three conditions:
1241  *
1242  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1243  * only reference it in evaluations, which will only access individual
1244  * attributes.
1245  *
1246  * 2) No system columns are going to need to be nulled. (If a system column is
1247  * referenced in a group clause, it is actually projected in the outer plan
1248  * tlist.)
1249  *
1250  * 3) Within a given phase, we never need to recover the value of an attribute
1251  * once it has been set to null.
1252  *
1253  * Poking into the slot this way is a bit ugly, but the consensus is that the
1254  * alternative was worse.
1255  */
1256 static void
1257 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1258 {
1259  if (aggstate->phase->grouped_cols)
1260  {
1261  Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1262 
1263  aggstate->grouped_cols = grouped_cols;
1264 
1265  if (TTS_EMPTY(slot))
1266  {
1267  /*
1268  * Force all values to be NULL if working on an empty input tuple
1269  * (i.e. an empty grouping set for which no input rows were
1270  * supplied).
1271  */
1272  ExecStoreAllNullTuple(slot);
1273  }
1274  else if (aggstate->all_grouped_cols)
1275  {
1276  ListCell *lc;
1277 
1278  /* all_grouped_cols is arranged in desc order */
1280 
1281  foreach(lc, aggstate->all_grouped_cols)
1282  {
1283  int attnum = lfirst_int(lc);
1284 
1285  if (!bms_is_member(attnum, grouped_cols))
1286  slot->tts_isnull[attnum - 1] = true;
1287  }
1288  }
1289  }
1290 }
1291 
1292 /*
1293  * Compute the final value of all aggregates for one group.
1294  *
1295  * This function handles only one grouping set at a time, which the caller must
1296  * have selected. It's also the caller's responsibility to adjust the supplied
1297  * pergroup parameter to point to the current set's transvalues.
1298  *
1299  * Results are stored in the output econtext aggvalues/aggnulls.
1300  */
1301 static void
1303  AggStatePerAgg peraggs,
1304  AggStatePerGroup pergroup)
1305 {
1306  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1307  Datum *aggvalues = econtext->ecxt_aggvalues;
1308  bool *aggnulls = econtext->ecxt_aggnulls;
1309  int aggno;
1310  int transno;
1311 
1312  /*
1313  * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1314  * inputs and run the transition functions.
1315  */
1316  for (transno = 0; transno < aggstate->numtrans; transno++)
1317  {
1318  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1319  AggStatePerGroup pergroupstate;
1320 
1321  pergroupstate = &pergroup[transno];
1322 
1323  if (pertrans->numSortCols > 0)
1324  {
1325  Assert(aggstate->aggstrategy != AGG_HASHED &&
1326  aggstate->aggstrategy != AGG_MIXED);
1327 
1328  if (pertrans->numInputs == 1)
1330  pertrans,
1331  pergroupstate);
1332  else
1334  pertrans,
1335  pergroupstate);
1336  }
1337  }
1338 
1339  /*
1340  * Run the final functions.
1341  */
1342  for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1343  {
1344  AggStatePerAgg peragg = &peraggs[aggno];
1345  int transno = peragg->transno;
1346  AggStatePerGroup pergroupstate;
1347 
1348  pergroupstate = &pergroup[transno];
1349 
1350  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1351  finalize_partialaggregate(aggstate, peragg, pergroupstate,
1352  &aggvalues[aggno], &aggnulls[aggno]);
1353  else
1354  finalize_aggregate(aggstate, peragg, pergroupstate,
1355  &aggvalues[aggno], &aggnulls[aggno]);
1356  }
1357 }
1358 
1359 /*
1360  * Project the result of a group (whose aggs have already been calculated by
1361  * finalize_aggregates). Returns the result slot, or NULL if no row is
1362  * projected (suppressed by qual).
1363  */
1364 static TupleTableSlot *
1366 {
1367  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1368 
1369  /*
1370  * Check the qual (HAVING clause); if the group does not match, ignore it.
1371  */
1372  if (ExecQual(aggstate->ss.ps.qual, econtext))
1373  {
1374  /*
1375  * Form and return projection tuple using the aggregate results and
1376  * the representative input tuple.
1377  */
1378  return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1379  }
1380  else
1381  InstrCountFiltered1(aggstate, 1);
1382 
1383  return NULL;
1384 }
1385 
1386 /*
1387  * Walk tlist and qual to find referenced colnos, dividing them into
1388  * aggregated and unaggregated sets.
1389  */
1390 static void
1391 find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
1392 {
1393  Agg *agg = (Agg *) aggstate->ss.ps.plan;
1394  FindColsContext context;
1395 
1396  context.is_aggref = false;
1397  context.aggregated = NULL;
1398  context.unaggregated = NULL;
1399 
1400  (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
1401  (void) find_cols_walker((Node *) agg->plan.qual, &context);
1402 
1403  *aggregated = context.aggregated;
1404  *unaggregated = context.unaggregated;
1405 }
1406 
1407 static bool
1409 {
1410  if (node == NULL)
1411  return false;
1412  if (IsA(node, Var))
1413  {
1414  Var *var = (Var *) node;
1415 
1416  /* setrefs.c should have set the varno to OUTER_VAR */
1417  Assert(var->varno == OUTER_VAR);
1418  Assert(var->varlevelsup == 0);
1419  if (context->is_aggref)
1420  context->aggregated = bms_add_member(context->aggregated,
1421  var->varattno);
1422  else
1423  context->unaggregated = bms_add_member(context->unaggregated,
1424  var->varattno);
1425  return false;
1426  }
1427  if (IsA(node, Aggref))
1428  {
1429  Assert(!context->is_aggref);
1430  context->is_aggref = true;
1431  expression_tree_walker(node, find_cols_walker, (void *) context);
1432  context->is_aggref = false;
1433  return false;
1434  }
1436  (void *) context);
1437 }
1438 
1439 /*
1440  * (Re-)initialize the hash table(s) to empty.
1441  *
1442  * To implement hashed aggregation, we need a hashtable that stores a
1443  * representative tuple and an array of AggStatePerGroup structs for each
1444  * distinct set of GROUP BY column values. We compute the hash key from the
1445  * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1446  * for each entry.
1447  *
1448  * We have a separate hashtable and associated perhash data structure for each
1449  * grouping set for which we're doing hashing.
1450  *
1451  * The contents of the hash tables always live in the hashcontext's per-tuple
1452  * memory context (there is only one of these for all tables together, since
1453  * they are all reset at the same time).
1454  */
1455 static void
1457 {
1458  int setno;
1459 
1460  for (setno = 0; setno < aggstate->num_hashes; ++setno)
1461  {
1462  AggStatePerHash perhash = &aggstate->perhash[setno];
1463  long nbuckets;
1464  Size memory;
1465 
1466  if (perhash->hashtable != NULL)
1467  {
1468  ResetTupleHashTable(perhash->hashtable);
1469  continue;
1470  }
1471 
1472  Assert(perhash->aggnode->numGroups > 0);
1473 
1474  memory = aggstate->hash_mem_limit / aggstate->num_hashes;
1475 
1476  /* choose reasonable number of buckets per hashtable */
1477  nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
1478  perhash->aggnode->numGroups,
1479  memory);
1480 
1481  build_hash_table(aggstate, setno, nbuckets);
1482  }
1483 
1484  aggstate->hash_ngroups_current = 0;
1485 }
1486 
1487 /*
1488  * Build a single hashtable for this grouping set.
1489  */
static void
build_hash_table(AggState *aggstate, int setno, long nbuckets)
{
	AggStatePerHash perhash = &aggstate->perhash[setno];
	MemoryContext metacxt = aggstate->hash_metacxt;
	MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
	MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
	Size		additionalsize;

	/* only the hashing strategies ever build hash tables */
	Assert(aggstate->aggstrategy == AGG_HASHED ||
		   aggstate->aggstrategy == AGG_MIXED);

	/*
	 * Used to make sure initial hash table allocation does not exceed
	 * work_mem. Note that the estimate does not include space for
	 * pass-by-reference transition data values, nor for the representative
	 * tuple of each group.
	 */
	additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);

	/*
	 * Table metadata lives in metacxt, entries in hashcxt; tmpcxt is used
	 * for transient work during lookups. The final argument requests datum
	 * hashing suitable for partial aggregation when the finalfn is skipped.
	 */
	perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
												perhash->hashslot->tts_tupleDescriptor,
												perhash->numCols,
												perhash->hashGrpColIdxHash,
												perhash->eqfuncoids,
												perhash->hashfunctions,
												perhash->aggnode->grpCollations,
												nbuckets,
												additionalsize,
												metacxt,
												hashcxt,
												tmpcxt,
												DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
}
1524 
1525 /*
1526  * Compute columns that actually need to be stored in hashtable entries. The
1527  * incoming tuples from the child plan node will contain grouping columns,
1528  * other columns referenced in our targetlist and qual, columns used to
1529  * compute the aggregate functions, and perhaps just junk columns we don't use
1530  * at all. Only columns of the first two types need to be stored in the
1531  * hashtable, and getting rid of the others can make the table entries
1532  * significantly smaller. The hashtable only contains the relevant columns,
1533  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1534  * into the format of the normal input descriptor.
1535  *
1536  * Additional columns, in addition to the columns grouped by, come from two
1537  * sources: Firstly functionally dependent columns that we don't need to group
1538  * by themselves, and secondly ctids for row-marks.
1539  *
1540  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1541  * then build an array of the columns included in the hashtable. We might
1542  * still have duplicates if the passed-in grpColIdx has them, which can happen
1543  * in edge cases from semijoins/distinct; these can't always be removed,
1544  * because it's not certain that the duplicate cols will be using the same
1545  * hash function.
1546  *
1547  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1548  * the per-query context (unlike the hash table itself).
1549  */
1550 static void
1552 {
1553  Bitmapset *base_colnos;
1554  Bitmapset *aggregated_colnos;
1555  TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1556  List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1557  int numHashes = aggstate->num_hashes;
1558  EState *estate = aggstate->ss.ps.state;
1559  int j;
1560 
1561  /* Find Vars that will be needed in tlist and qual */
1562  find_cols(aggstate, &aggregated_colnos, &base_colnos);
1563  aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
1564  aggstate->max_colno_needed = 0;
1565  aggstate->all_cols_needed = true;
1566 
1567  for (int i = 0; i < scanDesc->natts; i++)
1568  {
1569  int colno = i + 1;
1570  if (bms_is_member(colno, aggstate->colnos_needed))
1571  aggstate->max_colno_needed = colno;
1572  else
1573  aggstate->all_cols_needed = false;
1574  }
1575 
1576  for (j = 0; j < numHashes; ++j)
1577  {
1578  AggStatePerHash perhash = &aggstate->perhash[j];
1579  Bitmapset *colnos = bms_copy(base_colnos);
1580  AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1581  List *hashTlist = NIL;
1582  TupleDesc hashDesc;
1583  int maxCols;
1584  int i;
1585 
1586  perhash->largestGrpColIdx = 0;
1587 
1588  /*
1589  * If we're doing grouping sets, then some Vars might be referenced in
1590  * tlist/qual for the benefit of other grouping sets, but not needed
1591  * when hashing; i.e. prepare_projection_slot will null them out, so
1592  * there'd be no point storing them. Use prepare_projection_slot's
1593  * logic to determine which.
1594  */
1595  if (aggstate->phases[0].grouped_cols)
1596  {
1597  Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1598  ListCell *lc;
1599 
1600  foreach(lc, aggstate->all_grouped_cols)
1601  {
1602  int attnum = lfirst_int(lc);
1603 
1604  if (!bms_is_member(attnum, grouped_cols))
1605  colnos = bms_del_member(colnos, attnum);
1606  }
1607  }
1608 
1609  /*
1610  * Compute maximum number of input columns accounting for possible
1611  * duplications in the grpColIdx array, which can happen in some edge
1612  * cases where HashAggregate was generated as part of a semijoin or a
1613  * DISTINCT.
1614  */
1615  maxCols = bms_num_members(colnos) + perhash->numCols;
1616 
1617  perhash->hashGrpColIdxInput =
1618  palloc(maxCols * sizeof(AttrNumber));
1619  perhash->hashGrpColIdxHash =
1620  palloc(perhash->numCols * sizeof(AttrNumber));
1621 
1622  /* Add all the grouping columns to colnos */
1623  for (i = 0; i < perhash->numCols; i++)
1624  colnos = bms_add_member(colnos, grpColIdx[i]);
1625 
1626  /*
1627  * First build mapping for columns directly hashed. These are the
1628  * first, because they'll be accessed when computing hash values and
1629  * comparing tuples for exact matches. We also build simple mapping
1630  * for execGrouping, so it knows where to find the to-be-hashed /
1631  * compared columns in the input.
1632  */
1633  for (i = 0; i < perhash->numCols; i++)
1634  {
1635  perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1636  perhash->hashGrpColIdxHash[i] = i + 1;
1637  perhash->numhashGrpCols++;
1638  /* delete already mapped columns */
1639  bms_del_member(colnos, grpColIdx[i]);
1640  }
1641 
1642  /* and add the remaining columns */
1643  while ((i = bms_first_member(colnos)) >= 0)
1644  {
1645  perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1646  perhash->numhashGrpCols++;
1647  }
1648 
1649  /* and build a tuple descriptor for the hashtable */
1650  for (i = 0; i < perhash->numhashGrpCols; i++)
1651  {
1652  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1653 
1654  hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1655  perhash->largestGrpColIdx =
1656  Max(varNumber + 1, perhash->largestGrpColIdx);
1657  }
1658 
1659  hashDesc = ExecTypeFromTL(hashTlist);
1660 
1661  execTuplesHashPrepare(perhash->numCols,
1662  perhash->aggnode->grpOperators,
1663  &perhash->eqfuncoids,
1664  &perhash->hashfunctions);
1665  perhash->hashslot =
1666  ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1668 
1669  list_free(hashTlist);
1670  bms_free(colnos);
1671  }
1672 
1673  bms_free(base_colnos);
1674 }
1675 
1676 /*
1677  * Estimate per-hash-table-entry overhead.
1678  */
1679 Size
1680 hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
1681 {
1682  Size tupleChunkSize;
1683  Size pergroupChunkSize;
1684  Size transitionChunkSize;
1685  Size tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
1686  tupleWidth);
1687  Size pergroupSize = numTrans * sizeof(AggStatePerGroupData);
1688 
1689  tupleChunkSize = CHUNKHDRSZ + tupleSize;
1690 
1691  if (pergroupSize > 0)
1692  pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
1693  else
1694  pergroupChunkSize = 0;
1695 
1696  if (transitionSpace > 0)
1697  transitionChunkSize = CHUNKHDRSZ + transitionSpace;
1698  else
1699  transitionChunkSize = 0;
1700 
1701  return
1702  sizeof(TupleHashEntryData) +
1703  tupleChunkSize +
1704  pergroupChunkSize +
1705  transitionChunkSize;
1706 }
1707 
1708 /*
1709  * hashagg_recompile_expressions()
1710  *
1711  * Identifies the right phase, compiles the right expression given the
1712  * arguments, and then sets phase->evalfunc to that expression.
1713  *
1714  * Different versions of the compiled expression are needed depending on
1715  * whether hash aggregation has spilled or not, and whether it's reading from
1716  * the outer plan or a tape. Before spilling to disk, the expression reads
1717  * from the outer plan and does not need to perform a NULL check. After
1718  * HashAgg begins to spill, new groups will not be created in the hash table,
1719  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
1720  * pointer check to the expression. Then, when reading spilled data from a
1721  * tape, we change the outer slot type to be a fixed minimal tuple slot.
1722  *
1723  * It would be wasteful to recompile every time, so cache the compiled
1724  * expressions in the AggStatePerPhase, and reuse when appropriate.
1725  */
1726 static void
1727 hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1728 {
1729  AggStatePerPhase phase;
1730  int i = minslot ? 1 : 0;
1731  int j = nullcheck ? 1 : 0;
1732 
1733  Assert(aggstate->aggstrategy == AGG_HASHED ||
1734  aggstate->aggstrategy == AGG_MIXED);
1735 
1736  if (aggstate->aggstrategy == AGG_HASHED)
1737  phase = &aggstate->phases[0];
1738  else /* AGG_MIXED */
1739  phase = &aggstate->phases[1];
1740 
1741  if (phase->evaltrans_cache[i][j] == NULL)
1742  {
1743  const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
1744  bool outerfixed = aggstate->ss.ps.outeropsfixed;
1745  bool dohash = true;
1746  bool dosort;
1747 
1748  dosort = aggstate->aggstrategy == AGG_MIXED ? true : false;
1749 
1750  /* temporarily change the outerops while compiling the expression */
1751  if (minslot)
1752  {
1753  aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
1754  aggstate->ss.ps.outeropsfixed = true;
1755  }
1756 
1757  phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
1758  dosort, dohash,
1759  nullcheck);
1760 
1761  /* change back */
1762  aggstate->ss.ps.outerops = outerops;
1763  aggstate->ss.ps.outeropsfixed = outerfixed;
1764  }
1765 
1766  phase->evaltrans = phase->evaltrans_cache[i][j];
1767 }
1768 
1769 /*
1770  * Set limits that trigger spilling to avoid exceeding work_mem. Consider the
1771  * number of partitions we expect to create (if we do spill).
1772  *
1773  * There are two limits: a memory limit, and also an ngroups limit. The
1774  * ngroups limit becomes important when we expect transition values to grow
1775  * substantially larger than the initial value.
1776  */
1777 void
1778 hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
1779  Size *mem_limit, uint64 *ngroups_limit,
1780  int *num_partitions)
1781 {
1782  int npartitions;
1783  Size partition_mem;
1784 
1785  /* if not expected to spill, use all of work_mem */
1786  if (input_groups * hashentrysize < work_mem * 1024L)
1787  {
1788  if (num_partitions != NULL)
1789  *num_partitions = 0;
1790  *mem_limit = work_mem * 1024L;
1791  *ngroups_limit = *mem_limit / hashentrysize;
1792  return;
1793  }
1794 
1795  /*
1796  * Calculate expected memory requirements for spilling, which is the size
1797  * of the buffers needed for all the tapes that need to be open at once.
1798  * Then, subtract that from the memory available for holding hash tables.
1799  */
1800  npartitions = hash_choose_num_partitions(input_groups,
1801  hashentrysize,
1802  used_bits,
1803  NULL);
1804  if (num_partitions != NULL)
1805  *num_partitions = npartitions;
1806 
1807  partition_mem =
 /*
  * NOTE(review): source line 1808 is missing from this extract; it
  * presumably adds the read-buffer term (HASHAGG_READ_BUFFER_SIZE +) to
  * this expression -- verify against upstream before relying on it.
  */
1809  HASHAGG_WRITE_BUFFER_SIZE * npartitions;
1810 
1811  /*
1812  * Don't set the limit below 3/4 of work_mem. In that case, we are at the
1813  * minimum number of partitions, so we aren't going to dramatically exceed
1814  * work mem anyway.
1815  */
1816  if (work_mem * 1024L > 4 * partition_mem)
1817  *mem_limit = work_mem * 1024L - partition_mem;
1818  else
1819  *mem_limit = work_mem * 1024L * 0.75;
1820 
 /* floor the group limit at 1 so the caller can always make progress */
1821  if (*mem_limit > hashentrysize)
1822  *ngroups_limit = *mem_limit / hashentrysize;
1823  else
1824  *ngroups_limit = 1;
1825 }
1826 
1827 /*
1828  * hash_agg_check_limits
1829  *
1830  * After adding a new group to the hash table, check whether we need to enter
1831  * spill mode. Allocations may happen without adding new groups (for instance,
1832  * if the transition state size grows), so this check is imperfect.
1833  */
1834 static void
 /*
  * NOTE(review): the function-name line (source line 1835) is missing from
  * this extract; presumably hash_agg_check_limits(AggState *aggstate).
  */
1836 {
1837  uint64 ngroups = aggstate->hash_ngroups_current;
1838  Size meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
1839  true);
 /*
  * NOTE(review): source line 1840 is missing here; 'hash_mem' used below is
  * presumably declared via MemoryContextMemAllocated on the per-tuple hash
  * context -- verify against upstream.
  */
1841  true);
1842 
1843  /*
1844  * Don't spill unless there's at least one group in the hash table so we
1845  * can be sure to make progress even in edge cases.
1846  */
1847  if (aggstate->hash_ngroups_current > 0 &&
1848  (meta_mem + hash_mem > aggstate->hash_mem_limit ||
1849  ngroups > aggstate->hash_ngroups_limit))
1850  {
1851  hash_agg_enter_spill_mode(aggstate);
1852  }
1853 }
1854 
1855 /*
1856  * Enter "spill mode", meaning that no new groups are added to any of the hash
1857  * tables. Tuples that would create a new group are instead spilled, and
1858  * processed later.
1859  */
1860 static void
 /*
  * NOTE(review): the function-name line (source line 1861) is missing from
  * this extract; presumably hash_agg_enter_spill_mode(AggState *aggstate).
  */
1862 {
1863  aggstate->hash_spill_mode = true;
 /* recompile so the transition expression null-checks the pergroup pointer */
1864  hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
1865 
1866  if (!aggstate->hash_ever_spilled)
1867  {
1868  Assert(aggstate->hash_tapeinfo == NULL);
1869  Assert(aggstate->hash_spills == NULL);
1870 
1871  aggstate->hash_ever_spilled = true;
1872 
1873  hashagg_tapeinfo_init(aggstate);
1874 
 /* one spill descriptor per hashed grouping set */
1875  aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
1876 
1877  for (int setno = 0; setno < aggstate->num_hashes; setno++)
1878  {
1879  AggStatePerHash perhash = &aggstate->perhash[setno];
1880  HashAggSpill *spill = &aggstate->hash_spills[setno];
1881 
1882  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
1883  perhash->aggnode->numGroups,
1884  aggstate->hashentrysize);
1885  }
1886  }
1887 }
1888 
1889 /*
1890  * Update metrics after filling the hash table.
1891  *
1892  * If reading from the outer plan, from_tape should be false; if reading from
1893  * another tape, from_tape should be true.
1894  */
1895 static void
1896 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
1897 {
1898  Size meta_mem;
1899  Size hash_mem;
1900  Size buffer_mem;
1901  Size total_mem;
1902 
1903  if (aggstate->aggstrategy != AGG_MIXED &&
1904  aggstate->aggstrategy != AGG_HASHED)
1905  return;
1906 
1907  /* memory for the hash table itself */
1908  meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
1909 
1910  /* memory for the group keys and transition states */
1911  hash_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
1912 
1913  /* memory for read/write tape buffers, if spilled */
1914  buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
1915  if (from_tape)
1916  buffer_mem += HASHAGG_READ_BUFFER_SIZE;
1917 
1918  /* update peak mem */
1919  total_mem = meta_mem + hash_mem + buffer_mem;
1920  if (total_mem > aggstate->hash_mem_peak)
1921  aggstate->hash_mem_peak = total_mem;
1922 
1923  /* update disk usage */
1924  if (aggstate->hash_tapeinfo != NULL)
1925  {
1926  uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
1927 
1928  if (aggstate->hash_disk_used < disk_used)
1929  aggstate->hash_disk_used = disk_used;
1930  }
1931 
1932  /* update hashentrysize estimate based on contents */
1933  if (aggstate->hash_ngroups_current > 0)
1934  {
1935  aggstate->hashentrysize =
1936  sizeof(TupleHashEntryData) +
1937  (hash_mem / (double) aggstate->hash_ngroups_current);
1938  }
1939 }
1940 
1941 /*
1942  * Choose a reasonable number of buckets for the initial hash table size.
1943  */
1944 static long
1945 hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
1946 {
1947  long max_nbuckets;
1948  long nbuckets = ngroups;
1949 
1950  max_nbuckets = memory / hashentrysize;
1951 
1952  /*
1953  * Underestimating is better than overestimating. Too many buckets crowd
1954  * out space for group keys and transition state values.
1955  */
1956  max_nbuckets >>= 1;
1957 
1958  if (nbuckets > max_nbuckets)
1959  nbuckets = max_nbuckets;
1960 
1961  return Max(nbuckets, 1);
1962 }
1963 
1964 /*
1965  * Determine the number of partitions to create when spilling, which will
1966  * always be a power of two. If log2_npartitions is non-NULL, set
1967  * *log2_npartitions to the log2() of the number of partitions.
1968  */
1969 static int
1970 hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
1971  int used_bits, int *log2_npartitions)
1972 {
1973  Size mem_wanted;
1974  int partition_limit;
1975  int npartitions;
1976  int partition_bits;
1977 
1978  /*
1979  * Avoid creating so many partitions that the memory requirements of the
1980  * open partition files are greater than 1/4 of work_mem.
1981  */
1982  partition_limit =
1983  (work_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
 /*
  * NOTE(review): source line 1984 is missing from this extract; the divisor
  * is presumably HASHAGG_WRITE_BUFFER_SIZE; -- verify against upstream.
  */
1985 
1986  mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
1987 
1988  /* make enough partitions so that each one is likely to fit in memory */
1989  npartitions = 1 + (mem_wanted / (work_mem * 1024L));
1990 
1991  if (npartitions > partition_limit)
1992  npartitions = partition_limit;
1993 
 /* clamp to the supported range of partition counts */
1994  if (npartitions < HASHAGG_MIN_PARTITIONS)
1995  npartitions = HASHAGG_MIN_PARTITIONS;
1996  if (npartitions > HASHAGG_MAX_PARTITIONS)
1997  npartitions = HASHAGG_MAX_PARTITIONS;
1998 
1999  /* ceil(log2(npartitions)) */
2000  partition_bits = my_log2(npartitions);
2001 
2002  /* make sure that we don't exhaust the hash bits */
2003  if (partition_bits + used_bits >= 32)
2004  partition_bits = 32 - used_bits;
2005 
2006  if (log2_npartitions != NULL)
2007  *log2_npartitions = partition_bits;
2008 
2009  /* number of partitions will be a power of two */
2010  npartitions = 1L << partition_bits;
2011 
2012  return npartitions;
2013 }
2014 
2015 /*
2016  * Find or create a hashtable entry for the tuple group containing the current
2017  * tuple (already set in tmpcontext's outertuple slot), in the current grouping
2018  * set (which the caller must have selected - note that initialize_aggregate
2019  * depends on this).
2020  *
2021  * When called, CurrentMemoryContext should be the per-query context. The
2022  * already-calculated hash value for the tuple must be specified.
2023  *
2024  * If in "spill mode", then only find existing hashtable entries; don't create
2025  * new ones. If a tuple's group is not already present in the hash table for
2026  * the current grouping set, assign *in_hash_table=false and the caller will
2027  * spill it to disk.
2028  */
2029 static AggStatePerGroup
2030 lookup_hash_entry(AggState *aggstate, uint32 hash, bool *in_hash_table)
2031 {
2032  AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
2033  TupleTableSlot *hashslot = perhash->hashslot;
2034  TupleHashEntryData *entry;
2035  bool isnew = false;
2036  bool *p_isnew;
2037 
2038  /* if hash table already spilled, don't create new entries */
2039  p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2040 
2041  /* find or create the hashtable entry using the filtered tuple */
2042  entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, p_isnew,
2043  hash);
2044 
2045  if (entry == NULL)
2046  {
2047  *in_hash_table = false;
2048  return NULL;
2049  }
2050  else
2051  *in_hash_table = true;
2052 
2053  if (isnew)
2054  {
2055  AggStatePerGroup pergroup;
2056  int transno;
2057 
2058  aggstate->hash_ngroups_current++;
 /* may flip the executor into spill mode if limits are now exceeded */
2059  hash_agg_check_limits(aggstate);
2060 
2061  /* no need to allocate or initialize per-group state */
2062  if (aggstate->numtrans == 0)
2063  return NULL;
2064 
2065  pergroup = (AggStatePerGroup)
 /*
  * NOTE(review): source line 2066 is missing from this extract; it
  * presumably allocates from the hash table's context via
  * MemoryContextAlloc(perhash->hashtable->tablecxt, ...) -- verify
  * against upstream.
  */
2067  sizeof(AggStatePerGroupData) * aggstate->numtrans);
2068 
2069  entry->additional = pergroup;
2070 
2071  /*
2072  * Initialize aggregates for new tuple group, lookup_hash_entries()
2073  * already has selected the relevant grouping set.
2074  */
2075  for (transno = 0; transno < aggstate->numtrans; transno++)
2076  {
2077  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
2078  AggStatePerGroup pergroupstate = &pergroup[transno];
2079 
2080  initialize_aggregate(aggstate, pertrans, pergroupstate);
2081  }
2082  }
2083 
2084  return entry->additional;
2085 }
2086 
2087 /*
2088  * Look up hash entries for the current tuple in all hashed grouping sets,
2089  * returning an array of pergroup pointers suitable for advance_aggregates.
2090  *
2091  * Be aware that lookup_hash_entry can reset the tmpcontext.
2092  *
2093  * Some entries may be left NULL if we are in "spill mode". The same tuple
2094  * will belong to different groups for each grouping set, so may match a group
2095  * already in memory for one set and match a group not in memory for another
2096  * set. When in "spill mode", the tuple will be spilled for each grouping set
2097  * where it doesn't match a group in memory.
2098  *
2099  * NB: It's possible to spill the same tuple for several different grouping
2100  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
2101  * the tuple multiple times for multiple grouping sets, it can be partitioned
2102  * for each grouping set, making the refilling of the hash table very
2103  * efficient.
2104  */
2105 static void
 /*
  * NOTE(review): the function-name line (source line 2106) is missing from
  * this extract; presumably lookup_hash_entries(AggState *aggstate).
  */
2107 {
2108  AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2109  int setno;
2110 
2111  for (setno = 0; setno < aggstate->num_hashes; setno++)
2112  {
2113  AggStatePerHash perhash = &aggstate->perhash[setno];
2114  uint32 hash;
2115  bool in_hash_table;
2116 
2117  select_current_set(aggstate, setno, true);
2118  prepare_hash_slot(aggstate);
2119  hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot);
2120  pergroup[setno] = lookup_hash_entry(aggstate, hash, &in_hash_table);
2121 
2122  /* check to see if we need to spill the tuple for this grouping set */
2123  if (!in_hash_table)
2124  {
2125  HashAggSpill *spill = &aggstate->hash_spills[setno];
2126  TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
2127 
 /* lazily set up the spill partitions on first use for this set */
2128  if (spill->partitions == NULL)
2129  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
2130  perhash->aggnode->numGroups,
2131  aggstate->hashentrysize);
2132 
2133  hashagg_spill_tuple(aggstate, spill, slot, hash);
2134  }
2135  }
2136 }
2137 
2138 /*
2139  * ExecAgg -
2140  *
2141  * ExecAgg receives tuples from its outer subplan and aggregates over
2142  * the appropriate attribute for each aggregate function use (Aggref
2143  * node) appearing in the targetlist or qual of the node. The number
2144  * of tuples to aggregate over depends on whether grouped or plain
2145  * aggregation is selected. In grouped aggregation, we produce a result
2146  * row for each group; in plain aggregation there's a single result row
2147  * for the whole query. In either case, the value of each aggregate is
2148  * stored in the expression context to be used when ExecProject evaluates
2149  * the result tuple.
2150  */
2151 static TupleTableSlot *
 /*
  * NOTE(review): the function-name line (source line 2152) is missing from
  * this extract; presumably ExecAgg(PlanState *pstate).
  */
2153 {
2154  AggState *node = castNode(AggState, pstate);
2155  TupleTableSlot *result = NULL;
2156 
 /*
  * NOTE(review): source line 2157 is missing here; presumably
  * CHECK_FOR_INTERRUPTS(); -- verify against upstream.
  */
2158 
2159  if (!node->agg_done)
2160  {
2161  /* Dispatch based on strategy */
2162  switch (node->phase->aggstrategy)
2163  {
2164  case AGG_HASHED:
2165  if (!node->table_filled)
2166  agg_fill_hash_table(node);
2167  /* FALLTHROUGH */
2168  case AGG_MIXED:
2169  result = agg_retrieve_hash_table(node);
2170  break;
2171  case AGG_PLAIN:
2172  case AGG_SORTED:
2173  result = agg_retrieve_direct(node);
2174  break;
2175  }
2176 
2177  if (!TupIsNull(result))
2178  return result;
2179  }
2180 
2181  return NULL;
2182 }
2183 
2184 /*
2185  * ExecAgg for non-hashed case
2186  */
2187 static TupleTableSlot *
 /*
  * NOTE(review): the function-name line (source line 2188) is missing from
  * this extract; presumably agg_retrieve_direct(AggState *aggstate).
  */
2189 {
2190  Agg *node = aggstate->phase->aggnode;
2191  ExprContext *econtext;
2192  ExprContext *tmpcontext;
2193  AggStatePerAgg peragg;
2194  AggStatePerGroup *pergroups;
2195  TupleTableSlot *outerslot;
2196  TupleTableSlot *firstSlot;
2197  TupleTableSlot *result;
2198  bool hasGroupingSets = aggstate->phase->numsets > 0;
2199  int numGroupingSets = Max(aggstate->phase->numsets, 1);
2200  int currentSet;
2201  int nextSetSize;
2202  int numReset;
2203  int i;
2204 
2205  /*
2206  * get state info from node
2207  *
2208  * econtext is the per-output-tuple expression context
2209  *
2210  * tmpcontext is the per-input-tuple expression context
2211  */
2212  econtext = aggstate->ss.ps.ps_ExprContext;
2213  tmpcontext = aggstate->tmpcontext;
2214 
2215  peragg = aggstate->peragg;
2216  pergroups = aggstate->pergroups;
2217  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2218 
2219  /*
2220  * We loop retrieving groups until we find one matching
2221  * aggstate->ss.ps.qual
2222  *
2223  * For grouping sets, we have the invariant that aggstate->projected_set
2224  * is either -1 (initial call) or the index (starting from 0) in
2225  * gset_lengths for the group we just completed (either by projecting a
2226  * row or by discarding it in the qual).
2227  */
2228  while (!aggstate->agg_done)
2229  {
2230  /*
2231  * Clear the per-output-tuple context for each group, as well as
2232  * aggcontext (which contains any pass-by-ref transvalues of the old
2233  * group). Some aggregate functions store working state in child
2234  * contexts; those now get reset automatically without us needing to
2235  * do anything special.
2236  *
2237  * We use ReScanExprContext not just ResetExprContext because we want
2238  * any registered shutdown callbacks to be called. That allows
2239  * aggregate functions to ensure they've cleaned up any non-memory
2240  * resources.
2241  */
2242  ReScanExprContext(econtext);
2243 
2244  /*
2245  * Determine how many grouping sets need to be reset at this boundary.
2246  */
2247  if (aggstate->projected_set >= 0 &&
2248  aggstate->projected_set < numGroupingSets)
2249  numReset = aggstate->projected_set + 1;
2250  else
2251  numReset = numGroupingSets;
2252 
2253  /*
2254  * numReset can change on a phase boundary, but that's OK; we want to
2255  * reset the contexts used in _this_ phase, and later, after possibly
2256  * changing phase, initialize the right number of aggregates for the
2257  * _new_ phase.
2258  */
2259 
2260  for (i = 0; i < numReset; i++)
2261  {
2262  ReScanExprContext(aggstate->aggcontexts[i]);
2263  }
2264 
2265  /*
2266  * Check if input is complete and there are no more groups to project
2267  * in this phase; move to next phase or mark as done.
2268  */
2269  if (aggstate->input_done == true &&
2270  aggstate->projected_set >= (numGroupingSets - 1))
2271  {
2272  if (aggstate->current_phase < aggstate->numphases - 1)
2273  {
2274  initialize_phase(aggstate, aggstate->current_phase + 1);
2275  aggstate->input_done = false;
2276  aggstate->projected_set = -1;
2277  numGroupingSets = Max(aggstate->phase->numsets, 1);
2278  node = aggstate->phase->aggnode;
2279  numReset = numGroupingSets;
2280  }
2281  else if (aggstate->aggstrategy == AGG_MIXED)
2282  {
2283  /*
2284  * Mixed mode; we've output all the grouped stuff and have
2285  * full hashtables, so switch to outputting those.
2286  */
2287  initialize_phase(aggstate, 0);
2288  aggstate->table_filled = true;
 /*
  * NOTE(review): source line 2289 is missing from this extract; it is
  * presumably the first line of a ResetTupleHashIterator(...) call whose
  * continuation follows -- verify against upstream.
  */
2290  &aggstate->perhash[0].hashiter);
2291  select_current_set(aggstate, 0, true);
2292  return agg_retrieve_hash_table(aggstate);
2293  }
2294  else
2295  {
2296  aggstate->agg_done = true;
2297  break;
2298  }
2299  }
2300 
2301  /*
2302  * Get the number of columns in the next grouping set after the last
2303  * projected one (if any). This is the number of columns to compare to
2304  * see if we reached the boundary of that set too.
2305  */
2306  if (aggstate->projected_set >= 0 &&
2307  aggstate->projected_set < (numGroupingSets - 1))
2308  nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2309  else
2310  nextSetSize = 0;
2311 
2312  /*----------
2313  * If a subgroup for the current grouping set is present, project it.
2314  *
2315  * We have a new group if:
2316  * - we're out of input but haven't projected all grouping sets
2317  * (checked above)
2318  * OR
2319  * - we already projected a row that wasn't from the last grouping
2320  * set
2321  * AND
2322  * - the next grouping set has at least one grouping column (since
2323  * empty grouping sets project only once input is exhausted)
2324  * AND
2325  * - the previous and pending rows differ on the grouping columns
2326  * of the next grouping set
2327  *----------
2328  */
2329  tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
2330  if (aggstate->input_done ||
2331  (node->aggstrategy != AGG_PLAIN &&
2332  aggstate->projected_set != -1 &&
2333  aggstate->projected_set < (numGroupingSets - 1) &&
2334  nextSetSize > 0 &&
2335  !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
2336  tmpcontext)))
2337  {
2338  aggstate->projected_set += 1;
2339 
2340  Assert(aggstate->projected_set < numGroupingSets);
2341  Assert(nextSetSize > 0 || aggstate->input_done);
2342  }
2343  else
2344  {
2345  /*
2346  * We no longer care what group we just projected, the next
2347  * projection will always be the first (or only) grouping set
2348  * (unless the input proves to be empty).
2349  */
2350  aggstate->projected_set = 0;
2351 
2352  /*
2353  * If we don't already have the first tuple of the new group,
2354  * fetch it from the outer plan.
2355  */
2356  if (aggstate->grp_firstTuple == NULL)
2357  {
2358  outerslot = fetch_input_tuple(aggstate);
2359  if (!TupIsNull(outerslot))
2360  {
2361  /*
2362  * Make a copy of the first input tuple; we will use this
2363  * for comparisons (in group mode) and for projection.
2364  */
2365  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2366  }
2367  else
2368  {
2369  /* outer plan produced no tuples at all */
2370  if (hasGroupingSets)
2371  {
2372  /*
2373  * If there was no input at all, we need to project
2374  * rows only if there are grouping sets of size 0.
2375  * Note that this implies that there can't be any
2376  * references to ungrouped Vars, which would otherwise
2377  * cause issues with the empty output slot.
2378  *
2379  * XXX: This is no longer true, we currently deal with
2380  * this in finalize_aggregates().
2381  */
2382  aggstate->input_done = true;
2383 
2384  while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2385  {
2386  aggstate->projected_set += 1;
2387  if (aggstate->projected_set >= numGroupingSets)
2388  {
2389  /*
2390  * We can't set agg_done here because we might
2391  * have more phases to do, even though the
2392  * input is empty. So we need to restart the
2393  * whole outer loop.
2394  */
2395  break;
2396  }
2397  }
2398 
2399  if (aggstate->projected_set >= numGroupingSets)
2400  continue;
2401  }
2402  else
2403  {
2404  aggstate->agg_done = true;
2405  /* If we are grouping, we should produce no tuples too */
2406  if (node->aggstrategy != AGG_PLAIN)
2407  return NULL;
2408  }
2409  }
2410  }
2411 
2412  /*
2413  * Initialize working state for a new input tuple group.
2414  */
2415  initialize_aggregates(aggstate, pergroups, numReset);
2416 
2417  if (aggstate->grp_firstTuple != NULL)
2418  {
2419  /*
2420  * Store the copied first input tuple in the tuple table slot
2421  * reserved for it. The tuple will be deleted when it is
2422  * cleared from the slot.
2423  */
 /*
  * NOTE(review): source line 2424 is missing from this extract; it is
  * presumably the first line of ExecForceStoreHeapTuple(
  * aggstate->grp_firstTuple, ...) whose continuation follows -- verify
  * against upstream.
  */
2425  firstSlot, true);
2426  aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
2427 
2428  /* set up for first advance_aggregates call */
2429  tmpcontext->ecxt_outertuple = firstSlot;
2430 
2431  /*
2432  * Process each outer-plan tuple, and then fetch the next one,
2433  * until we exhaust the outer plan or cross a group boundary.
2434  */
2435  for (;;)
2436  {
2437  /*
2438  * During phase 1 only of a mixed agg, we need to update
2439  * hashtables as well in advance_aggregates.
2440  */
2441  if (aggstate->aggstrategy == AGG_MIXED &&
2442  aggstate->current_phase == 1)
2443  {
2444  lookup_hash_entries(aggstate);
2445  }
2446 
2447  /* Advance the aggregates (or combine functions) */
2448  advance_aggregates(aggstate);
2449 
2450  /* Reset per-input-tuple context after each tuple */
2451  ResetExprContext(tmpcontext);
2452 
2453  outerslot = fetch_input_tuple(aggstate);
2454  if (TupIsNull(outerslot))
2455  {
2456  /* no more outer-plan tuples available */
2457 
2458  /* if we built hash tables, finalize any spills */
2459  if (aggstate->aggstrategy == AGG_MIXED &&
2460  aggstate->current_phase == 1)
 /*
  * NOTE(review): source line 2461 is missing from this extract;
  * presumably hashagg_finish_initial_spills(aggstate); -- verify
  * against upstream.
  */
2462 
2463  if (hasGroupingSets)
2464  {
2465  aggstate->input_done = true;
2466  break;
2467  }
2468  else
2469  {
2470  aggstate->agg_done = true;
2471  break;
2472  }
2473  }
2474  /* set up for next advance_aggregates call */
2475  tmpcontext->ecxt_outertuple = outerslot;
2476 
2477  /*
2478  * If we are grouping, check whether we've crossed a group
2479  * boundary.
2480  */
2481  if (node->aggstrategy != AGG_PLAIN)
2482  {
2483  tmpcontext->ecxt_innertuple = firstSlot;
2484  if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2485  tmpcontext))
2486  {
2487  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2488  break;
2489  }
2490  }
2491  }
2492  }
2493 
2494  /*
2495  * Use the representative input tuple for any references to
2496  * non-aggregated input columns in aggregate direct args, the node
2497  * qual, and the tlist. (If we are not grouping, and there are no
2498  * input rows at all, we will come here with an empty firstSlot
2499  * ... but if not grouping, there can't be any references to
2500  * non-aggregated input columns, so no problem.)
2501  */
2502  econtext->ecxt_outertuple = firstSlot;
2503  }
2504 
2505  Assert(aggstate->projected_set >= 0);
2506 
2507  currentSet = aggstate->projected_set;
2508 
2509  prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2510 
2511  select_current_set(aggstate, currentSet, false);
2512 
2513  finalize_aggregates(aggstate,
2514  peragg,
2515  pergroups[currentSet]);
2516 
2517  /*
2518  * If there's no row to project right now, we must continue rather
2519  * than returning a null since there might be more groups.
2520  */
2521  result = project_aggregates(aggstate);
2522  if (result)
2523  return result;
2524  }
2525 
2526  /* No more groups */
2527  return NULL;
2528 }
2529 
2530 /*
2531  * ExecAgg for hashed case: read input and build hash table
2532  */
2533 static void
 /*
  * NOTE(review): the function-name line (source line 2534) is missing from
  * this extract; presumably agg_fill_hash_table(AggState *aggstate).
  */
2535 {
2536  TupleTableSlot *outerslot;
2537  ExprContext *tmpcontext = aggstate->tmpcontext;
2538 
2539  /*
2540  * Process each outer-plan tuple, and then fetch the next one, until we
2541  * exhaust the outer plan.
2542  */
2543  for (;;)
2544  {
2545  outerslot = fetch_input_tuple(aggstate);
2546  if (TupIsNull(outerslot))
2547  break;
2548 
2549  /* set up for lookup_hash_entries and advance_aggregates */
2550  tmpcontext->ecxt_outertuple = outerslot;
2551 
2552  /* Find or build hashtable entries */
2553  lookup_hash_entries(aggstate);
2554 
2555  /* Advance the aggregates (or combine functions) */
2556  advance_aggregates(aggstate);
2557 
2558  /*
2559  * Reset per-input-tuple context after each tuple, but note that the
2560  * hash lookups do this too
2561  */
2562  ResetExprContext(aggstate->tmpcontext);
2563  }
2564 
2565  /* finalize spills, if any */
 /*
  * NOTE(review): source line 2566 is missing from this extract; presumably
  * hashagg_finish_initial_spills(aggstate); -- verify against upstream.
  */
2567 
2568  aggstate->table_filled = true;
2569  /* Initialize to walk the first hash table */
2570  select_current_set(aggstate, 0, true);
 /*
  * NOTE(review): source line 2571 is missing here; it is presumably the
  * first line of a ResetTupleHashIterator(...) call whose continuation
  * follows -- verify against upstream.
  */
2572  &aggstate->perhash[0].hashiter);
2573 }
2574 
2575 /*
2576  * If any data was spilled during hash aggregation, reset the hash table and
2577  * reprocess one batch of spilled data. After reprocessing a batch, the hash
2578  * table will again contain data, ready to be consumed by
2579  * agg_retrieve_hash_table_in_memory().
2580  *
2581  * Should only be called after all in memory hash table entries have been
2582  * finalized and emitted.
2583  *
2584  * Return false when input is exhausted and there's no more work to be done;
2585  * otherwise return true.
2586  */
2587 static bool
 /*
  * NOTE(review): the function-name line (source line 2588) is missing from
  * this extract; presumably agg_refill_hash_table(AggState *aggstate).
  */
2589 {
2590  HashAggBatch *batch;
2591  HashAggSpill spill;
2592  HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
2593  uint64 ngroups_estimate;
2594  bool spill_initialized = false;
2595 
2596  if (aggstate->hash_batches == NIL)
2597  return false;
2598 
 /* pop the next batch off the list of pending spilled batches */
2599  batch = linitial(aggstate->hash_batches);
2600  aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
2601 
2602  /*
2603  * Estimate the number of groups for this batch as the total number of
2604  * tuples in its input file. Although that's a worst case, it's not bad
2605  * here for two reasons: (1) overestimating is better than
2606  * underestimating; and (2) we've already scanned the relation once, so
2607  * it's likely that we've already finalized many of the common values.
2608  */
2609  ngroups_estimate = batch->input_tuples;
2610 
2611  hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
2612  batch->used_bits, &aggstate->hash_mem_limit,
2613  &aggstate->hash_ngroups_limit, NULL);
2614 
2615  /* there could be residual pergroup pointers; clear them */
2616  for (int setoff = 0;
2617  setoff < aggstate->maxsets + aggstate->num_hashes;
2618  setoff++)
2619  aggstate->all_pergroups[setoff] = NULL;
2620 
2621  /* free memory and reset hash tables */
2622  ReScanExprContext(aggstate->hashcontext);
2623  for (int setno = 0; setno < aggstate->num_hashes; setno++)
2624  ResetTupleHashTable(aggstate->perhash[setno].hashtable);
2625 
2626  aggstate->hash_ngroups_current = 0;
2627 
2628  /*
2629  * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
2630  * happens in phase 0. So, we switch to phase 1 when processing a batch,
2631  * and back to phase 0 after the batch is done.
2632  */
2633  Assert(aggstate->current_phase == 0);
2634  if (aggstate->phase->aggstrategy == AGG_MIXED)
2635  {
2636  aggstate->current_phase = 1;
2637  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2638  }
2639 
2640  select_current_set(aggstate, batch->setno, true);
2641 
2642  /*
2643  * Spilled tuples are always read back as MinimalTuples, which may be
2644  * different from the outer plan, so recompile the aggregate expressions.
2645  *
2646  * We still need the NULL check, because we are only processing one
2647  * grouping set at a time and the rest will be NULL.
2648  */
2649  hashagg_recompile_expressions(aggstate, true, true);
2650 
 /*
  * NOTE(review): source lines 2651-2652 are missing from this extract;
  * presumably a LogicalTapeRewindForRead(...) call preparing the batch's
  * input tape for reading -- verify against upstream.
  */
2653  for (;;)
2654  {
2655  TupleTableSlot *slot = aggstate->hash_spill_rslot;
2656  MinimalTuple tuple;
2657  uint32 hash;
2658  bool in_hash_table;
2659 
 /*
  * NOTE(review): source line 2660 is missing from this extract;
  * presumably CHECK_FOR_INTERRUPTS(); -- verify against upstream.
  */
2661 
2662  tuple = hashagg_batch_read(batch, &hash);
2663  if (tuple == NULL)
2664  break;
2665 
2666  ExecStoreMinimalTuple(tuple, slot, true);
2667  aggstate->tmpcontext->ecxt_outertuple = slot;
2668 
2669  prepare_hash_slot(aggstate);
2670  aggstate->hash_pergroup[batch->setno] =
2671  lookup_hash_entry(aggstate, hash, &in_hash_table);
2672 
2673  if (in_hash_table)
2674  {
2675  /* Advance the aggregates (or combine functions) */
2676  advance_aggregates(aggstate);
2677  }
2678  else
2679  {
2680  if (!spill_initialized)
2681  {
2682  /*
2683  * Avoid initializing the spill until we actually need it so
2684  * that we don't assign tapes that will never be used.
2685  */
2686  spill_initialized = true;
2687  hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
2688  ngroups_estimate, aggstate->hashentrysize);
2689  }
2690  /* no memory for a new group, spill */
2691  hashagg_spill_tuple(aggstate, &spill, slot, hash);
2692  }
2693 
2694  /*
2695  * Reset per-input-tuple context after each tuple, but note that the
2696  * hash lookups do this too
2697  */
2698  ResetExprContext(aggstate->tmpcontext);
2699  }
2700 
2701  hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
2702 
2703  /* change back to phase 0 */
2704  aggstate->current_phase = 0;
2705  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2706 
2707  if (spill_initialized)
2708  {
2709  hash_agg_update_metrics(aggstate, true, spill.npartitions);
2710  hashagg_spill_finish(aggstate, &spill, batch->setno);
2711  }
2712  else
2713  hash_agg_update_metrics(aggstate, true, 0);
2714 
2715  aggstate->hash_spill_mode = false;
2716 
2717  /* prepare to walk the first hash table */
2718  select_current_set(aggstate, batch->setno, true);
2719  ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
2720  &aggstate->perhash[batch->setno].hashiter);
2721 
2722  pfree(batch);
2723 
2724  return true;
2725 }
2726 
2727 /*
2728  * ExecAgg for hashed case: retrieving groups from hash table
2729  *
2730  * After exhausting in-memory tuples, also try refilling the hash table using
2731  * previously-spilled tuples. Only returns NULL after all in-memory and
2732  * spilled tuples are exhausted.
2733  */
2734 static TupleTableSlot *
2736 {
2737  TupleTableSlot *result = NULL;
2738 
2739  while (result == NULL)
2740  {
2741  result = agg_retrieve_hash_table_in_memory(aggstate);
2742  if (result == NULL)
2743  {
2744  if (!agg_refill_hash_table(aggstate))
2745  {
2746  aggstate->agg_done = true;
2747  break;
2748  }
2749  }
2750  }
2751 
2752  return result;
2753 }
2754 
2755 /*
2756  * Retrieve the groups from the in-memory hash tables without considering any
2757  * spilled tuples.
2758  */
2759 static TupleTableSlot *
2761 {
2762  ExprContext *econtext;
2763  AggStatePerAgg peragg;
2764  AggStatePerGroup pergroup;
2765  TupleHashEntryData *entry;
2766  TupleTableSlot *firstSlot;
2767  TupleTableSlot *result;
2768  AggStatePerHash perhash;
2769 
2770  /*
2771  * get state info from node.
2772  *
2773  * econtext is the per-output-tuple expression context.
2774  */
2775  econtext = aggstate->ss.ps.ps_ExprContext;
2776  peragg = aggstate->peragg;
2777  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2778 
2779  /*
2780  * Note that perhash (and therefore anything accessed through it) can
2781  * change inside the loop, as we change between grouping sets.
2782  */
2783  perhash = &aggstate->perhash[aggstate->current_set];
2784 
2785  /*
2786  * We loop retrieving groups until we find one satisfying
2787  * aggstate->ss.ps.qual
2788  */
2789  for (;;)
2790  {
2791  TupleTableSlot *hashslot = perhash->hashslot;
2792  int i;
2793 
2795 
2796  /*
2797  * Find the next entry in the hash table
2798  */
2799  entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2800  if (entry == NULL)
2801  {
2802  int nextset = aggstate->current_set + 1;
2803 
2804  if (nextset < aggstate->num_hashes)
2805  {
2806  /*
2807  * Switch to next grouping set, reinitialize, and restart the
2808  * loop.
2809  */
2810  select_current_set(aggstate, nextset, true);
2811 
2812  perhash = &aggstate->perhash[aggstate->current_set];
2813 
2814  ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2815 
2816  continue;
2817  }
2818  else
2819  {
2820  return NULL;
2821  }
2822  }
2823 
2824  /*
2825  * Clear the per-output-tuple context for each group
2826  *
2827  * We intentionally don't use ReScanExprContext here; if any aggs have
2828  * registered shutdown callbacks, they mustn't be called yet, since we
2829  * might not be done with that agg.
2830  */
2831  ResetExprContext(econtext);
2832 
2833  /*
2834  * Transform representative tuple back into one with the right
2835  * columns.
2836  */
2837  ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2838  slot_getallattrs(hashslot);
2839 
2840  ExecClearTuple(firstSlot);
2841  memset(firstSlot->tts_isnull, true,
2842  firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2843 
2844  for (i = 0; i < perhash->numhashGrpCols; i++)
2845  {
2846  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2847 
2848  firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2849  firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2850  }
2851  ExecStoreVirtualTuple(firstSlot);
2852 
2853  pergroup = (AggStatePerGroup) entry->additional;
2854 
2855  /*
2856  * Use the representative input tuple for any references to
2857  * non-aggregated input columns in the qual and tlist.
2858  */
2859  econtext->ecxt_outertuple = firstSlot;
2860 
2861  prepare_projection_slot(aggstate,
2862  econtext->ecxt_outertuple,
2863  aggstate->current_set);
2864 
2865  finalize_aggregates(aggstate, peragg, pergroup);
2866 
2867  result = project_aggregates(aggstate);
2868  if (result)
2869  return result;
2870  }
2871 
2872  /* No more groups */
2873  return NULL;
2874 }
2875 
2876 /*
2877  * Initialize HashTapeInfo
2878  */
2879 static void
2881 {
2882  HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
2883  int init_tapes = 16; /* expanded dynamically */
2884 
2885  tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, NULL, NULL, -1);
2886  tapeinfo->ntapes = init_tapes;
2887  tapeinfo->nfreetapes = init_tapes;
2888  tapeinfo->freetapes_alloc = init_tapes;
2889  tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
2890  for (int i = 0; i < init_tapes; i++)
2891  tapeinfo->freetapes[i] = i;
2892 
2893  aggstate->hash_tapeinfo = tapeinfo;
2894 }
2895 
2896 /*
2897  * Assign unused tapes to spill partitions, extending the tape set if
2898  * necessary.
2899  */
2900 static void
2902  int npartitions)
2903 {
2904  int partidx = 0;
2905 
2906  /* use free tapes if available */
2907  while (partidx < npartitions && tapeinfo->nfreetapes > 0)
2908  partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
2909 
2910  if (partidx < npartitions)
2911  {
2912  LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
2913 
2914  while (partidx < npartitions)
2915  partitions[partidx++] = tapeinfo->ntapes++;
2916  }
2917 }
2918 
2919 /*
2920  * After a tape has already been written to and then read, this function
2921  * rewinds it for writing and adds it to the free list.
2922  */
2923 static void
2925 {
2926  LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
2927  if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
2928  {
2929  tapeinfo->freetapes_alloc <<= 1;
2930  tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
2931  tapeinfo->freetapes_alloc * sizeof(int));
2932  }
2933  tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
2934 }
2935 
2936 /*
2937  * hashagg_spill_init
2938  *
2939  * Called after we determined that spilling is necessary. Chooses the number
2940  * of partitions to create, and initializes them.
2941  */
2942 static void
2943 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
2944  uint64 input_groups, double hashentrysize)
2945 {
2946  int npartitions;
2947  int partition_bits;
2948 
2949  npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
2950  used_bits, &partition_bits);
2951 
2952  spill->partitions = palloc0(sizeof(int) * npartitions);
2953  spill->ntuples = palloc0(sizeof(int64) * npartitions);
2954 
2955  hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
2956 
2957  spill->tapeset = tapeinfo->tapeset;
2958  spill->shift = 32 - used_bits - partition_bits;
2959  spill->mask = (npartitions - 1) << spill->shift;
2960  spill->npartitions = npartitions;
2961 }
2962 
2963 /*
2964  * hashagg_spill_tuple
2965  *
2966  * No room for new groups in the hash table. Save for later in the appropriate
2967  * partition.
2968  */
2969 static Size
2971  TupleTableSlot *inputslot, uint32 hash)
2972 {
2973  LogicalTapeSet *tapeset = spill->tapeset;
2974  TupleTableSlot *spillslot;
2975  int partition;
2976  MinimalTuple tuple;
2977  int tapenum;
2978  int total_written = 0;
2979  bool shouldFree;
2980 
2981  Assert(spill->partitions != NULL);
2982 
2983  /* spill only attributes that we actually need */
2984  if (!aggstate->all_cols_needed)
2985  {
2986  spillslot = aggstate->hash_spill_wslot;
2987  slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
2988  ExecClearTuple(spillslot);
2989  for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
2990  {
2991  if (bms_is_member(i + 1, aggstate->colnos_needed))
2992  {
2993  spillslot->tts_values[i] = inputslot->tts_values[i];
2994  spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
2995  }
2996  else
2997  spillslot->tts_isnull[i] = true;
2998  }
2999  ExecStoreVirtualTuple(spillslot);
3000  }
3001  else
3002  spillslot = inputslot;
3003 
3004  tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
3005 
3006  partition = (hash & spill->mask) >> spill->shift;
3007  spill->ntuples[partition]++;
3008 
3009  tapenum = spill->partitions[partition];
3010 
3011  LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
3012  total_written += sizeof(uint32);
3013 
3014  LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
3015  total_written += tuple->t_len;
3016 
3017  if (shouldFree)
3018  pfree(tuple);
3019 
3020  return total_written;
3021 }
3022 
3023 /*
3024  * hashagg_batch_new
3025  *
3026  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
3027  * be done.
3028  */
3029 static HashAggBatch *
3030 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
3031  int64 input_tuples, int used_bits)
3032 {
3033  HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
3034 
3035  batch->setno = setno;
3036  batch->used_bits = used_bits;
3037  batch->tapeset = tapeset;
3038  batch->input_tapenum = tapenum;
3039  batch->input_tuples = input_tuples;
3040 
3041  return batch;
3042 }
3043 
3044 /*
3045  * read_spilled_tuple
3046  * read the next tuple from a batch's tape. Return NULL if no more.
3047  */
3048 static MinimalTuple
3050 {
3051  LogicalTapeSet *tapeset = batch->tapeset;
3052  int tapenum = batch->input_tapenum;
3053  MinimalTuple tuple;
3054  uint32 t_len;
3055  size_t nread;
3056  uint32 hash;
3057 
3058  nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
3059  if (nread == 0)
3060  return NULL;
3061  if (nread != sizeof(uint32))
3062  ereport(ERROR,
3064  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3065  tapenum, sizeof(uint32), nread)));
3066  if (hashp != NULL)
3067  *hashp = hash;
3068 
3069  nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
3070  if (nread != sizeof(uint32))
3071  ereport(ERROR,
3073  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3074  tapenum, sizeof(uint32), nread)));
3075 
3076  tuple = (MinimalTuple) palloc(t_len);
3077  tuple->t_len = t_len;
3078 
3079  nread = LogicalTapeRead(tapeset, tapenum,
3080  (void *) ((char *) tuple + sizeof(uint32)),
3081  t_len - sizeof(uint32));
3082  if (nread != t_len - sizeof(uint32))
3083  ereport(ERROR,
3085  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3086  tapenum, t_len - sizeof(uint32), nread)));
3087 
3088  return tuple;
3089 }
3090 
3091 /*
3092  * hashagg_finish_initial_spills
3093  *
3094  * After a HashAggBatch has been processed, it may have spilled tuples to
3095  * disk. If so, turn the spilled partitions into new batches that must later
3096  * be executed.
3097  */
3098 static void
3100 {
3101  int setno;
3102  int total_npartitions = 0;
3103 
3104  if (aggstate->hash_spills != NULL)
3105  {
3106  for (setno = 0; setno < aggstate->num_hashes; setno++)
3107  {
3108  HashAggSpill *spill = &aggstate->hash_spills[setno];
3109 
3110  total_npartitions += spill->npartitions;
3111  hashagg_spill_finish(aggstate, spill, setno);
3112  }
3113 
3114  /*
3115  * We're not processing tuples from outer plan any more; only
3116  * processing batches of spilled tuples. The initial spill structures
3117  * are no longer needed.
3118  */
3119  pfree(aggstate->hash_spills);
3120  aggstate->hash_spills = NULL;
3121  }
3122 
3123  hash_agg_update_metrics(aggstate, false, total_npartitions);
3124  aggstate->hash_spill_mode = false;
3125 }
3126 
3127 /*
3128  * hashagg_spill_finish
3129  *
3130  * Transform spill partitions into new batches.
3131  */
3132 static void
3133 hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3134 {
3135  int i;
3136  int used_bits = 32 - spill->shift;
3137 
3138  if (spill->npartitions == 0)
3139  return; /* didn't spill */
3140 
3141  for (i = 0; i < spill->npartitions; i++)
3142  {
3143  int tapenum = spill->partitions[i];
3144  HashAggBatch *new_batch;
3145 
3146  /* if the partition is empty, don't create a new batch of work */
3147  if (spill->ntuples[i] == 0)
3148  continue;
3149 
3150  new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
3151  tapenum, setno, spill->ntuples[i],
3152  used_bits);
3153  aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
3154  aggstate->hash_batches_used++;
3155  }
3156 
3157  pfree(spill->ntuples);
3158  pfree(spill->partitions);
3159 }
3160 
3161 /*
3162  * Free resources related to a spilled HashAgg.
3163  */
3164 static void
3166 {
3167  ListCell *lc;
3168 
3169  /* free spills from initial pass */
3170  if (aggstate->hash_spills != NULL)
3171  {
3172  int setno;
3173 
3174  for (setno = 0; setno < aggstate->num_hashes; setno++)
3175  {
3176  HashAggSpill *spill = &aggstate->hash_spills[setno];
3177 
3178  pfree(spill->ntuples);
3179  pfree(spill->partitions);
3180  }
3181  pfree(aggstate->hash_spills);
3182  aggstate->hash_spills = NULL;
3183  }
3184 
3185  /* free batches */
3186  foreach(lc, aggstate->hash_batches)
3187  {
3188  HashAggBatch *batch = (HashAggBatch *) lfirst(lc);
3189 
3190  pfree(batch);
3191  }
3192  list_free(aggstate->hash_batches);
3193  aggstate->hash_batches = NIL;
3194 
3195  /* close tape set */
3196  if (aggstate->hash_tapeinfo != NULL)
3197  {
3198  HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
3199 
3200  LogicalTapeSetClose(tapeinfo->tapeset);
3201  pfree(tapeinfo->freetapes);
3202  pfree(tapeinfo);
3203  aggstate->hash_tapeinfo = NULL;
3204  }
3205 }
3206 
3207 
3208 /* -----------------
3209  * ExecInitAgg
3210  *
3211  * Creates the run-time information for the agg node produced by the
3212  * planner and initializes its outer subtree.
3213  *
3214  * -----------------
3215  */
3216 AggState *
3217 ExecInitAgg(Agg *node, EState *estate, int eflags)
3218 {
3219  AggState *aggstate;
3220  AggStatePerAgg peraggs;
3221  AggStatePerTrans pertransstates;
3222  AggStatePerGroup *pergroups;
3223  Plan *outerPlan;
3224  ExprContext *econtext;
3225  TupleDesc scanDesc;
3226  int numaggs,
3227  transno,
3228  aggno;
3229  int phase;
3230  int phaseidx;
3231  ListCell *l;
3232  Bitmapset *all_grouped_cols = NULL;
3233  int numGroupingSets = 1;
3234  int numPhases;
3235  int numHashes;
3236  int i = 0;
3237  int j = 0;
3238  bool use_hashing = (node->aggstrategy == AGG_HASHED ||
3239  node->aggstrategy == AGG_MIXED);
3240 
3241  /* check for unsupported flags */
3242  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
3243 
3244  /*
3245  * create state structure
3246  */
3247  aggstate = makeNode(AggState);
3248  aggstate->ss.ps.plan = (Plan *) node;
3249  aggstate->ss.ps.state = estate;
3250  aggstate->ss.ps.ExecProcNode = ExecAgg;
3251 
3252  aggstate->aggs = NIL;
3253  aggstate->numaggs = 0;
3254  aggstate->numtrans = 0;
3255  aggstate->aggstrategy = node->aggstrategy;
3256  aggstate->aggsplit = node->aggsplit;
3257  aggstate->maxsets = 0;
3258  aggstate->projected_set = -1;
3259  aggstate->current_set = 0;
3260  aggstate->peragg = NULL;
3261  aggstate->pertrans = NULL;
3262  aggstate->curperagg = NULL;
3263  aggstate->curpertrans = NULL;
3264  aggstate->input_done = false;
3265  aggstate->agg_done = false;
3266  aggstate->pergroups = NULL;
3267  aggstate->grp_firstTuple = NULL;
3268  aggstate->sort_in = NULL;
3269  aggstate->sort_out = NULL;
3270 
3271  /*
3272  * phases[0] always exists, but is dummy in sorted/plain mode
3273  */
3274  numPhases = (use_hashing ? 1 : 2);
3275  numHashes = (use_hashing ? 1 : 0);
3276 
3277  /*
3278  * Calculate the maximum number of grouping sets in any phase; this
3279  * determines the size of some allocations. Also calculate the number of
3280  * phases, since all hashed/mixed nodes contribute to only a single phase.
3281  */
3282  if (node->groupingSets)
3283  {
3284  numGroupingSets = list_length(node->groupingSets);
3285 
3286  foreach(l, node->chain)
3287  {
3288  Agg *agg = lfirst(l);
3289 
3290  numGroupingSets = Max(numGroupingSets,
3291  list_length(agg->groupingSets));
3292 
3293  /*
3294  * additional AGG_HASHED aggs become part of phase 0, but all
3295  * others add an extra phase.
3296  */
3297  if (agg->aggstrategy != AGG_HASHED)
3298  ++numPhases;
3299  else
3300  ++numHashes;
3301  }
3302  }
3303 
3304  aggstate->maxsets = numGroupingSets;
3305  aggstate->numphases = numPhases;
3306 
3307  aggstate->aggcontexts = (ExprContext **)
3308  palloc0(sizeof(ExprContext *) * numGroupingSets);
3309 
3310  /*
3311  * Create expression contexts. We need three or more, one for
3312  * per-input-tuple processing, one for per-output-tuple processing, one
3313  * for all the hashtables, and one for each grouping set. The per-tuple
3314  * memory context of the per-grouping-set ExprContexts (aggcontexts)
3315  * replaces the standalone memory context formerly used to hold transition
3316  * values. We cheat a little by using ExecAssignExprContext() to build
3317  * all of them.
3318  *
3319  * NOTE: the details of what is stored in aggcontexts and what is stored
3320  * in the regular per-query memory context are driven by a simple
3321  * decision: we want to reset the aggcontext at group boundaries (if not
3322  * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
3323  */
3324  ExecAssignExprContext(estate, &aggstate->ss.ps);
3325  aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
3326 
3327  for (i = 0; i < numGroupingSets; ++i)
3328  {
3329  ExecAssignExprContext(estate, &aggstate->ss.ps);
3330  aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
3331  }
3332 
3333  if (use_hashing)
3334  aggstate->hashcontext = CreateWorkExprContext(estate);
3335 
3336  ExecAssignExprContext(estate, &aggstate->ss.ps);
3337 
3338  /*
3339  * Initialize child nodes.
3340  *
3341  * If we are doing a hashed aggregation then the child plan does not need
3342  * to handle REWIND efficiently; see ExecReScanAgg.
3343  */
3344  if (node->aggstrategy == AGG_HASHED)
3345  eflags &= ~EXEC_FLAG_REWIND;
3346  outerPlan = outerPlan(node);
3347  outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
3348 
3349  /*
3350  * initialize source tuple type.
3351  */
3352  aggstate->ss.ps.outerops =
3354  &aggstate->ss.ps.outeropsfixed);
3355  aggstate->ss.ps.outeropsset = true;
3356 
3357  ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
3358  aggstate->ss.ps.outerops);
3359  scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
3360 
3361  /*
3362  * If there are more than two phases (including a potential dummy phase
3363  * 0), input will be resorted using tuplesort. Need a slot for that.
3364  */
3365  if (numPhases > 2)
3366  {
3367  aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
3369 
3370  /*
3371  * The output of the tuplesort, and the output from the outer child
3372  * might not use the same type of slot. In most cases the child will
3373  * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
3374  * input can also be presorted due an index, in which case it could be
3375  * a different type of slot.
3376  *
3377  * XXX: For efficiency it would be good to instead/additionally
3378  * generate expressions with corresponding settings of outerops* for
3379  * the individual phases - deforming is often a bottleneck for
3380  * aggregations with lots of rows per group. If there's multiple
3381  * sorts, we know that all but the first use TTSOpsMinimalTuple (via
3382  * the nodeAgg.c internal tuplesort).
3383  */
3384  if (aggstate->ss.ps.outeropsfixed &&
3385  aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
3386  aggstate->ss.ps.outeropsfixed = false;
3387  }
3388 
3389  /*
3390  * Initialize result type, slot and projection.
3391  */
3393  ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
3394 
3395  /*
3396  * initialize child expressions
3397  *
3398  * We expect the parser to have checked that no aggs contain other agg
3399  * calls in their arguments (and just to be sure, we verify it again while
3400  * initializing the plan node). This would make no sense under SQL
3401  * semantics, and it's forbidden by the spec. Because it is true, we
3402  * don't need to worry about evaluating the aggs in any particular order.
3403  *
3404  * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
3405  * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs
3406  * in the targetlist are found during ExecAssignProjectionInfo, below.
3407  */
3408  aggstate->ss.ps.qual =
3409  ExecInitQual(node->plan.qual, (PlanState *) aggstate);
3410 
3411  /*
3412  * We should now have found all Aggrefs in the targetlist and quals.
3413  */
3414  numaggs = aggstate->numaggs;
3415  Assert(numaggs == list_length(aggstate->aggs));
3416 
3417  /*
3418  * For each phase, prepare grouping set data and fmgr lookup data for
3419  * compare functions. Accumulate all_grouped_cols in passing.
3420  */
3421  aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
3422 
3423  aggstate->num_hashes = numHashes;
3424  if (numHashes)
3425  {
3426  aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
3427  aggstate->phases[0].numsets = 0;
3428  aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
3429  aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
3430  }
3431 
3432  phase = 0;
3433  for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3434  {
3435  Agg *aggnode;
3436  Sort *sortnode;
3437 
3438  if (phaseidx > 0)
3439  {
3440  aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
3441  sortnode = castNode(Sort, aggnode->plan.lefttree);
3442  }
3443  else
3444  {
3445  aggnode = node;
3446  sortnode = NULL;
3447  }
3448 
3449  Assert(phase <= 1 || sortnode);
3450 
3451  if (aggnode->aggstrategy == AGG_HASHED
3452  || aggnode->aggstrategy == AGG_MIXED)
3453  {
3454  AggStatePerPhase phasedata = &aggstate->phases[0];
3455  AggStatePerHash perhash;
3456  Bitmapset *cols = NULL;
3457 
3458  Assert(phase == 0);
3459  i = phasedata->numsets++;
3460  perhash = &aggstate->perhash[i];
3461 
3462  /* phase 0 always points to the "real" Agg in the hash case */
3463  phasedata->aggnode = node;
3464  phasedata->aggstrategy = node->aggstrategy;
3465 
3466  /* but the actual Agg node representing this hash is saved here */
3467  perhash->aggnode = aggnode;
3468 
3469  phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
3470 
3471  for (j = 0; j < aggnode->numCols; ++j)
3472  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3473 
3474  phasedata->grouped_cols[i] = cols;
3475 
3476  all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3477  continue;
3478  }
3479  else
3480  {
3481  AggStatePerPhase phasedata = &aggstate->phases[++phase];
3482  int num_sets;
3483 
3484  phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
3485 
3486  if (num_sets)
3487  {
3488  phasedata->gset_lengths = palloc(num_sets * sizeof(int));
3489  phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
3490 
3491  i = 0;
3492  foreach(l, aggnode->groupingSets)
3493  {
3494  int current_length = list_length(lfirst(l));
3495  Bitmapset *cols = NULL;
3496 
3497  /* planner forces this to be correct */
3498  for (j = 0; j < current_length; ++j)
3499  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3500 
3501  phasedata->grouped_cols[i] = cols;
3502  phasedata->gset_lengths[i] = current_length;
3503 
3504  ++i;
3505  }
3506 
3507  all_grouped_cols = bms_add_members(all_grouped_cols,
3508  phasedata->grouped_cols[0]);
3509  }
3510  else
3511  {
3512  Assert(phaseidx == 0);
3513 
3514  phasedata->gset_lengths = NULL;
3515  phasedata->grouped_cols = NULL;
3516  }
3517 
3518  /*
3519  * If we are grouping, precompute fmgr lookup data for inner loop.
3520  */
3521  if (aggnode->aggstrategy == AGG_SORTED)
3522  {
3523  int i = 0;
3524 
3525  Assert(aggnode->numCols > 0);
3526 
3527  /*
3528  * Build a separate function for each subset of columns that
3529  * need to be compared.
3530  */
3531  phasedata->eqfunctions =
3532  (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
3533 
3534  /* for each grouping set */
3535  for (i = 0; i < phasedata->numsets; i++)
3536  {
3537  int length = phasedata->gset_lengths[i];
3538 
3539  if (phasedata->eqfunctions[length - 1] != NULL)
3540  continue;
3541 
3542  phasedata->eqfunctions[length - 1] =
3543  execTuplesMatchPrepare(scanDesc,
3544  length,
3545  aggnode->grpColIdx,
3546  aggnode->grpOperators,
3547  aggnode->grpCollations,
3548  (PlanState *) aggstate);
3549  }
3550 
3551  /* and for all grouped columns, unless already computed */
3552  if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
3553  {
3554  phasedata->eqfunctions[aggnode->numCols - 1] =
3555  execTuplesMatchPrepare(scanDesc,
3556  aggnode->numCols,
3557  aggnode->grpColIdx,
3558  aggnode->grpOperators,
3559  aggnode->grpCollations,
3560  (PlanState *) aggstate);
3561  }
3562  }
3563 
3564  phasedata->aggnode = aggnode;
3565  phasedata->aggstrategy = aggnode->aggstrategy;
3566  phasedata->sortnode = sortnode;
3567  }
3568  }
3569 
3570  /*
3571  * Convert all_grouped_cols to a descending-order list.
3572  */
3573  i = -1;
3574  while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3575  aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3576 
3577  /*
3578  * Set up aggregate-result storage in the output expr context, and also
3579  * allocate my private per-agg working storage
3580  */
3581  econtext = aggstate->ss.ps.ps_ExprContext;
3582  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3583  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3584 
3585  peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3586  pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
3587 
3588  aggstate->peragg = peraggs;
3589  aggstate->pertrans = pertransstates;
3590 
3591 
3592  aggstate->all_pergroups =
3594  * (numGroupingSets + numHashes));
3595  pergroups = aggstate->all_pergroups;
3596 
3597  if (node->aggstrategy != AGG_HASHED)
3598  {
3599  for (i = 0; i < numGroupingSets; i++)
3600  {
3601  pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3602  * numaggs);
3603  }
3604 
3605  aggstate->pergroups = pergroups;
3606  pergroups += numGroupingSets;
3607  }
3608 
3609  /*
3610  * Hashing can only appear in the initial phase.
3611  */
3612  if (use_hashing)
3613  {
3614  Plan *outerplan = outerPlan(node);
3615  uint64 totalGroups = 0;
3616  int i;
3617 
3618  aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
3619  "HashAgg meta context",
3621  aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
3623  aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
3624  &TTSOpsVirtual);
3625 
3626  /* this is an array of pointers, not structures */
3627  aggstate->hash_pergroup = pergroups;
3628 
3629  aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
3630  outerplan->plan_width,
3631  node->transitionSpace);
3632 
3633  /*
3634  * Consider all of the grouping sets together when setting the limits
3635  * and estimating the number of partitions. This can be inaccurate
3636  * when there is more than one grouping set, but should still be
3637  * reasonable.
3638  */
3639  for (i = 0; i < aggstate->num_hashes; i++)
3640  totalGroups += aggstate->perhash[i].aggnode->numGroups;
3641 
3642  hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
3643  &aggstate->hash_mem_limit,
3644  &aggstate->hash_ngroups_limit,
3645  &aggstate->hash_planned_partitions);
3646  find_hash_columns(aggstate);
3647  build_hash_tables(aggstate);
3648  aggstate->table_filled = false;
3649  }
3650 
3651  /*
3652  * Initialize current phase-dependent values to initial phase. The initial
3653  * phase is 1 (first sort pass) for all strategies that use sorting (if
3654  * hashing is being done too, then phase 0 is processed last); but if only
3655  * hashing is being done, then phase 0 is all there is.
3656  */
3657  if (node->aggstrategy == AGG_HASHED)
3658  {
3659  aggstate->current_phase = 0;
3660  initialize_phase(aggstate, 0);
3661  select_current_set(aggstate, 0, true);
3662  }
3663  else
3664  {
3665  aggstate->current_phase = 1;
3666  initialize_phase(aggstate, 1);
3667  select_current_set(aggstate, 0, false);
3668  }
3669 
3670  /* -----------------
3671  * Perform lookups of aggregate function info, and initialize the
3672  * unchanging fields of the per-agg and per-trans data.
3673  *
3674  * We try to optimize by detecting duplicate aggregate functions so that
3675  * their state and final values are re-used, rather than needlessly being
3676  * re-calculated independently. We also detect aggregates that are not
3677  * the same, but which can share the same transition state.
3678  *
3679  * Scenarios:
3680  *
3681  * 1. Identical aggregate function calls appear in the query:
3682  *
3683  * SELECT SUM(x) FROM ... HAVING SUM(x) > 0
3684  *
3685  * Since these aggregates are identical, we only need to calculate
3686  * the value once. Both aggregates will share the same 'aggno' value.
3687  *
3688  * 2. Two different aggregate functions appear in the query, but the
3689  * aggregates have the same arguments, transition functions and
3690  * initial values (and, presumably, different final functions):
3691  *
3692  * SELECT AVG(x), STDDEV(x) FROM ...
3693  *
3694  * In this case we must create a new peragg for the varying aggregate,
3695  * and we need to call the final functions separately, but we need
3696  * only run the transition function once. (This requires that the
3697  * final functions be nondestructive of the transition state, but
3698  * that's required anyway for other reasons.)
3699  *
3700  * For either of these optimizations to be valid, all aggregate properties
3701  * used in the transition phase must be the same, including any modifiers
3702  * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
3703  * contain any volatile functions.
3704  * -----------------
3705  */
3706  aggno = -1;
3707  transno = -1;
3708  foreach(l, aggstate->aggs)
3709  {
3710  AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
3711  Aggref *aggref = aggrefstate->aggref;
3712  AggStatePerAgg peragg;
3713  AggStatePerTrans pertrans;
3714  int existing_aggno;
3715  int existing_transno;
3716  List *same_input_transnos;
3717  Oid inputTypes[FUNC_MAX_ARGS];
3718  int numArguments;
3719  int numDirectArgs;
3720  HeapTuple aggTuple;
3721  Form_pg_aggregate aggform;
3722  AclResult aclresult;
3723  Oid transfn_oid,
3724  finalfn_oid;
3725  bool shareable;
3726  Oid serialfn_oid,
3727  deserialfn_oid;
3728  Expr *finalfnexpr;
3729  Oid aggtranstype;
3730  Datum textInitVal;
3731  Datum initValue;
3732  bool initValueIsNull;
3733 
3734  /* Planner should have assigned aggregate to correct level */
3735  Assert(aggref->agglevelsup == 0);
3736  /* ... and the split mode should match */
3737  Assert(aggref->aggsplit == aggstate->aggsplit);
3738 
3739  /* 1. Check for already processed aggs which can be re-used */
3740  existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
3741  &same_input_transnos);
3742  if (existing_aggno != -1)
3743  {
3744  /*
3745  * Existing compatible agg found. so just point the Aggref to the
3746  * same per-agg struct.
3747  */
3748  aggrefstate->aggno = existing_aggno;
3749  continue;
3750  }
3751 
3752  /* Mark Aggref state node with assigned index in the result array */
3753  peragg = &peraggs[++aggno];
3754  peragg->aggref = aggref;
3755  aggrefstate->aggno = aggno;
3756 
3757  /* Fetch the pg_aggregate row */
3758  aggTuple = SearchSysCache1(AGGFNOID,
3759  ObjectIdGetDatum(aggref->aggfnoid));
3760  if (!HeapTupleIsValid(aggTuple))
3761  elog(ERROR, "cache lookup failed for aggregate %u",
3762  aggref->aggfnoid);
3763  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3764 
3765  /* Check permission to call aggregate function */
3766  aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3767  ACL_EXECUTE);
3768  if (aclresult != ACLCHECK_OK)
3769  aclcheck_error(aclresult, OBJECT_AGGREGATE,
3770  get_func_name(aggref->aggfnoid));
3772 
3773  /* planner recorded transition state type in the Aggref itself */
3774  aggtranstype = aggref->aggtranstype;
3775  Assert(OidIsValid(aggtranstype));
3776 
3777  /*
3778  * If this aggregation is performing state combines, then instead of
3779  * using the transition function, we'll use the combine function
3780  */
3781  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3782  {
3783  transfn_oid = aggform->aggcombinefn;
3784 
3785  /* If not set then the planner messed up */
3786  if (!OidIsValid(transfn_oid))
3787  elog(ERROR, "combinefn not set for aggregate function");
3788  }
3789  else
3790  transfn_oid = aggform->aggtransfn;
3791 
3792  /* Final function only required if we're finalizing the aggregates */
3793  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3794  peragg->finalfn_oid = finalfn_oid = InvalidOid;
3795  else
3796  peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3797 
3798  /*
3799  * If finalfn is marked read-write, we can't share transition states;
3800  * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also,
3801  * if we're not executing the finalfn here, we can share regardless.
3802  */
3803  shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
3804  (finalfn_oid == InvalidOid);
3805  peragg->shareable = shareable;
3806 
3807  serialfn_oid = InvalidOid;
3808  deserialfn_oid = InvalidOid;
3809 
3810  /*
3811  * Check if serialization/deserialization is required. We only do it
3812  * for aggregates that have transtype INTERNAL.
3813  */
3814  if (aggtranstype == INTERNALOID)
3815  {
3816  /*
3817  * The planner should only have generated a serialize agg node if
3818  * every aggregate with an INTERNAL state has a serialization
3819  * function. Verify that.
3820  */
3821  if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3822  {
3823  /* serialization only valid when not running finalfn */
3824  Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3825 
3826  if (!OidIsValid(aggform->aggserialfn))
3827  elog(ERROR, "serialfunc not provided for serialization aggregation");
3828  serialfn_oid = aggform->aggserialfn;
3829  }
3830 
3831  /* Likewise for deserialization functions */
3832  if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3833  {
3834  /* deserialization only valid when combining states */
3835  Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3836 
3837  if (!OidIsValid(aggform->aggdeserialfn))
3838  elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3839  deserialfn_oid = aggform->aggdeserialfn;
3840  }
3841  }
3842 
3843  /* Check that aggregate owner has permission to call component fns */
3844  {
3845  HeapTuple procTuple;
3846  Oid aggOwner;
3847 
3848  procTuple = SearchSysCache1(PROCOID,
3849  ObjectIdGetDatum(aggref->aggfnoid));
3850  if (!HeapTupleIsValid(procTuple))
3851  elog(ERROR, "cache lookup failed for function %u",
3852  aggref->aggfnoid);
3853  aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3854  ReleaseSysCache(procTuple);
3855 
3856  aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3857  ACL_EXECUTE);
3858  if (aclresult != ACLCHECK_OK)
3859  aclcheck_error(aclresult, OBJECT_FUNCTION,
3860  get_func_name(transfn_oid));
3861  InvokeFunctionExecuteHook(transfn_oid);
3862  if (OidIsValid(finalfn_oid))
3863  {
3864  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3865  ACL_EXECUTE);
3866  if (aclresult != ACLCHECK_OK)
3867  aclcheck_error(aclresult, OBJECT_FUNCTION,
3868  get_func_name(finalfn_oid));
3869  InvokeFunctionExecuteHook(finalfn_oid);
3870  }
3871  if (OidIsValid(serialfn_oid))
3872  {
3873  aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3874  ACL_EXECUTE);
3875  if (aclresult != ACLCHECK_OK)
3876  aclcheck_error(aclresult, OBJECT_FUNCTION,
3877  get_func_name(serialfn_oid));
3878  InvokeFunctionExecuteHook(serialfn_oid);
3879  }
3880  if (OidIsValid(deserialfn_oid))
3881  {
3882  aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3883  ACL_EXECUTE);
3884  if (aclresult != ACLCHECK_OK)
3885  aclcheck_error(aclresult, OBJECT_FUNCTION,
3886  get_func_name(deserialfn_oid));
3887  InvokeFunctionExecuteHook(deserialfn_oid);
3888  }
3889  }
3890 
3891  /*
3892  * Get actual datatypes of the (nominal) aggregate inputs. These
3893  * could be different from the agg's declared input types, when the
3894  * agg accepts ANY or a polymorphic type.
3895  */
3896  numArguments = get_aggregate_argtypes(aggref, inputTypes);
3897 
3898  /* Count the "direct" arguments, if any */
3899  numDirectArgs = list_length(aggref->aggdirectargs);
3900 
3901  /* Detect how many arguments to pass to the finalfn */
3902  if (aggform->aggfinalextra)
3903  peragg->numFinalArgs = numArguments + 1;
3904  else
3905  peragg->numFinalArgs = numDirectArgs + 1;
3906 
3907  /* Initialize any direct-argument expressions */
3908  peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3909  (PlanState *) aggstate);
3910 
3911  /*
3912  * build expression trees using actual argument & result types for the
3913  * finalfn, if it exists and is required.
3914  */
3915  if (OidIsValid(finalfn_oid))
3916  {
3917  build_aggregate_finalfn_expr(inputTypes,
3918  peragg->numFinalArgs,
3919  aggtranstype,
3920  aggref->aggtype,
3921  aggref->inputcollid,
3922  finalfn_oid,
3923  &finalfnexpr);
3924  fmgr_info(finalfn_oid, &peragg->finalfn);
3925  fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3926  }
3927 
3928  /* get info about the output value's datatype */
3929  get_typlenbyval(aggref->aggtype,
3930  &peragg->resulttypeLen,
3931  &peragg->resulttypeByVal);
3932 
3933  /*
3934  * initval is potentially null, so don't try to access it as a struct
3935  * field. Must do it the hard way with SysCacheGetAttr.
3936  */
3937  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3938  Anum_pg_aggregate_agginitval,
3939  &initValueIsNull);
3940  if (initValueIsNull)
3941  initValue = (Datum) 0;
3942  else
3943  initValue = GetAggInitVal(textInitVal, aggtranstype);
3944 
3945  /*
3946  * 2. Build working state for invoking the transition function, or
3947  * look up previously initialized working state, if we can share it.
3948  *
3949  * find_compatible_peragg() already collected a list of shareable
3950  * per-Trans's with the same inputs. Check if any of them have the
3951  * same transition function and initial value.
3952  */
3953  existing_transno = find_compatible_pertrans(aggstate, aggref,
3954  shareable,
3955  transfn_oid, aggtranstype,
3956  serialfn_oid, deserialfn_oid,
3957  initValue, initValueIsNull,
3958  same_input_transnos);
3959  if (existing_transno != -1)
3960  {
3961  /*
3962  * Existing compatible trans found, so just point the 'peragg' to
3963  * the same per-trans struct, and mark the trans state as shared.
3964  */
3965  pertrans = &pertransstates[existing_transno];
3966  pertrans->aggshared = true;
3967  peragg->transno = existing_transno;
3968  }
3969  else
3970  {
3971  pertrans = &pertransstates[++transno];
3972  build_pertrans_for_aggref(pertrans, aggstate, estate,
3973  aggref, transfn_oid, aggtranstype,
3974  serialfn_oid, deserialfn_oid,
3975  initValue, initValueIsNull,
3976  inputTypes, numArguments);
3977  peragg->transno = transno;
3978  }
3979  ReleaseSysCache(aggTuple);
3980  }
3981 
3982  /*
3983  * Update aggstate->numaggs to be the number of unique aggregates found.
3984  * Also set numstates to the number of unique transition states found.
3985  */
3986  aggstate->numaggs = aggno + 1;
3987  aggstate->numtrans = transno + 1;
3988 
3989  /*
3990  * Last, check whether any more aggregates got added onto the node while
3991  * we processed the expressions for the aggregate arguments (including not
3992  * only the regular arguments and FILTER expressions handled immediately
3993  * above, but any direct arguments we might've handled earlier). If so,
3994  * we have nested aggregate functions, which is semantically nonsensical,
3995  * so complain. (This should have been caught by the parser, so we don't
3996  * need to work hard on a helpful error message; but we defend against it
3997  * here anyway, just to be sure.)
3998  */
3999  if (numaggs != list_length(aggstate->aggs))
4000  ereport(ERROR,
4001  (errcode(ERRCODE_GROUPING_ERROR),
4002  errmsg("aggregate function calls cannot be nested")));
4003 
4004  /*
4005  * Build expressions doing all the transition work at once. We build a
4006  * different one for each phase, as the number of transition function
4007  * invocation can differ between phases. Note this'll work both for
4008  * transition and combination functions (although there'll only be one
4009  * phase in the latter case).
4010  */
4011  for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
4012  {
4013  AggStatePerPhase phase = &aggstate->phases[phaseidx];
4014  bool dohash = false;
4015  bool dosort = false;
4016 
4017  /* phase 0 doesn't necessarily exist */
4018  if (!phase->aggnode)
4019  continue;
4020 
4021  if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
4022  {
4023  /*
4024  * Phase one, and only phase one, in a mixed agg performs both
4025  * sorting and aggregation.
4026  */
4027  dohash = true;
4028  dosort = true;
4029  }
4030  else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
4031  {
4032  /*
4033  * No need to compute a transition function for an AGG_MIXED phase
4034  * 0 - the contents of the hashtables will have been computed
4035  * during phase 1.
4036  */
4037  continue;
4038  }
4039  else if (phase->aggstrategy == AGG_PLAIN ||
4040  phase->aggstrategy == AGG_SORTED)
4041  {
4042  dohash = false;
4043  dosort = true;
4044  }
4045  else if (phase->aggstrategy == AGG_HASHED)
4046  {
4047  dohash = true;
4048  dosort = false;
4049  }
4050  else
4051  Assert(false);
4052 
4053  phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
4054  false);
4055 
4056  /* cache compiled expression for outer slot without NULL check */
4057  phase->evaltrans_cache[0][0] = phase->evaltrans;
4058  }
4059 
4060  return aggstate;
4061 }
4062 
4063 /*
4064  * Build the state needed to calculate a state value for an aggregate.
4065  *
4066  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
4067  * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
4068  * of the arguments could be calculated from 'aggref', but the caller has
4069  * calculated them already, so might as well pass them.
4070  */
/*
 * NOTE(review): this extract looks like a doxygen scrape; every code line
 * carries its original nodeAgg.c line number, and lines rendered as
 * hyperlinks were dropped, leaving gaps in the numbering.  The gaps are
 * flagged inline below -- verify each against the original file.
 */
4071 static void
/* NOTE(review): original line 4072 missing here -- presumably the function
 * name line "build_pertrans_for_aggref(AggStatePerTrans pertrans," given
 * the call site in ExecInitAgg; confirm. */
4073  AggState *aggstate, EState *estate,
4074  Aggref *aggref,
4075  Oid aggtransfn, Oid aggtranstype,
4076  Oid aggserialfn, Oid aggdeserialfn,
4077  Datum initValue, bool initValueIsNull,
4078  Oid *inputTypes, int numArguments)
4079 {
4080  int numGroupingSets = Max(aggstate->maxsets, 1);
4081  Expr *serialfnexpr = NULL;
4082  Expr *deserialfnexpr = NULL;
4083  ListCell *lc;
4084  int numInputs;
4085  int numDirectArgs;
4086  List *sortlist;
4087  int numSortCols;
4088  int numDistinctCols;
4089  int i;
4090 
4091  /* Begin filling in the pertrans data */
4092  pertrans->aggref = aggref;
4093  pertrans->aggshared = false;
4094  pertrans->aggCollation = aggref->inputcollid;
4095  pertrans->transfn_oid = aggtransfn;
4096  pertrans->serialfn_oid = aggserialfn;
4097  pertrans->deserialfn_oid = aggdeserialfn;
4098  pertrans->initValue = initValue;
4099  pertrans->initValueIsNull = initValueIsNull;
4100 
4101  /* Count the "direct" arguments, if any */
4102  numDirectArgs = list_length(aggref->aggdirectargs);
4103 
4104  /* Count the number of aggregated input columns */
4105  pertrans->numInputs = numInputs = list_length(aggref->args);
4106 
4107  pertrans->aggtranstype = aggtranstype;
4108 
4109  /*
4110  * When combining states, we have no use at all for the aggregate
4111  * function's transfn. Instead we use the combinefn. In this case, the
4112  * transfn and transfn_oid fields of pertrans refer to the combine
4113  * function rather than the transition function.
4114  */
4115  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
4116  {
4117  Expr *combinefnexpr;
4118  size_t numTransArgs;
4119 
4120  /*
4121  * When combining there's only one input, the to-be-combined added
4122  * transition value from below (this node's transition value is
4123  * counted separately).
4124  */
4125  pertrans->numTransInputs = 1;
4126 
4127  /* account for the current transition state */
4128  numTransArgs = pertrans->numTransInputs + 1;
4129 
4130  build_aggregate_combinefn_expr(aggtranstype,
4131  aggref->inputcollid,
4132  aggtransfn,
4133  &combinefnexpr);
4134  fmgr_info(aggtransfn, &pertrans->transfn);
4135  fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
4136 
4137  pertrans->transfn_fcinfo =
/* NOTE(review): original lines 4138-4139 missing -- presumably the
 * palloc(SizeForFunctionCallInfo(...)) allocation plus the opening of the
 * InitFunctionCallInfoData(*pertrans->transfn_fcinfo, ...) call whose
 * remaining arguments follow; verify. */
4140  &pertrans->transfn,
4141  numTransArgs,
4142  pertrans->aggCollation,
4143  (void *) aggstate, NULL);
4144 
4145  /*
4146  * Ensure that a combine function to combine INTERNAL states is not
4147  * strict. This should have been checked during CREATE AGGREGATE, but
4148  * the strict property could have been changed since then.
4149  */
4150  if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
4151  ereport(ERROR,
4152  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4153  errmsg("combine function with transition type %s must not be declared STRICT",
4154  format_type_be(aggtranstype))));
4155  }
4156  else
4157  {
4158  Expr *transfnexpr;
4159  size_t numTransArgs;
4160 
4161  /* Detect how many arguments to pass to the transfn */
4162  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4163  pertrans->numTransInputs = numInputs;
4164  else
4165  pertrans->numTransInputs = numArguments;
4166 
4167  /* account for the current transition state */
4168  numTransArgs = pertrans->numTransInputs + 1;
4169 
4170  /*
4171  * Set up infrastructure for calling the transfn. Note that
4172  * invtransfn is not needed here.
4173  */
4174  build_aggregate_transfn_expr(inputTypes,
4175  numArguments,
4176  numDirectArgs,
4177  aggref->aggvariadic,
4178  aggtranstype,
4179  aggref->inputcollid,
4180  aggtransfn,
4181  InvalidOid,
4182  &transfnexpr,
4183  NULL);
4184  fmgr_info(aggtransfn, &pertrans->transfn);
4185  fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
4186 
4187  pertrans->transfn_fcinfo =
/* NOTE(review): original lines 4188-4189 missing -- same pattern as above
 * (fcinfo allocation + InitFunctionCallInfoData opening); verify. */
4190  &pertrans->transfn,
4191  numTransArgs,
4192  pertrans->aggCollation,
4193  (void *) aggstate, NULL);
4194 
4195  /*
4196  * If the transfn is strict and the initval is NULL, make sure input
4197  * type and transtype are the same (or at least binary-compatible), so
4198  * that it's OK to use the first aggregated input value as the initial
4199  * transValue. This should have been checked at agg definition time,
4200  * but we must check again in case the transfn's strictness property
4201  * has been changed.
4202  */
4203  if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
4204  {
4205  if (numArguments <= numDirectArgs ||
4206  !IsBinaryCoercible(inputTypes[numDirectArgs],
4207  aggtranstype))
4208  ereport(ERROR,
4209  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4210  errmsg("aggregate %u needs to have compatible input type and transition type",
4211  aggref->aggfnoid)));
4212  }
4213  }
4214 
4215  /* get info about the state value's datatype */
4216  get_typlenbyval(aggtranstype,
4217  &pertrans->transtypeLen,
4218  &pertrans->transtypeByVal);
4219 
4220  if (OidIsValid(aggserialfn))
4221  {
4222  build_aggregate_serialfn_expr(aggserialfn,
4223  &serialfnexpr);
4224  fmgr_info(aggserialfn, &pertrans->serialfn);
4225  fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
4226 
4227  pertrans->serialfn_fcinfo =
/* NOTE(review): original lines 4228-4229 missing -- presumably the fcinfo
 * allocation + InitFunctionCallInfoData opening for the one-argument
 * serialization call; verify. */
4230  &pertrans->serialfn,
4231  1,
4232  InvalidOid,
4233  (void *) aggstate, NULL);
4234  }
4235 
4236  if (OidIsValid(aggdeserialfn))
4237  {
4238  build_aggregate_deserialfn_expr(aggdeserialfn,
4239  &deserialfnexpr);
4240  fmgr_info(aggdeserialfn, &pertrans->deserialfn);
4241  fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
4242 
4243  pertrans->deserialfn_fcinfo =
/* NOTE(review): original lines 4244-4245 missing -- presumably the fcinfo
 * allocation + InitFunctionCallInfoData opening for the two-argument
 * deserialization call; verify. */
4246  &pertrans->deserialfn,
4247  2,
4248  InvalidOid,
4249  (void *) aggstate, NULL);
4250 
4251  }
4252 
4253  /*
4254  * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
4255  * have a list of SortGroupClause nodes; fish out the data in them and
4256  * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
4257  * however; the agg's transfn and finalfn are responsible for that.
4258  *
4259  * Note that by construction, if there is a DISTINCT clause then the ORDER
4260  * BY clause is a prefix of it (see transformDistinctClause).
4261  */
4262  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4263  {
4264  sortlist = NIL;
4265  numSortCols = numDistinctCols = 0;
4266  }
4267  else if (aggref->aggdistinct)
4268  {
4269  sortlist = aggref->aggdistinct;
4270  numSortCols = numDistinctCols = list_length(sortlist);
4271  Assert(numSortCols >= list_length(aggref->aggorder));
4272  }
4273  else
4274  {
4275  sortlist = aggref->aggorder;
4276  numSortCols = list_length(sortlist);
4277  numDistinctCols = 0;
4278  }
4279 
4280  pertrans->numSortCols = numSortCols;
4281  pertrans->numDistinctCols = numDistinctCols;
4282 
4283  /*
4284  * If we have either sorting or filtering to do, create a tupledesc and
4285  * slot corresponding to the aggregated inputs (including sort
4286  * expressions) of the agg.
4287  */
4288  if (numSortCols > 0 || aggref->aggfilter)
4289  {
4290  pertrans->sortdesc = ExecTypeFromTL(aggref->args);
4291  pertrans->sortslot =
4292  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
/* NOTE(review): original line 4293 missing -- presumably the final
 * slot-ops argument (e.g. "&TTSOpsMinimalTuple);"); verify. */
4294  }
4295 
4296  if (numSortCols > 0)
4297  {
4298  /*
4299  * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
4300  * (yet)
4301  */
4302  Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
4303 
4304  /* If we have only one input, we need its len/byval info. */
4305  if (numInputs == 1)
4306  {
4307  get_typlenbyval(inputTypes[numDirectArgs],
4308  &pertrans->inputtypeLen,
4309  &pertrans->inputtypeByVal);
4310  }
4311  else if (numDistinctCols > 0)
4312  {
4313  /* we will need an extra slot to store prior values */
4314  pertrans->uniqslot =
4315  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
/* NOTE(review): original line 4316 missing -- presumably the final
 * slot-ops argument, matching the call above; verify. */
4317  }
4318 
4319  /* Extract the sort information for use later */
4320  pertrans->sortColIdx =
4321  (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
4322  pertrans->sortOperators =
4323  (Oid *) palloc(numSortCols * sizeof(Oid));
4324  pertrans->sortCollations =
4325  (Oid *) palloc(numSortCols * sizeof(Oid));
4326  pertrans->sortNullsFirst =
4327  (bool *) palloc(numSortCols * sizeof(bool));
4328 
4329  i = 0;
4330  foreach(lc, sortlist)
4331  {
4332  SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
4333  TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
4334 
4335  /* the parser should have made sure of this */
4336  Assert(OidIsValid(sortcl->sortop));
4337 
4338  pertrans->sortColIdx[i] = tle->resno;
4339  pertrans->sortOperators[i] = sortcl->sortop;
4340  pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
4341  pertrans->sortNullsFirst[i] = sortcl->nulls_first;
4342  i++;
4343  }
4344  Assert(i == numSortCols);
4345  }
4346 
4347  if (aggref->aggdistinct)
4348  {
4349  Oid *ops;
4350 
4351  Assert(numArguments > 0);
4352  Assert(list_length(aggref->aggdistinct) == numDistinctCols);
4353 
4354  ops = palloc(numDistinctCols * sizeof(Oid));
4355 
4356  i = 0;
4357  foreach(lc, aggref->aggdistinct)
4358  ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
4359 
4360  /* lookup / build the necessary comparators */
4361  if (numDistinctCols == 1)
4362  fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
4363  else
4364  pertrans->equalfnMulti =
4365  execTuplesMatchPrepare(pertrans->sortdesc,
4366  numDistinctCols,
4367  pertrans->sortColIdx,
4368  ops,
4369  pertrans->sortCollations,
4370  &aggstate->ss.ps);
4371  pfree(ops);
4372  }
4373 
4374  pertrans->sortstates = (Tuplesortstate **)
4375  palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
4376 }
4377 
4378 
4379 static Datum
4380 GetAggInitVal(Datum textInitVal, Oid transtype)
4381 {
4382  Oid typinput,
4383  typioparam;
4384  char *strInitVal;
4385  Datum initVal;
4386 
4387  getTypeInputInfo(transtype, &typinput, &typioparam);
4388  strInitVal = TextDatumGetCString(textInitVal);
4389  initVal = OidInputFunctionCall(typinput, strInitVal,
4390  typioparam, -1);
4391  pfree(strInitVal);
4392  return initVal;
4393 }
4394 
4395 /*
4396  * find_compatible_peragg - search for a previously initialized per-Agg struct
4397  *
4398  * Searches the previously looked at aggregates to find one which is compatible
4399  * with this one, with the same input parameters. If no compatible aggregate
4400  * can be found, returns -1.
4401  *
4402  * As a side-effect, this also collects a list of existing, shareable per-Trans
4403  * structs with matching inputs. If no identical Aggref is found, the list is
4404  * passed later to find_compatible_pertrans, to see if we can at least reuse
4405  * the state value of another aggregate.
4406  */
4407 static int
/* NOTE(review): original line 4408 missing from this extract -- presumably
 * the function name line "find_compatible_peragg(Aggref *newagg,
 * AggState *aggstate," (the header comment above documents this function);
 * verify against nodeAgg.c. */
4409  int lastaggno, List **same_input_transnos)
4410 {
4411  int aggno;
4412  AggStatePerAgg peraggs;
4413 
4414  *same_input_transnos = NIL;
4415 
4416  /* we mustn't reuse the aggref if it contains volatile function calls */
4417  if (contain_volatile_functions((Node *) newagg))
4418  return -1;
4419 
4420  peraggs = aggstate->peragg;
4421 
4422  /*
4423  * Search through the list of already seen aggregates. If we find an
4424  * existing identical aggregate call, then we can re-use that one. While
4425  * searching, we'll also collect a list of Aggrefs with the same input
4426  * parameters. If no matching Aggref is found, the caller can potentially
4427  * still re-use the transition state of one of them. (At this stage we
4428  * just compare the parsetrees; whether different aggregates share the
4429  * same transition function will be checked later.)
4430  */
4431  for (aggno = 0; aggno <= lastaggno; aggno++)
4432  {
4433  AggStatePerAgg peragg;
4434  Aggref *existingRef;
4435 
4436  peragg = &peraggs[aggno];
4437  existingRef = peragg->aggref;
4438 
4439  /* all of the following must be the same or it's no match */
4440  if (newagg->inputcollid != existingRef->inputcollid ||
4441  newagg->aggtranstype != existingRef->aggtranstype ||
4442  newagg->aggstar != existingRef->aggstar ||
4443  newagg->aggvariadic != existingRef->aggvariadic ||
4444  newagg->aggkind != existingRef->aggkind ||
4445  !equal(newagg->args, existingRef->args) ||
4446  !equal(newagg->aggorder, existingRef->aggorder) ||
4447  !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
4448  !equal(newagg->aggfilter, existingRef->aggfilter))
4449  continue;
4450 
4451  /* if it's the same aggregate function then report exact match */
4452  if (newagg->aggfnoid == existingRef->aggfnoid &&
4453  newagg->aggtype == existingRef->aggtype &&
4454  newagg->aggcollid == existingRef->aggcollid &&
4455  equal(newagg->aggdirectargs, existingRef->aggdirectargs))
4456  {
/* On an exact match the collected partial-match list is no longer
 * needed; free it and return the matching index. */
4457  list_free(*same_input_transnos);
4458  *same_input_transnos = NIL;
4459  return aggno;
4460  }
4461 
4462  /*
4463  * Not identical, but it had the same inputs. If the final function
4464  * permits sharing, return its transno to the caller, in case we can
4465  * re-use its per-trans state. (If there's already sharing going on,
4466  * we might report a transno more than once. find_compatible_pertrans
4467  * is cheap enough that it's not worth spending cycles to avoid that.)
4468  */
4469  if (peragg->shareable)
4470  *same_input_transnos = lappend_int(*same_input_transnos,
4471  peragg->transno);
4472  }
4473 
/* No identical aggregate found; caller may still consult the collected
 * same-input transnos for transition-state sharing. */
4474  return -1;
4475 }
4476 
4477 /*
4478  * find_compatible_pertrans - search for a previously initialized per-Trans
4479  * struct
4480  *
4481  * Searches the list of transnos for a per-Trans struct with the same
4482  * transition function and initial condition. (The inputs have already been
4483  * verified to match.)
4484  */
4485 static int
4486 find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
4487  Oid aggtransfn, Oid aggtranstype,
4488  Oid aggserialfn, Oid aggdeserialfn,
4489  Datum initValue, bool initValueIsNull,
4490  List *transnos)
4491 {
4492  ListCell *lc;
4493 
4494  /* If this aggregate can't share transition states, give up */
4495  if (!shareable)
4496  return -1;
4497 
4498  foreach(lc, transnos)
4499  {
4500  int transno = lfirst_int(lc);
4501  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
4502 
4503  /*
4504  * if the transfns or transition state types are not the same then the
4505  * state can't be shared.
4506  */
4507  if (aggtransfn != pertrans->transfn_oid ||
4508  aggtranstype != pertrans->aggtranstype)
4509  continue;
4510 
4511  /*
4512  * The serialization and deserialization functions must match, if
4513  * present, as we're unable to share the trans state for aggregates
4514  * which will serialize or deserialize into different formats.
4515  * Remember that these will be InvalidOid if they're not required for
4516  * this agg node.
4517  */
4518  if (aggserialfn != pertrans->serialfn_oid ||
4519  aggdeserialfn != pertrans->deserialfn_oid)
4520  continue;
4521 
4522  /*
4523  * Check that the initial condition matches, too.
4524  */
4525  if (initValueIsNull && pertrans->initValueIsNull)
4526  return transno;
4527 
4528  if (!initValueIsNull && !pertrans->initValueIsNull &&
4529  datumIsEqual(initValue, pertrans->initValue,
4530  pertrans->transtypeByVal, pertrans->transtypeLen))
4531  return transno;
4532  }
4533  return -1;
4534 }
4535 
/*
 * NOTE(review): several hyperlinked lines were dropped by the scrape in
 * this function (numbering gaps below); each gap is flagged inline --
 * verify against the original nodeAgg.c.
 */
4536 void
/* NOTE(review): original line 4537 missing -- presumably the function name
 * line "ExecEndAgg(AggState *node)"; verify. */
4538 {
/* NOTE(review): original line 4539 missing -- presumably the declaration
 * "PlanState *outerPlan;", since outerPlan is assigned and used below;
 * verify. */
4540  int transno;
4541  int numGroupingSets = Max(node->maxsets, 1);
4542  int setno;
4543 
4544  /*
4545  * When ending a parallel worker, copy the statistics gathered by the
4546  * worker back into shared memory so that it can be picked up by the main
4547  * process to report in EXPLAIN ANALYZE.
4548  */
4549  if (node->shared_info && IsParallelWorker())
4550  {
/* NOTE(review): original line 4551 missing -- presumably a declaration of
 * the per-worker stats pointer (e.g. "SharedAggInfo *si;" or similar);
 * verify. */
4552 
4553  Assert(ParallelWorkerNumber <= node->shared_info->num_workers)
/* NOTE(review): original lines 4554-4555 missing -- presumably the
 * assignment of the stats pointer for this worker and the copy of another
 * counter (e.g. hash_batches_used); verify. */;
4556  si->hash_disk_used = node->hash_disk_used;
4557  si->hash_mem_peak = node->hash_mem_peak;
4558  }
4559 
4560  /* Make sure we have closed any open tuplesorts */
4561 
4562  if (node->sort_in)
4563  tuplesort_end(node->sort_in);
4564  if (node->sort_out)
4565  tuplesort_end(node->sort_out);
4566 
/* NOTE(review): original line 4567 missing -- presumably
 * "hashagg_reset_spill_state(node);" (matching the rescan path); verify. */
4568 
4569  if (node->hash_metacxt != NULL)
4570  {
/* NOTE(review): original line 4571 missing -- presumably
 * "MemoryContextDelete(node->hash_metacxt);", given the pointer is nulled
 * out on the next line; verify. */
4572  node->hash_metacxt = NULL;
4573  }
4574 
4575  for (transno = 0; transno < node->numtrans; transno++)
4576  {
4577  AggStatePerTrans pertrans = &node->pertrans[transno];
4578 
4579  for (setno = 0; setno < numGroupingSets; setno++)
4580  {
4581  if (pertrans->sortstates[setno])
4582  tuplesort_end(pertrans->sortstates[setno]);
4583  }
4584  }
4585 
4586  /* And ensure any agg shutdown callbacks have been called */
4587  for (setno = 0; setno < numGroupingSets; setno++)
4588  ReScanExprContext(node->aggcontexts[setno]);
4589  if (node->hashcontext)
/* NOTE(review): original line 4590 missing -- presumably
 * "ReScanExprContext(node->hashcontext);" as the body of this if; verify. */
4591 
4592  /*
4593  * We don't actually free any ExprContexts here (see comment in
4594  * ExecFreeExprContext), just unlinking the output one from the plan node
4595  * suffices.
4596  */
4597  ExecFreeExprContext(&node->ss.ps);
4598 
4599  /* clean up tuple table */
/* NOTE(review): original line 4600 missing -- presumably clearing the scan
 * tuple slot (e.g. "ExecClearTuple(node->ss.ss_ScanTupleSlot);"); verify. */
4601 
4602  outerPlan = outerPlanState(node);
4603  ExecEndNode(outerPlan);
4604 }
4605 
/*
 * NOTE(review): hyperlinked lines were dropped by the scrape in this
 * function (numbering gaps below); each gap is flagged inline -- verify
 * against the original nodeAgg.c.
 */
4606 void
/* NOTE(review): original line 4607 missing -- presumably the function name
 * line "ExecReScanAgg(AggState *node)"; verify. */
4608 {
4609  ExprContext *econtext = node->ss.ps.ps_ExprContext;
/* NOTE(review): original line 4610 missing -- presumably
 * "PlanState *outerPlan = outerPlanState(node);", since outerPlan is used
 * below; verify. */
4611  Agg *aggnode = (Agg *) node->ss.ps.plan;
4612  int transno;
4613  int numGroupingSets = Max(node->maxsets, 1);
4614  int setno;
4615 
4616  node->agg_done = false;
4617 
4618  if (node->aggstrategy == AGG_HASHED)
4619  {
4620  /*
4621  * In the hashed case, if we haven't yet built the hash table then we
4622  * can just return; nothing done yet, so nothing to undo. If subnode's
4623  * chgParam is not NULL then it will be re-scanned by ExecProcNode,
4624  * else no reason to re-scan it at all.
4625  */
4626  if (!node->table_filled)
4627  return;
4628 
4629  /*
4630  * If we do have the hash table, and it never spilled, and the subplan
4631  * does not have any parameter changes, and none of our own parameter
4632  * changes affect input expressions of the aggregated functions, then
4633  * we can just rescan the existing hash table; no need to build it
4634  * again.
4635  */
4636  if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
4637  !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
4638  {
/* NOTE(review): original line 4639 missing -- presumably the opening of
 * the iterator reset call, "ResetTupleHashIterator(node->perhash[0].hashtable,"
 * whose trailing argument follows; verify. */
4640  &node->perhash[0].hashiter);
4641  select_current_set(node, 0, true);
4642  return;
4643  }
4644  }
4645 
4646  /* Make sure we have closed any open tuplesorts */
4647  for (transno = 0; transno < node->numtrans; transno++)
4648  {
4649  for (setno = 0; setno < numGroupingSets; setno++)
4650  {
4651  AggStatePerTrans pertrans = &node->pertrans[transno];
4652 
4653  if (pertrans->sortstates[setno])
4654  {
4655  tuplesort_end(pertrans->sortstates[setno]);
4656  pertrans->sortstates[setno] = NULL;
4657  }
4658  }
4659  }
4660 
4661  /*
4662  * We don't need to ReScanExprContext the output tuple context here;
4663  * ExecReScan already did it. But we do need to reset our per-grouping-set
4664  * contexts, which may have transvalues stored in them. (We use rescan
4665  * rather than just reset because transfns may have registered callbacks
4666  * that need to be run now.) For the AGG_HASHED case, see below.
4667  */
4668 
4669  for (setno = 0; setno < numGroupingSets; setno++)
4670  {
4671  ReScanExprContext(node->aggcontexts[setno]);
4672  }
4673 
4674  /* Release first tuple of group, if we have made a copy */
4675  if (node->grp_firstTuple != NULL)
4676  {
/* NOTE(review): original line 4677 missing -- presumably
 * "heap_freetuple(node->grp_firstTuple);", given the pointer is nulled out
 * next; verify. */
4678  node->grp_firstTuple = NULL;
4679  }
/* NOTE(review): original line 4680 missing here; verify. */
4681 
4682  /* Forget current agg values */
4683  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
4684  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
4685 
4686  /*
4687  * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
4688  * the hashcontext. This used to be an issue, but now, resetting a context
4689  * automatically deletes sub-contexts too.
4690  */
4691  if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
4692  {
/* NOTE(review): original line 4693 missing -- presumably
 * "ReScanExprContext(node->hashcontext);" per the comment above; verify. */
4694 
4695  node->hash_ever_spilled = false;
4696  node->hash_spill_mode = false;
4697  node->hash_ngroups_current = 0;
4698 
/* NOTE(review): original line 4699 missing -- presumably
 * "hashagg_reset_spill_state(node);"; verify. */
4700  /* Rebuild an empty hash table */
4701  build_hash_tables(node);
4702  node->table_filled = false;
4703  /* iterator will be reset when the table is filled */
4704 
4705  hashagg_recompile_expressions(node, false, false);
4706  }
4707 
4708  if (node->aggstrategy != AGG_HASHED)
4709  {
4710  /*
4711  * Reset the per-group state (in particular, mark transvalues null)
4712  */
4713  for (setno = 0; setno < numGroupingSets; setno++)
4714  {
4715  MemSet(node->pergroups[setno], 0,
4716  sizeof(AggStatePerGroupData) * node->numaggs);
4717  }
4718 
4719  /* reset to phase 1 */
4720  initialize_phase(node, 1);
4721 
4722  node->input_done = false;
4723  node->projected_set = -1;
4724  }
4725 
4726  if (outerPlan->chgParam == NULL)
4727  ExecReScan(outerPlan);
4728 }
4729 
4730 
4731 /***********************************************************************
4732  * API exposed to aggregate functions
4733  ***********************************************************************/
4734 
4735 
4736 /*
4737  * AggCheckCallContext - test if a SQL function is being called as an aggregate
4738  *
4739  * The transition and/or final functions of an aggregate may want to verify
4740  * that they are being called as aggregates, rather than as plain SQL
4741  * functions. They should use this function to do so. The return value
4742  * is nonzero if being called as an aggregate, or zero if not. (Specific
4743  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4744  * values could conceivably appear in future.)
4745  *
4746  * If aggcontext isn't NULL, the function also stores at *aggcontext the
4747  * identity of the memory context that aggregate transition values are being
4748  * stored in. Note that the same aggregate call site (flinfo) may be called
4749  * interleaved on different transition values in different contexts, so it's
4750  * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4751  * cache it in the transvalue itself (for internal-type transvalues).
4752  */
4753 int
4755 {
4756  if (fcinfo->context && IsA(fcinfo->context, AggState))
4757  {
4758  if (aggcontext)
4759  {
4760  AggState *aggstate = ((AggState *) fcinfo->context);
4761  ExprContext *cxt = aggstate->curaggcontext;
4762 
4763  *aggcontext = cxt->ecxt_per_tuple_memory;
4764  }
4765  return AGG_CONTEXT_AGGREGATE;
4766  }
4767  if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4768  {
4769  if (aggcontext)
4770  *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4771  return AGG_CONTEXT_WINDOW;
4772  }
4773 
4774  /* this is just to prevent "uninitialized variable" warnings */
4775  if (aggcontext)
4776  *aggcontext = NULL;
4777  return 0;
4778 }
4779 
4780 /*
4781  * AggGetAggref - allow an aggregate support function to get its Aggref
4782  *
4783  * If the function is being called as an aggregate support function,
4784  * return the Aggref node for the aggregate call. Otherwise, return NULL.
4785  *
4786  * Aggregates sharing the same inputs and transition functions can get
4787  * merged into a single transition calculation. If the transition function
4788  * calls AggGetAggref, it will get some one of the Aggrefs for which it is
4789  * executing. It must therefore not pay attention to the Aggref fields that
4790  * relate to the final function, as those are indeterminate. But if a final
4791  * function calls AggGetAggref, it will get a precise result.
4792  *
4793  * Note that if an aggregate is being used as a window function, this will
4794  * return NULL. We could provide a similar function to return the relevant
4795  * WindowFunc node in such cases, but it's not needed yet.
4796  */
4797 Aggref *
4799 {
4800  if (fcinfo->context && IsA(fcinfo->context, AggState))
4801  {
4802  AggState *aggstate = (AggState *) fcinfo->context;
4803  AggStatePerAgg curperagg;
4804  AggStatePerTrans curpertrans;
4805 
4806  /* check curperagg (valid when in a final function) */
4807  curperagg = aggstate->curperagg;
4808 
4809  if (curperagg)
4810  return curperagg->aggref;
4811 
4812  /* check curpertrans (valid when in a transition function) */
4813  curpertrans = aggstate->curpertrans;
4814 
4815  if (curpertrans)
4816  return curpertrans->aggref;
4817  }
4818  return NULL;
4819 }
4820 
4821 /*
4822  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4823  *
4824  * This is useful in agg final functions; the context returned is one that
4825  * the final function can safely reset as desired. This isn't useful for
4826  * transition functions, since the context returned MAY (we don't promise)
4827  * be the same as the context those are called in.
4828  *
4829  * As above, this is currently not useful for aggs called as window functions.
4830  */
4833 {
4834  if (fcinfo->context && IsA(fcinfo->context, AggState))
4835  {
4836  AggState *aggstate = (AggState *) fcinfo->context;
4837 
4838  return aggstate->tmpcontext->ecxt_per_tuple_memory;
4839  }
4840  return NULL;
4841 }
4842 
4843 /*
4844  * AggStateIsShared - find out whether transition state is shared
4845  *
4846  * If the function is being called as an aggregate support function,
4847  * return true if the aggregate's transition state is shared across
4848  * multiple aggregates, false if it is not.
4849  *
4850  * Returns true if not called as an aggregate support function.
4851  * This is intended as a conservative answer, ie "no you'd better not
4852  * scribble on your input". In particular, will return true if the
4853  * aggregate is being used as a window function, which is a scenario
4854  * in which changing the transition state is a bad idea. We might
4855  * want to refine the behavior for the window case in future.
4856  */
4857 bool
4859 {
4860  if (fcinfo->context && IsA(fcinfo->context, AggState))
4861  {
4862  AggState *aggstate = (AggState *) fcinfo->context;
4863  AggStatePerAgg curperagg;
4864  AggStatePerTrans curpertrans;
4865 
4866  /* check curperagg (valid when in a final function) */
4867  curperagg = aggstate->curperagg;
4868 
4869  if (curperagg)
4870  return aggstate->pertrans[curperagg->transno].aggshared;
4871 
4872  /* check curpertrans (valid when in a transition function) */
4873  curpertrans = aggstate->curpertrans;
4874 
4875  if (curpertrans)
4876  return curpertrans->aggshared;
4877  }
4878  return true;
4879 }
4880 
4881 /*
4882  * AggRegisterCallback - register a cleanup callback for an aggregate
4883  *
4884  * This is useful for aggs to register shutdown callbacks, which will ensure
4885  * that non-memory resources are freed. The callback will occur just before
4886  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4887  * either between groups or as a result of rescanning the query. The callback
4888  * will NOT be called on error paths. The typical use-case is for freeing of
4889  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4890  * created by the agg functions. (The callback will not be called until after
4891  * the result of the finalfn is no longer needed, so it's safe for the finalfn
4892  * to return data that will be freed by the callback.)
4893  *
4894  * As above, this is currently not useful for aggs called as window functions.
4895  */
4896 void
4899  Datum arg)
4900 {
4901  if (fcinfo->context && IsA(fcinfo->context, AggState))
4902  {
4903  AggState *aggstate = (AggState *) fcinfo->context;
4904  ExprContext *cxt = aggstate->curaggcontext;
4905 
4906  RegisterExprContextCallback(cxt, func, arg);
4907 
4908  return;
4909  }
4910  elog(ERROR, "aggregate function cannot register a callback in this context");
4911 }
4912 
4913 
4914 /*
4915  * aggregate_dummy - dummy execution routine for aggregate functions
4916  *
4917  * This function is listed as the implementation (prosrc field) of pg_proc
4918  * entries for aggregate functions. Its only purpose is to throw an error
4919  * if someone mistakenly executes such a function in the normal way.
4920  *
4921  * Perhaps someday we could assign real meaning to the prosrc field of
4922  * an aggregate?
4923  */
4924 Datum
4926 {
4927  elog(ERROR, "aggregate function %u called as normal function",
4928  fcinfo->flinfo->fn_oid);
4929  return (Datum) 0; /* keep compiler quiet */
4930 }
4931 
4932 /* ----------------------------------------------------------------
4933  * Parallel Query Support
4934  * ----------------------------------------------------------------
4935  */
4936 
4937  /* ----------------------------------------------------------------
4938  * ExecAggEstimate
4939  *
4940  * Estimate space required to propagate aggregate statistics.
4941  * ----------------------------------------------------------------
4942  */
4943 void
4945 {
4946  Size size;
4947 
4948  /* don't need this if not instrumenting or no workers */
4949  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4950  return;
4951 
4952  size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
4953  size = add_size(size, offsetof(SharedAggInfo, sinstrument));
4954  shm_toc_estimate_chunk(&pcxt->estimator, size);
4955  shm_toc_estimate_keys(&pcxt->estimator, 1);
4956 }
4957 
4958 /* ----------------------------------------------------------------
4959  * ExecAggInitializeDSM
4960  *
4961  * Initialize DSM space for aggregate statistics.
4962  * ----------------------------------------------------------------
4963  */
4964 void
4966 {
4967  Size size;
4968 
4969  /* don't need this if not instrumenting or no workers */
4970  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4971  return;
4972 
4973  size = offsetof(SharedAggInfo, sinstrument)
4974  + pcxt->nworkers * sizeof(AggregateInstrumentation);
4975  node->shared_info = shm_toc_allocate(pcxt->toc, size);
4976  /* ensure any unfilled slots will contain zeroes */
4977  memset(node->shared_info, 0, size);
4978  node->shared_info->num_workers = pcxt->nworkers;
4979  shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
4980  node->shared_info);
4981 }
4982 
4983 /* ----------------------------------------------------------------
4984  * ExecAggInitializeWorker
4985  *
4986  * Attach worker to DSM space for aggregate statistics.
4987  * ----------------------------------------------------------------
4988  */
4989 void
4991 {
4992  node->shared_info =
4993  shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
4994 }
4995 
4996 /* ----------------------------------------------------------------
4997  * ExecAggRetrieveInstrumentation
4998  *
4999  * Transfer aggregate statistics from DSM to private memory.
5000  * ----------------------------------------------------------------
5001  */
5002 void
5004 {
5005  Size size;
5006  SharedAggInfo *si;
5007 
5008  if (node->shared_info == NULL)
5009  return;
5010 
5011  size = offsetof(SharedAggInfo, sinstrument)
5013  si = palloc(size);
5014  memcpy(si, node->shared_info, size);
5015  node->shared_info = si;
5016 }
static void hashagg_reset_spill_state(AggState *aggstate)
Definition: nodeAgg.c:3165
List * aggdistinct
Definition: primnodes.h:321
struct AggStatePerTransData * AggStatePerTrans
Definition: execnodes.h:2139
ExprState ** eqfunctions
Definition: nodeAgg.h:278
struct HashAggSpill * hash_spills
Definition: execnodes.h:2190
AggStatePerGroup * hash_pergroup
Definition: execnodes.h:2210
#define NIL
Definition: pg_list.h:65
static HashAggBatch * hashagg_batch_new(LogicalTapeSet *tapeset, int input_tapenum, int setno, int64 input_tuples, int used_bits)
Definition: nodeAgg.c:3030
size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:963
static TupleTableSlot * fetch_input_tuple(AggState *aggstate)
Definition: nodeAgg.c:559
struct AggStatePerGroupData * AggStatePerGroup
Definition: execnodes.h:2140
#define ScanTupleHashTable(htable, iter)
Definition: execnodes.h:729
static void select_current_set(AggState *aggstate, int setno, bool is_hash)
Definition: nodeAgg.c:467
int numCols
Definition: plannodes.h:821
static int partitions
Definition: pgbench.c:195
List * qual
Definition: plannodes.h:143
bool tuplesort_getdatum(Tuplesortstate *state, bool forward, Datum *val, bool *isNull, Datum *abbrev)
Definition: tuplesort.c:2418
TupleHashTable BuildTupleHashTableExt(PlanState *parent, TupleDesc inputDesc, int numCols, AttrNumber *keyColIdx, const Oid *eqfuncoids, FmgrInfo *hashfunctions, Oid *collations, long nbuckets, Size additionalsize, MemoryContext metacxt, MemoryContext tablecxt, MemoryContext tempcxt, bool use_variable_hash_iv)
Definition: execGrouping.c:154
Datum aggregate_dummy(PG_FUNCTION_ARGS)
Definition: nodeAgg.c:4925
bool aggvariadic
Definition: primnodes.h:324
int bms_first_member(Bitmapset *a)
Definition: bitmapset.c:996
AggStatePerPhase phases
Definition: execnodes.h:2177
double hashentrysize
Definition: execnodes.h:2202
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
void tuplesort_performsort(Tuplesortstate *state)
Definition: tuplesort.c:1964
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
Definition: nodeAgg.c:2924
#define AllocSetContextCreate
Definition: memutils.h:170
AttrNumber * hashGrpColIdxInput
Definition: nodeAgg.h:311
Datum * ecxt_aggvalues
Definition: execnodes.h:245
static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
Definition: nodeAgg.c:1896
TupleHashEntry LookupTupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew, uint32 hash)
Definition: execGrouping.c:350
uint64 hash_ngroups_limit
Definition: execnodes.h:2199
#define HASHAGG_MAX_PARTITIONS
Definition: nodeAgg.c:287
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1416
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:305
Index varlevelsup
Definition: primnodes.h:191
TargetEntry * get_sortgroupclause_tle(SortGroupClause *sgClause, List *targetList)
Definition: tlist.c:389
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Bitmapset * bms_copy(const Bitmapset *a)
Definition: bitmapset.c:74
Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:1171
static void hash_agg_check_limits(AggState *aggstate)
Definition: nodeAgg.c:1835
static long hash_choose_num_buckets(double hashentrysize, long estimated_nbuckets, Size memory)
Definition: nodeAgg.c:1945
AttrNumber * grpColIdx
Definition: plannodes.h:822
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:985
uint64 transitionSpace
Definition: plannodes.h:826
Instrumentation * instrument
Definition: execnodes.h:955
static void agg_fill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2534
Bitmapset * colnos_needed
Definition: execnodes.h:2172
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
Definition: execUtils.c:504
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static AggStatePerGroup lookup_hash_entry(AggState *aggstate, uint32 hash, bool *in_hash_table)
Definition: nodeAgg.c:2030
List * lcons_int(int datum, List *list)
Definition: list.c:471
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:1546
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3033
int numaggs
Definition: execnodes.h:2148
int nfreetapes
Definition: nodeAgg.c:321
Oid GetUserId(void)
Definition: miscinit.c:448
bool agg_done
Definition: execnodes.h:2166
#define castNode(_type_, nodeptr)
Definition: nodes.h:598
Oid * grpCollations
Definition: plannodes.h:824
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:543
#define TTS_EMPTY(slot)
Definition: tuptable.h:97
TupleTableSlot * sort_slot
Definition: execnodes.h:2180
List * all_grouped_cols
Definition: execnodes.h:2171
Tuplesortstate * sort_out
Definition: execnodes.h:2179
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1662
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void finalize_partialaggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1151
ScanState ss
Definition: execnodes.h:2146
FmgrInfo equalfnOne
Definition: nodeAgg.h:110
ExprContext * ps_ExprContext
Definition: execnodes.h:984
MinimalTuple firstTuple
Definition: execnodes.h:682
shm_toc_estimator estimator
Definition: parallel.h:42
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:234
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
Definition: nodeAgg.c:3133
ExprState * evaltrans
Definition: nodeAgg.h:283
#define SizeForFunctionCallInfo(nargs)
Definition: fmgr.h:102
int64 input_tuples
Definition: nodeAgg.c:359
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1043
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
bool datumIsEqual(Datum value1, Datum value2, bool typByVal, int typLen)
Definition: datum.c:222
int plan_node_id
Definition: plannodes.h:141
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct HashTapeInfo HashTapeInfo
Oid inputcollid
Definition: primnodes.h:315
int current_phase
Definition: execnodes.h:2154
static void hashagg_finish_initial_spills(AggState *aggstate)
Definition: nodeAgg.c:3099
static void slot_getsomeattrs(TupleTableSlot *slot, int attnum)
Definition: tuptable.h:341
Definition: nodes.h:529
AggSplit aggsplit
Definition: execnodes.h:2151
static TupleTableSlot * ExecAgg(PlanState *pstate)
Definition: nodeAgg.c:2152
bool * nullsFirst
Definition: plannodes.h:774
int errcode(int sqlerrcode)
Definition: elog.c:610
List * args
Definition: primnodes.h:319
#define MemSet(start, val, len)
Definition: c.h:978
AttrNumber varattno
Definition: primnodes.h:186
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
fmNodePtr context
Definition: fmgr.h:88
Datum * tts_values
Definition: tuptable.h:126
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1335
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
void build_aggregate_deserialfn_expr(Oid deserialfn_oid, Expr **deserialfnexpr)
Definition: parse_agg.c:2013
static void finalize_aggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1048
bool all_cols_needed
Definition: execnodes.h:2174
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition: parse_agg.c:2037
AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2122
TupleTableSlot * hash_spill_rslot
Definition: execnodes.h:2192
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:726
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable, Oid aggtransfn, Oid aggtranstype, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, List *transnos)
Definition: nodeAgg.c:4486
AggStatePerTrans pertrans
Definition: execnodes.h:2156
EState * state
Definition: execnodes.h:947
int projected_set
Definition: execnodes.h:2167
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1152
bool aggstar
Definition: primnodes.h:323
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:370
HeapTuple grp_firstTuple
Definition: execnodes.h:2184
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
Definition: primnodes.h:181
Aggref * aggref
Definition: nodeAgg.h:187
static TupleTableSlot * project_aggregates(AggState *aggstate)
Definition: nodeAgg.c:1365
static void advance_aggregates(AggState *aggstate)
Definition: nodeAgg.c:830
int current_set
Definition: execnodes.h:2169
uint32 mask
Definition: nodeAgg.c:340
#define OidIsValid(objectId)
Definition: c.h:651
#define DO_AGGSPLIT_COMBINE(as)
Definition: nodes.h:791
FunctionCallInfo transfn_fcinfo
Definition: nodeAgg.h:162
void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4965
struct HashAggBatch HashAggBatch
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:655
Datum ExecAggTransReparent(AggState *aggstate, AggStatePerTrans pertrans, Datum newValue, bool newValueIsNull, Datum oldValue, bool oldValueIsNull)
int numtrans
Definition: execnodes.h:2149
void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1439
static void hash_agg_enter_spill_mode(AggState *aggstate)
Definition: nodeAgg.c:1861
TupleDesc sortdesc
Definition: nodeAgg.h:138
Oid * sortOperators
Definition: plannodes.h:772
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:937
void execTuplesHashPrepare(int numCols, const Oid *eqOperators, Oid **eqFuncOids, FmgrInfo **hashFunctions)
Definition: execGrouping.c:96
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:209
void ResetTupleHashTable(TupleHashTable hashtable)
Definition: execGrouping.c:282
ExprContext * tmpcontext
Definition: execnodes.h:2159
FmgrInfo transfn
Definition: nodeAgg.h:81
#define HASHAGG_PARTITION_FACTOR
Definition: nodeAgg.c:285
static void build_hash_table(AggState *aggstate, int setno, long nbuckets)
Definition: nodeAgg.c:1491
int max_colno_needed
Definition: execnodes.h:2173
static void prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
Definition: nodeAgg.c:1257
bool hash_spill_mode
Definition: execnodes.h:2196
#define FUNC_MAX_ARGS
static void hashagg_tapeinfo_init(AggState *aggstate)
Definition: nodeAgg.c:2880
List * hash_batches
Definition: execnodes.h:2194
Aggref * aggref
Definition: nodeAgg.h:44
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:878
#define linitial_int(l)
Definition: pg_list.h:196
Bitmapset ** grouped_cols
Definition: nodeAgg.h:277
PlanState ps
Definition: execnodes.h:1332
LogicalTapeSet * tapeset
Definition: nodeAgg.c:357
int maxsets
Definition: execnodes.h:2176
#define true
Definition: c.h:328
static bool agg_refill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2588
static bool find_cols_walker(Node *node, FindColsContext *context)
Definition: nodeAgg.c:1408
Size hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
Definition: nodeAgg.c:1680
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
#define DO_AGGSPLIT_SERIALIZE(as)
Definition: nodes.h:793
#define HASHAGG_MIN_PARTITIONS
Definition: nodeAgg.c:286
void pfree(void *pointer)
Definition: mcxt.c:1056
MemoryContext es_query_cxt
Definition: execnodes.h:555
AggStrategy aggstrategy
Definition: plannodes.h:819
AggState * ExecInitAgg(Agg *node, EState *estate, int eflags)
Definition: nodeAgg.c:3217
#define linitial(l)
Definition: pg_list.h:195
bool table_filled
Definition: execnodes.h:2186
AggStrategy aggstrategy
Definition: execnodes.h:2150
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:761
static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2735
static void find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
Definition: nodeAgg.c:1391
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
bool fn_strict
Definition: fmgr.h:61
#define lfirst_int(lc)
Definition: pg_list.h:191
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
char * get_func_name(Oid funcid)
Definition: lsyscache.c:1520
MemoryContext hash_metacxt
Definition: execnodes.h:2188
NullableDatum args[FLEXIBLE_ARRAY_MEMBER]
Definition: fmgr.h:95
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
static TupleTableSlot * agg_retrieve_direct(AggState *aggstate)
Definition: nodeAgg.c:2188
#define AGG_CONTEXT_AGGREGATE
Definition: fmgr.h:736
LogicalTapeSet * LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, int worker)
Definition: logtape.c:672
struct TupleHashEntryData TupleHashEntryData
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
static void find_hash_columns(AggState *aggstate)
Definition: nodeAgg.c:1551
ExprState * equalfnMulti
Definition: nodeAgg.h:111
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
Tuplesortstate * sort_in
Definition: execnodes.h:2178
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define outerPlanState(node)
Definition: execnodes.h:1039
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Definition: tuplesort.c:2332
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:646
static void finalize_aggregates(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup)
Definition: nodeAgg.c:1302
bool AggStateIsShared(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4858
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, int ndest)
Definition: nodeAgg.c:2901
static int hash_choose_num_partitions(uint64 input_groups, double hashentrysize, int used_bits, int *log2_npartittions)
Definition: nodeAgg.c:1970
Aggref * aggref
Definition: execnodes.h:752
#define list_nth_node(type, list, n)
Definition: pg_list.h:305
Tuplesortstate ** sortstates
Definition: nodeAgg.h:154
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:172
Bitmapset * aggParams
Definition: plannodes.h:827
static int initValue(long lng_val)
Definition: informix.c:677
MemoryContext tablecxt
Definition: execnodes.h:704
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:540
bool * tts_isnull
Definition: tuptable.h:128
int npartitions
Definition: nodeAgg.c:337
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:290
void hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits, Size *mem_limit, uint64 *ngroups_limit, int *num_partitions)
Definition: nodeAgg.c:1778
MinimalTupleData * MinimalTuple
Definition: htup.h:27
static void process_ordered_aggregate_multi(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:954
List * aggorder
Definition: primnodes.h:320
void ExecAggEstimate(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4944
int errcode_for_file_access(void)
Definition: elog.c:633
#define fmgr_info_set_expr(expr, finfo)
Definition: fmgr.h:135
AttrNumber resno
Definition: primnodes.h:1408
#define DatumGetBool(X)
Definition: postgres.h:393
int ParallelWorkerNumber
Definition: parallel.c:112
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
Definition: nodeAgg.c:2970
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:228
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:318
#define MakeExpandedObjectReadOnly(d, isnull, typlen)
Index agglevelsup
Definition: primnodes.h:327
int used_bits
Definition: nodeAgg.c:356
struct AggregateInstrumentation AggregateInstrumentation
Bitmapset * unaggregated
Definition: nodeAgg.c:367
#define TupIsNull(slot)
Definition: tuptable.h:292
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
unsigned int uint32
Definition: c.h:374
List * aggdirectargs
Definition: primnodes.h:318
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
Definition: nodeAgg.c:4380
AggStatePerAgg curperagg
Definition: execnodes.h:2162
AttrNumber * sortColIdx
Definition: nodeAgg.h:100
struct AggStatePerGroupData AggStatePerGroupData
struct HashTapeInfo * hash_tapeinfo
Definition: execnodes.h:2189
AggStatePerHash perhash
Definition: execnodes.h:2209
bool outeropsset
Definition: execnodes.h:1026
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void initialize_aggregates(AggState *aggstate, AggStatePerGroup *pergroups, int numReset)
Definition: nodeAgg.c:677
AggStrategy aggstrategy
Definition: nodeAgg.h:274
ExprState * evaltrans_cache[2][2]
Definition: nodeAgg.h:291
#define InstrCountFiltered1(node, delta)
Definition: execnodes.h:1047
#define EXEC_FLAG_REWIND
Definition: executor.h:57
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2751
void build_aggregate_combinefn_expr(Oid agg_state_type, Oid agg_input_collation, Oid combinefn_oid, Expr **combinefnexpr)
Definition: parse_agg.c:1961
Datum value
Definition: postgres.h:378
Bitmapset * grouped_cols
Definition: execnodes.h:2170
#define IsParallelWorker()
Definition: parallel.h:61
Datum datumCopy(Datum value, bool typByVal, int typLen)
Definition: datum.c:131
TupleTableSlot * ExecAllocTableSlot(List **tupleTable, TupleDesc desc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1141
void ExecAggRetrieveInstrumentation(AggState *node)
Definition: nodeAgg.c:5003
int hash_batches_used
Definition: execnodes.h:2207
MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4832
List * lappend_int(List *list, int datum)
Definition: list.c:339
Bitmapset * chgParam
Definition: execnodes.h:977
#define InvokeFunctionExecuteHook(objectId)
Definition: objectaccess.h:191
int ntapes
Definition: nodeAgg.c:319
bool IsBinaryCoercible(Oid srctype, Oid targettype)
int my_log2(long num)
Definition: dynahash.c:1729
#define outerPlan(node)
Definition: plannodes.h:172
List * lappend(List *list, void *datum)
Definition: list.c:321
Bitmapset * aggregated
Definition: nodeAgg.c:366
TupleHashIterator hashiter
Definition: nodeAgg.h:304
int numCols
Definition: plannodes.h:770
Index varno
Definition: primnodes.h:184
static void initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:590
int num_hashes
Definition: execnodes.h:2187
Plan plan
Definition: plannodes.h:818
AttrNumber * hashGrpColIdxHash
Definition: nodeAgg.h:312
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
bool input_done
Definition: execnodes.h:2165
#define SizeofMinimalTupleHeader
Definition: htup_details.h:649
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
ExprContext * curaggcontext
Definition: execnodes.h:2161
ExprContext * hashcontext
Definition: execnodes.h:2157
bool * ecxt_aggnulls
Definition: execnodes.h:247
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:397
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define TextDatumGetCString(d)
Definition: builtins.h:88
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastaggno, List **same_input_transnos)
Definition: nodeAgg.c:4408
List * es_tupleTable
Definition: execnodes.h:557
#define HASHAGG_READ_BUFFER_SIZE
Definition: nodeAgg.c:295
AggStatePerPhase phase
Definition: execnodes.h:2152
void * palloc0(Size size)
Definition: mcxt.c:980
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:951
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
struct FunctionCallInfoBaseData * FunctionCallInfo
Definition: fmgr.h:38
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:240
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1377
FmgrInfo deserialfn
Definition: nodeAgg.h:87
int work_mem
Definition: globals.c:121
List * groupingSets
Definition: plannodes.h:829
int16 resulttypeLen
Definition: nodeAgg.h:216
static void initialize_phase(AggState *aggstate, int newphase)
Definition: nodeAgg.c:489
LogicalTapeSet * tapeset
Definition: nodeAgg.c:318
struct FindColsContext FindColsContext
FormData_pg_proc * Form_pg_proc
Definition: pg_proc.h:133
Plan * plan
Definition: execnodes.h:945
#define InvalidOid
Definition: postgres_ext.h:36
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1202
Oid aggfnoid
Definition: primnodes.h:312
int16 attnum
Definition: pg_attribute.h:79
#define ResetTupleHashIterator(htable, iter)
Definition: execnodes.h:727
#define ereport(elevel,...)
Definition: elog.h:144