/*-------------------------------------------------------------------------
 *
 * nodeAgg.c
 *    Routines to handle aggregate nodes.
 *
 *    ExecAgg normally evaluates each aggregate in the following steps:
 *
 *       transvalue = initcond
 *       foreach input_tuple do
 *          transvalue = transfunc(transvalue, input_value(s))
 *       result = finalfunc(transvalue, direct_argument(s))
 *
 *    If a finalfunc is not supplied then the result is just the ending
 *    value of transvalue.
 *
 *    Other behaviors can be selected by the "aggsplit" mode, which exists
 *    to support partial aggregation.  It is possible to:
 *    * Skip running the finalfunc, so that the output is always the
 *    final transvalue state.
 *    * Substitute the combinefunc for the transfunc, so that transvalue
 *    states (propagated up from a child partial-aggregation step) are merged
 *    rather than processing raw input rows.  (The statements below about
 *    the transfunc apply equally to the combinefunc, when it's selected.)
 *    * Apply the serializefunc to the output values (this only makes sense
 *    when skipping the finalfunc, since the serializefunc works on the
 *    transvalue data type).
 *    * Apply the deserializefunc to the input values (this only makes sense
 *    when using the combinefunc, for similar reasons).
 *    It is the planner's responsibility to connect up Agg nodes using these
 *    alternate behaviors in a way that makes sense, with partial aggregation
 *    results being fed to nodes that expect them.
 *
 *    If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
 *    input tuples and eliminate duplicates (if required) before performing
 *    the above-depicted process.  (However, we don't do that for ordered-set
 *    aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
 *    so far as this module is concerned.)  Note that partial aggregation
 *    is not supported in these cases, since we couldn't ensure global
 *    ordering or distinctness of the inputs.
 *
 *    If transfunc is marked "strict" in pg_proc and initcond is NULL,
 *    then the first non-NULL input_value is assigned directly to transvalue,
 *    and transfunc isn't applied until the second non-NULL input_value.
 *    The agg's first input type and transtype must be the same in this case!
 *
 *    If transfunc is marked "strict" then NULL input_values are skipped,
 *    keeping the previous transvalue.  If transfunc is not strict then it
 *    is called for every input tuple and must deal with NULL initcond
 *    or NULL input_values for itself.
 *
 *    If finalfunc is marked "strict" then it is not called when the
 *    ending transvalue is NULL, instead a NULL result is created
 *    automatically (this is just the usual handling of strict functions,
 *    of course).  A non-strict finalfunc can make its own choice of
 *    what to return for a NULL ending transvalue.
 *
 *    Ordered-set aggregates are treated specially in one other way: we
 *    evaluate any "direct" arguments and pass them to the finalfunc along
 *    with the transition value.
 *
 *    A finalfunc can have additional arguments beyond the transvalue and
 *    any "direct" arguments, corresponding to the input arguments of the
 *    aggregate.  These are always just passed as NULL.  Such arguments may be
 *    needed to allow resolution of a polymorphic aggregate's result type.
 *
 *    We compute aggregate input expressions and run the transition functions
 *    in a temporary econtext (aggstate->tmpcontext).  This is reset at least
 *    once per input tuple, so when the transvalue datatype is
 *    pass-by-reference, we have to be careful to copy it into a longer-lived
 *    memory context, and free the prior value to avoid memory leakage.  We
 *    store transvalues in another set of econtexts, aggstate->aggcontexts
 *    (one per grouping set, see below), which are also used for the hashtable
 *    structures in AGG_HASHED mode.  These econtexts are rescanned, not just
 *    reset, at group boundaries so that aggregate transition functions can
 *    register shutdown callbacks via AggRegisterCallback.
 *
 *    The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
 *    run finalize functions and compute the output tuple; this context can be
 *    reset once per output tuple.
 *
 *    The executor's AggState node is passed as the fmgr "context" value in
 *    all transfunc and finalfunc calls.  It is not recommended that the
 *    transition functions look at the AggState node directly, but they can
 *    use AggCheckCallContext() to verify that they are being called by
 *    nodeAgg.c (and not as ordinary SQL functions).  The main reason a
 *    transition function might want to know this is so that it can avoid
 *    palloc'ing a fixed-size pass-by-ref transition value on every call:
 *    it can instead just scribble on and return its left input.  Ordinarily
 *    it is completely forbidden for functions to modify pass-by-ref inputs,
 *    but in the aggregate case we know the left input is either the initial
 *    transition value or a previous function result, and in either case its
 *    value need not be preserved.  See int8inc() for an example.  Notice that
 *    the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
 *    the previous transition value pointer is returned.  It is also possible
 *    to avoid repeated data copying when the transition value is an expanded
 *    object: to do that, the transition function must take care to return
 *    an expanded object that is in a child context of the memory context
 *    returned by AggCheckCallContext().  Also, some transition functions want
 *    to store working state in addition to the nominal transition value; they
 *    can use the memory context returned by AggCheckCallContext() to do that.
 *
 *    Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
 *    AggState is available as context in earlier releases (back to 8.1),
 *    but direct examination of the node is needed to use it before 9.0.
 *
 *    As of 9.4, aggregate transition functions can also use AggGetAggref()
 *    to get hold of the Aggref expression node for their aggregate call.
 *    This is mainly intended for ordered-set aggregates, which are not
 *    supported as window functions.  (A regular aggregate function would
 *    need some fallback logic to use this, since there's no Aggref node
 *    for a window function.)
 *
 *    Grouping sets:
 *
 *    A list of grouping sets which is structurally equivalent to a ROLLUP
 *    clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
 *    ordered data.  We do this by keeping a separate set of transition values
 *    for each grouping set being concurrently processed; for each input tuple
 *    we update them all, and on group boundaries we reset those states
 *    (starting at the front of the list) whose grouping values have changed
 *    (the list of grouping sets is ordered from most specific to least
 *    specific).
 *
 *    Where more complex grouping sets are used, we break them down into
 *    "phases", where each phase has a different sort order (except phase 0
 *    which is reserved for hashing).  During each phase but the last, the
 *    input tuples are additionally stored in a tuplesort which is keyed to the
 *    next phase's sort order; during each phase but the first, the input
 *    tuples are drawn from the previously sorted data.  (The sorting of the
 *    data for the first phase is handled by the planner, as it might be
 *    satisfied by underlying nodes.)
 *
 *    Hashing can be mixed with sorted grouping.  To do this, we have an
 *    AGG_MIXED strategy that populates the hashtables during the first sorted
 *    phase, and switches to reading them out after completing all sort phases.
 *    We can also support AGG_HASHED with multiple hash tables and no sorting
 *    at all.
 *
 *    From the perspective of aggregate transition and final functions, the
 *    only issue regarding grouping sets is this: a single call site (flinfo)
 *    of an aggregate function may be used for updating several different
 *    transition values in turn.  So the function must not cache in the flinfo
 *    anything which logically belongs as part of the transition value (most
 *    importantly, the memory context in which the transition value exists).
 *    The support API functions (AggCheckCallContext, AggRegisterCallback) are
 *    sensitive to the grouping set for which the aggregate function is
 *    currently being called.
 *
 *    Plan structure:
 *
 *    What we get from the planner is actually one "real" Agg node which is
 *    part of the plan tree proper, but which optionally has an additional list
 *    of Agg nodes hung off the side via the "chain" field.  This is because an
 *    Agg node happens to be a convenient representation of all the data we
 *    need for grouping sets.
 *
 *    For many purposes, we treat the "real" node as if it were just the first
 *    node in the chain.  The chain must be ordered such that hashed entries
 *    come before sorted/plain entries; the real node is marked AGG_MIXED if
 *    there are both types present (in which case the real node describes one
 *    of the hashed groupings, other AGG_HASHED nodes may optionally follow in
 *    the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
 *    the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
 *    nodes must be of the same type; if it is AGG_PLAIN, there can be no
 *    chained nodes.
 *
 *    We collect all hashed nodes into a single "phase", numbered 0, and create
 *    a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
 *    Phase 0 is allocated even if there are no hashes, but remains unused in
 *    that case.
 *
 *    AGG_HASHED nodes actually refer to only a single grouping set each,
 *    because for each hashed grouping we need a separate grpColIdx and
 *    numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
 *    grouping sets that share a sort order.  Each AGG_SORTED node other than
 *    the first one has an associated Sort node which describes the sort order
 *    to be used; the first sorted node takes its input from the outer subtree,
 *    which the planner has already arranged to provide ordered data.
 *
 *    Memory and ExprContext usage:
 *
 *    Because we're accumulating aggregate values across input rows, we need to
 *    use more memory contexts than just simple input/output tuple contexts.
 *    In fact, for a rollup, we need a separate context for each grouping set
 *    so that we can reset the inner (finer-grained) aggregates on their group
 *    boundaries while continuing to accumulate values for outer
 *    (coarser-grained) groupings.  On top of this, we might be simultaneously
 *    populating hashtables; however, we only need one context for all the
 *    hashtables.
 *
 *    So we create an array, aggcontexts, with an ExprContext for each grouping
 *    set in the largest rollup that we're going to process, and use the
 *    per-tuple memory context of those ExprContexts to store the aggregate
 *    transition values.  hashcontext is the single context created to support
 *    all hash tables.
 *
 *    Spilling To Disk
 *
 *    When performing hash aggregation, if the hash table memory exceeds the
 *    limit (see hash_agg_check_limits()), we enter "spill mode".  In spill
 *    mode, we advance the transition states only for groups already in the
 *    hash table.  For tuples that would need to create new hash table
 *    entries (and initialize new transition states), we instead spill them to
 *    disk to be processed later.  The tuples are spilled in a partitioned
 *    manner, so that subsequent batches are smaller and less likely to exceed
 *    hash_mem (if a batch does exceed hash_mem, it must be spilled
 *    recursively).
 *
 *    Spilled data is written to logical tapes.  These provide better control
 *    over memory usage, disk space, and the number of files than if we were
 *    to use a BufFile for each spill.
 *
 *    Note that it's possible for transition states to start small but then
 *    grow very large; for instance in the case of ARRAY_AGG.  In such cases,
 *    it's still possible to significantly exceed hash_mem.  We try to avoid
 *    this situation by estimating what will fit in the available memory, and
 *    imposing a limit on the number of groups separately from the amount of
 *    memory consumed.
 *
 *    Transition / Combine function invocation:
 *
 *    For performance reasons transition functions, including combine
 *    functions, aren't invoked one-by-one from nodeAgg.c after computing
 *    arguments using the expression evaluation engine.  Instead
 *    ExecBuildAggTrans() builds one large expression that does both argument
 *    evaluation and transition function invocation.  That avoids performance
 *    issues due to repeated uses of expression evaluation, complications due
 *    to filter expressions having to be evaluated early, and allows the
 *    entire expression to be JIT compiled into one native function.
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeAgg.c
 *
 *-------------------------------------------------------------------------
 */
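/*
 * For a concrete reading of the aggsplit rules above, consider how a
 * parallel avg(int8) would flow through its standard support functions
 * (hypothetical data, real catalog entries): each worker runs the
 * transfunc (int8_avg_accum) over its rows with finalization skipped,
 * serializes its transvalue (int8_avg_serialize), and the leader
 * deserializes (int8_avg_deserialize) and merges the states with the
 * combinefunc (int8_avg_combine) before running the finalfunc
 * (numeric_poly_avg) once over the merged state.
 */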

#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "common/hashfn.h"
#include "executor/execExpr.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "lib/hyperloglog.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/dynahash.h"
#include "utils/expandeddatum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"

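/*
 * For illustration only: a minimal transition function written to the rules
 * laid out in the file header comment.  Because AggCheckCallContext()
 * confirms that nodeAgg is the caller, the function may scribble on its
 * pass-by-ref transition state in place rather than palloc'ing a new value
 * on every call.  The state struct and function name are hypothetical (not
 * part of this file); a real aggregate would declare the state as an
 * "internal"-typed transvalue at the SQL level.
 */
typedef struct hypothetical_avg_state
{
    int64       sum;
    int64       count;
} hypothetical_avg_state;

Datum
hypothetical_avg_transfn(PG_FUNCTION_ARGS)
{
    MemoryContext aggcontext;
    hypothetical_avg_state *state;

    if (!AggCheckCallContext(fcinfo, &aggcontext))
        elog(ERROR, "hypothetical_avg_transfn called in non-aggregate context");

    if (PG_ARGISNULL(0))
    {
        /* first row: allocate the state in the long-lived agg context */
        state = (hypothetical_avg_state *)
            MemoryContextAllocZero(aggcontext, sizeof(hypothetical_avg_state));
    }
    else
        state = (hypothetical_avg_state *) PG_GETARG_POINTER(0);

    if (!PG_ARGISNULL(1))
    {
        /* modifying in place is safe here: nodeAgg owns this value */
        state->sum += PG_GETARG_INT64(1);
        state->count++;
    }

    PG_RETURN_POINTER(state);
}
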
/*
 * Control how many partitions are created when spilling HashAgg to
 * disk.
 *
 * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
 * partitions needed such that each partition will fit in memory. The factor
 * is set higher than one because there's not a high cost to having a few too
 * many partitions, and it makes it less likely that a partition will need to
 * be spilled recursively. Another benefit of having more, smaller partitions
 * is that small hash tables may perform better than large ones due to memory
 * caching effects.
 *
 * We also specify a min and max number of partitions per spill. Too few might
 * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
 * many will result in lots of memory wasted buffering the spill files (which
 * could instead be spent on a larger hash table).
 */
#define HASHAGG_PARTITION_FACTOR 1.50
#define HASHAGG_MIN_PARTITIONS 4
#define HASHAGG_MAX_PARTITIONS 1024
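
/*
 * For illustration only: a simplified sketch of how these constants are
 * meant to interact when choosing a partition count.  The real logic lives
 * in hash_choose_num_partitions(), which additionally rounds to a power of
 * two and accounts for the memory consumed by open partition tape buffers;
 * this sketch (hypothetical name) shows just the factor and the clamping.
 */
static int
hash_choose_num_partitions_sketch(double input_groups, double hashentrysize,
                                  Size hash_mem_bytes)
{
    /* aim a bit high so that each partition comfortably fits in memory */
    double      mem_wanted = HASHAGG_PARTITION_FACTOR *
        input_groups * hashentrysize;
    int         npartitions = (int) (mem_wanted / hash_mem_bytes);

    npartitions = Max(npartitions, HASHAGG_MIN_PARTITIONS);
    npartitions = Min(npartitions, HASHAGG_MAX_PARTITIONS);

    return npartitions;
}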

/*
 * For reading from tapes, the buffer size must be a multiple of
 * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
 * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
 * tape always uses a buffer of size BLCKSZ.
 */
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ

/*
 * HyperLogLog is used for estimating the cardinality of the spilled tuples in
 * a given partition. 5 bits corresponds to a size of about 32 bytes and a
 * worst-case error of around 18%. That's effective enough to choose a
 * reasonable number of partitions when recursing.
 */
#define HASHAGG_HLL_BIT_WIDTH 5
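
/*
 * For illustration only: the usage pattern of the lib/hyperloglog.h
 * estimator at this bit width.  In this file the corresponding calls are
 * spread across hashagg_spill_init(), hashagg_spill_tuple() and
 * hashagg_spill_finish(); this self-contained sketch (hypothetical name)
 * just shows the init/add/estimate/free sequence over a batch of hashes.
 */
static double
spill_cardinality_sketch(const uint32 *hashes, int nhashes)
{
    hyperLogLogState hll;
    double      card;

    initHyperLogLog(&hll, HASHAGG_HLL_BIT_WIDTH);
    for (int i = 0; i < nhashes; i++)
        addHyperLogLog(&hll, hashes[i]);
    card = estimateHyperLogLog(&hll);
    freeHyperLogLog(&hll);

    return card;
}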

/*
 * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
 * improved?
 */
#define CHUNKHDRSZ 16

/*
 * Track all tapes needed for a HashAgg that spills. We don't know the maximum
 * number of tapes needed at the start of the algorithm (because it can
 * recurse), so one tape set is allocated and extended as needed for new
 * tapes. When a particular tape is already read, rewind it for write mode and
 * put it in the free list.
 *
 * Tapes' buffers can take up substantial memory when many tapes are open at
 * once. We only need one tape open at a time in read mode (using a buffer
 * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
 * requiring a buffer of size BLCKSZ) for each partition.
 */
typedef struct HashTapeInfo
{
    LogicalTapeSet *tapeset;
    int         ntapes;
    int        *freetapes;
    int         nfreetapes;
    int         freetapes_alloc;
} HashTapeInfo;

/*
 * Represents partitioned spill data for a single hashtable. Contains the
 * necessary information to route tuples to the correct partition, and to
 * transform the spilled data into new batches.
 *
 * The high bits are used for partition selection (when recursing, we ignore
 * the bits that have already been used for partition selection at an earlier
 * level).
 */
typedef struct HashAggSpill
{
    LogicalTapeSet *tapeset;    /* borrowed reference to tape set */
    int         npartitions;    /* number of partitions */
    int        *partitions;     /* spill partition tape numbers */
    int64      *ntuples;        /* number of tuples in each partition */
    uint32      mask;           /* mask to find partition from hash value */
    int         shift;          /* after masking, shift by this amount */
    hyperLogLogState *hll_card; /* cardinality estimate for contents */
} HashAggSpill;
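
/*
 * For illustration only: how the mask and shift fields select a partition
 * from a tuple's hash value, assuming they were set up as described above
 * (the mask covering log2(npartitions) bits positioned just below the bits
 * already consumed at outer recursion levels).  The real use is in
 * hashagg_spill_tuple(); this helper name is hypothetical.
 */
static inline int
hash_spill_partition_sketch(const HashAggSpill *spill, uint32 hash)
{
    /* mask first, then shift the selected bits down to a partition index */
    return (int) ((hash & spill->mask) >> spill->shift);
}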

/*
 * Represents work to be done for one pass of hash aggregation (with only one
 * grouping set).
 *
 * Also tracks the bits of the hash already used for partition selection by
 * earlier iterations, so that this batch can use new bits. If all bits have
 * already been used, no partitioning will be done (any spilled data will go
 * to a single output tape).
 */
typedef struct HashAggBatch
{
    int         setno;          /* grouping set */
    int         used_bits;      /* number of bits of hash already used */
    LogicalTapeSet *tapeset;    /* borrowed reference to tape set */
    int         input_tapenum;  /* input partition tape */
    int64       input_tuples;   /* number of tuples in this batch */
    double      input_card;     /* estimated group cardinality */
} HashAggBatch;

/* used to find referenced colnos */
typedef struct FindColsContext
{
    bool        is_aggref;      /* is under an aggref */
    Bitmapset  *aggregated;     /* column references under an aggref */
    Bitmapset  *unaggregated;   /* other column references */
} FindColsContext;

static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
                                  AggStatePerGroup *pergroups,
                                  int numReset);
static void advance_transition_function(AggState *aggstate,
                                        AggStatePerTrans pertrans,
                                        AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate);
static void process_ordered_aggregate_single(AggState *aggstate,
                                             AggStatePerTrans pertrans,
                                             AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
                                            AggStatePerTrans pertrans,
                                            AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
                               AggStatePerAgg peragg,
                               AggStatePerGroup pergroupstate,
                               Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
                                      AggStatePerAgg peragg,
                                      AggStatePerGroup pergroupstate,
                                      Datum *resultVal, bool *resultIsNull);
static inline void prepare_hash_slot(AggStatePerHash perhash,
                                     TupleTableSlot *inputslot,
                                     TupleTableSlot *hashslot);
static void prepare_projection_slot(AggState *aggstate,
                                    TupleTableSlot *slot,
                                    int currentSet);
static void finalize_aggregates(AggState *aggstate,
                                AggStatePerAgg peragg,
                                AggStatePerGroup pergroup);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static void find_cols(AggState *aggstate, Bitmapset **aggregated,
                      Bitmapset **unaggregated);
static bool find_cols_walker(Node *node, FindColsContext *context);
static void build_hash_tables(AggState *aggstate);
static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
                                          bool nullcheck);
static long hash_choose_num_buckets(double hashentrysize,
                                    long estimated_nbuckets,
                                    Size memory);
static int  hash_choose_num_partitions(double input_groups,
                                       double hashentrysize,
                                       int used_bits,
                                       int *log2_npartitions);
static void initialize_hash_entry(AggState *aggstate,
                                  TupleHashTable hashtable,
                                  TupleHashEntry entry);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
static bool agg_refill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
static void hash_agg_check_limits(AggState *aggstate);
static void hash_agg_enter_spill_mode(AggState *aggstate);
static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
                                    int npartitions);
static void hashagg_finish_initial_spills(AggState *aggstate);
static void hashagg_reset_spill_state(AggState *aggstate);
static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
                                       int input_tapenum, int setno,
                                       int64 input_tuples, double input_card,
                                       int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
                               int used_bits, double input_groups,
                               double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
                                TupleTableSlot *slot, uint32 hash);
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
                                 int setno);
static void hashagg_tapeinfo_init(AggState *aggstate);
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
                                    int ndest);
static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
                                      AggState *aggstate, EState *estate,
                                      Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
                                      Oid aggserialfn, Oid aggdeserialfn,
                                      Datum initValue, bool initValueIsNull,
                                      Oid *inputTypes, int numArguments);
static int  find_compatible_peragg(Aggref *newagg, AggState *aggstate,
                                   int lastaggno, List **same_input_transnos);
static int  find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
                                     bool shareable,
                                     Oid aggtransfn, Oid aggtranstype,
                                     Oid aggserialfn, Oid aggdeserialfn,
                                     Datum initValue, bool initValueIsNull,
                                     List *transnos);


/*
 * Select the current grouping set; affects current_set and
 * curaggcontext.
 */
static void
select_current_set(AggState *aggstate, int setno, bool is_hash)
{
    /*
     * When changing this, also adapt ExecAggPlainTransByVal() and
     * ExecAggPlainTransByRef().
     */
    if (is_hash)
        aggstate->curaggcontext = aggstate->hashcontext;
    else
        aggstate->curaggcontext = aggstate->aggcontexts[setno];

    aggstate->current_set = setno;
}

/*
 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
 * current_phase + 1. Juggle the tuplesorts accordingly.
 *
 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
 * case, so when entering phase 0, all we need to do is drop open sorts.
 */
static void
initialize_phase(AggState *aggstate, int newphase)
{
    Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);

    /*
     * Whatever the previous state, we're now done with whatever input
     * tuplesort was in use.
     */
    if (aggstate->sort_in)
    {
        tuplesort_end(aggstate->sort_in);
        aggstate->sort_in = NULL;
    }

    if (newphase <= 1)
    {
        /*
         * Discard any existing output tuplesort.
         */
        if (aggstate->sort_out)
        {
            tuplesort_end(aggstate->sort_out);
            aggstate->sort_out = NULL;
        }
    }
    else
    {
        /*
         * The old output tuplesort becomes the new input one, and this is the
         * right time to actually sort it.
         */
        aggstate->sort_in = aggstate->sort_out;
        aggstate->sort_out = NULL;
        Assert(aggstate->sort_in);
        tuplesort_performsort(aggstate->sort_in);
    }

    /*
     * If this isn't the last phase, we need to sort appropriately for the
     * next phase in sequence.
     */
    if (newphase > 0 && newphase < aggstate->numphases - 1)
    {
        Sort       *sortnode = aggstate->phases[newphase + 1].sortnode;
        PlanState  *outerNode = outerPlanState(aggstate);
        TupleDesc   tupDesc = ExecGetResultType(outerNode);

        aggstate->sort_out = tuplesort_begin_heap(tupDesc,
                                                  sortnode->numCols,
                                                  sortnode->sortColIdx,
                                                  sortnode->sortOperators,
                                                  sortnode->collations,
                                                  sortnode->nullsFirst,
                                                  work_mem,
                                                  NULL, false);
    }

    aggstate->current_phase = newphase;
    aggstate->phase = &aggstate->phases[newphase];
}

/*
 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
 * populated by the previous phase. Copy it to the sorter for the next phase
 * if any.
 *
 * Callers cannot rely on memory for tuple in returned slot remaining valid
 * past any subsequently fetched tuple.
 */
static TupleTableSlot *
fetch_input_tuple(AggState *aggstate)
{
    TupleTableSlot *slot;

    if (aggstate->sort_in)
    {
        /* make sure we check for interrupts in either path through here */
        CHECK_FOR_INTERRUPTS();
        if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
                                    aggstate->sort_slot, NULL))
            return NULL;
        slot = aggstate->sort_slot;
    }
    else
        slot = ExecProcNode(outerPlanState(aggstate));

    if (!TupIsNull(slot) && aggstate->sort_out)
        tuplesort_puttupleslot(aggstate->sort_out, slot);

    return slot;
}

/*
 * (Re)Initialize an individual aggregate.
 *
 * This function handles only one grouping set, already set in
 * aggstate->current_set.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
                     AggStatePerGroup pergroupstate)
{
    /*
     * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
     */
    if (pertrans->numSortCols > 0)
    {
        /*
         * In case of rescan, maybe there could be an uncompleted sort
         * operation? Clean it up if so.
         */
        if (pertrans->sortstates[aggstate->current_set])
            tuplesort_end(pertrans->sortstates[aggstate->current_set]);

        /*
         * We use a plain Datum sorter when there's a single input column;
         * otherwise sort the full tuple. (See comments for
         * process_ordered_aggregate_single.)
         */
        if (pertrans->numInputs == 1)
        {
            Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);

            pertrans->sortstates[aggstate->current_set] =
                tuplesort_begin_datum(attr->atttypid,
                                      pertrans->sortOperators[0],
                                      pertrans->sortCollations[0],
                                      pertrans->sortNullsFirst[0],
                                      work_mem, NULL, false);
        }
        else
            pertrans->sortstates[aggstate->current_set] =
                tuplesort_begin_heap(pertrans->sortdesc,
                                     pertrans->numSortCols,
                                     pertrans->sortColIdx,
                                     pertrans->sortOperators,
                                     pertrans->sortCollations,
                                     pertrans->sortNullsFirst,
                                     work_mem, NULL, false);
    }

    /*
     * (Re)set transValue to the initial value.
     *
     * Note that when the initial value is pass-by-ref, we must copy it (into
     * the aggcontext) since we will pfree the transValue later.
     */
    if (pertrans->initValueIsNull)
        pergroupstate->transValue = pertrans->initValue;
    else
    {
        MemoryContext oldContext;

        oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
        pergroupstate->transValue = datumCopy(pertrans->initValue,
                                              pertrans->transtypeByVal,
                                              pertrans->transtypeLen);
        MemoryContextSwitchTo(oldContext);
    }
    pergroupstate->transValueIsNull = pertrans->initValueIsNull;

    /*
     * If the initial value for the transition state doesn't exist in the
     * pg_aggregate table then we will let the first non-NULL value returned
     * from the outer procNode become the initial value. (This is useful for
     * aggregates like max() and min().) The noTransValue flag signals that we
     * still need to do this.
     */
    pergroupstate->noTransValue = pertrans->initValueIsNull;
}

/*
 * Initialize all aggregate transition states for a new group of input values.
 *
 * If there are multiple grouping sets, we initialize only the first numReset
 * of them (the grouping sets are ordered so that the most specific one, which
 * is reset most often, is first). As a convenience, if numReset is 0, we
 * reinitialize all sets.
 *
 * NB: This cannot be used for hash aggregates, as for those the grouping set
 * number has to be specified from further up.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregates(AggState *aggstate,
                      AggStatePerGroup *pergroups,
                      int numReset)
{
    int         transno;
    int         numGroupingSets = Max(aggstate->phase->numsets, 1);
    int         setno = 0;
    int         numTrans = aggstate->numtrans;
    AggStatePerTrans transstates = aggstate->pertrans;

    if (numReset == 0)
        numReset = numGroupingSets;

    for (setno = 0; setno < numReset; setno++)
    {
        AggStatePerGroup pergroup = pergroups[setno];

        select_current_set(aggstate, setno, false);

        for (transno = 0; transno < numTrans; transno++)
        {
            AggStatePerTrans pertrans = &transstates[transno];
            AggStatePerGroup pergroupstate = &pergroup[transno];

            initialize_aggregate(aggstate, pertrans, pergroupstate);
        }
    }
}

/*
 * Given new input value(s), advance the transition function of one aggregate
 * state within one grouping set only (already set in aggstate->current_set)
 *
 * The new values (and null flags) have been preloaded into argument positions
 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
 * pass to the transition function. We also expect that the static fields of
 * the fcinfo are already initialized; that was done by ExecInitAgg().
 *
 * It doesn't matter which memory context this is called in.
 */
static void
advance_transition_function(AggState *aggstate,
                            AggStatePerTrans pertrans,
                            AggStatePerGroup pergroupstate)
{
    FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
    MemoryContext oldContext;
    Datum       newVal;

    if (pertrans->transfn.fn_strict)
    {
        /*
         * For a strict transfn, nothing happens when there's a NULL input; we
         * just keep the prior transValue.
         */
        int         numTransInputs = pertrans->numTransInputs;
        int         i;

        for (i = 1; i <= numTransInputs; i++)
        {
            if (fcinfo->args[i].isnull)
                return;
        }
        if (pergroupstate->noTransValue)
        {
            /*
             * transValue has not been initialized. This is the first non-NULL
             * input value. We use it as the initial value for transValue. (We
             * already checked that the agg's input type is binary-compatible
             * with its transtype, so straight copy here is OK.)
             *
             * We must copy the datum into aggcontext if it is pass-by-ref. We
             * do not need to pfree the old transValue, since it's NULL.
             */
            oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
            pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
                                                  pertrans->transtypeByVal,
                                                  pertrans->transtypeLen);
            pergroupstate->transValueIsNull = false;
            pergroupstate->noTransValue = false;
            MemoryContextSwitchTo(oldContext);
            return;
        }
        if (pergroupstate->transValueIsNull)
        {
            /*
             * Don't call a strict function with NULL inputs. Note it is
             * possible to get here despite the above tests, if the transfn is
             * strict *and* returned a NULL on a prior cycle. If that happens
             * we will propagate the NULL all the way to the end.
             */
            return;
        }
    }

    /* We run the transition functions in per-input-tuple memory context */
    oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);

    /* set up aggstate->curpertrans for AggGetAggref() */
    aggstate->curpertrans = pertrans;

    /*
     * OK to call the transition function
     */
    fcinfo->args[0].value = pergroupstate->transValue;
    fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
    fcinfo->isnull = false;     /* just in case transfn doesn't set it */

    newVal = FunctionCallInvoke(fcinfo);

    aggstate->curpertrans = NULL;

    /*
     * If pass-by-ref datatype, must copy the new value into aggcontext and
     * free the prior transValue. But if transfn returned a pointer to its
     * first input, we don't need to do anything. Also, if transfn returned a
     * pointer to a R/W expanded object that is already a child of the
     * aggcontext, assume we can adopt that value without copying it.
     *
     * It's safe to compare newVal with pergroup->transValue without regard
     * for either being NULL, because ExecAggTransReparent() takes care to set
     * transValue to 0 when NULL. Otherwise we could end up accidentally not
     * reparenting, when the transValue has the same numerical value as
     * newValue, despite being NULL. This is a somewhat hot path, making it
     * undesirable to instead solve this with another branch for the common
     * case of the transition function returning its (modified) input
     * argument.
     */
    if (!pertrans->transtypeByVal &&
        DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
        newVal = ExecAggTransReparent(aggstate, pertrans,
                                      newVal, fcinfo->isnull,
                                      pergroupstate->transValue,
                                      pergroupstate->transValueIsNull);

    pergroupstate->transValue = newVal;
    pergroupstate->transValueIsNull = fcinfo->isnull;

    MemoryContextSwitchTo(oldContext);
}

/*
 * Advance each aggregate transition state for one input tuple. The input
 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
 * accessible to ExecEvalExpr.
 *
 * We have two sets of transition states to handle: one for sorted aggregation
 * and one for hashed; we do them both here, to avoid multiple evaluation of
 * the inputs.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
advance_aggregates(AggState *aggstate)
{
    bool        dummynull;

    ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
                              aggstate->tmpcontext,
                              &dummynull);
}

/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with only one input. This is called after we have completed
 * entering all the input values into the sort object. We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * Note that the strictness of the transition function was checked when
 * entering the values into the sort, so we don't check it again here;
 * we just apply standard SQL DISTINCT logic.
 *
 * The one-input case is handled separately from the multi-input case
 * for performance reasons: for single by-value inputs, such as the
 * common case of count(distinct id), the tuplesort_getdatum code path
 * is around 300% faster. (The speedup for by-reference types is less
 * but still noticeable.)
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
process_ordered_aggregate_single(AggState *aggstate,
                                 AggStatePerTrans pertrans,
                                 AggStatePerGroup pergroupstate)
{
    Datum       oldVal = (Datum) 0;
    bool        oldIsNull = true;
    bool        haveOldVal = false;
    MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
    MemoryContext oldContext;
    bool        isDistinct = (pertrans->numDistinctCols > 0);
    Datum       newAbbrevVal = (Datum) 0;
    Datum       oldAbbrevVal = (Datum) 0;
    FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
    Datum      *newVal;
    bool       *isNull;

    Assert(pertrans->numDistinctCols < 2);

    tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

    /* Load the column into argument 1 (arg 0 will be transition value) */
    newVal = &fcinfo->args[1].value;
    isNull = &fcinfo->args[1].isnull;

    /*
     * Note: if input type is pass-by-ref, the datums returned by the sort are
     * freshly palloc'd in the per-query context, so we must be careful to
     * pfree them when they are no longer needed.
     */

    while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
                              true, newVal, isNull, &newAbbrevVal))
    {
        /*
         * Clear and select the working context for evaluation of the equality
         * function and transition function.
         */
        MemoryContextReset(workcontext);
        oldContext = MemoryContextSwitchTo(workcontext);

        /*
         * If DISTINCT mode, and not distinct from prior, skip it.
         */
        if (isDistinct &&
            haveOldVal &&
            ((oldIsNull && *isNull) ||
             (!oldIsNull && !*isNull &&
              oldAbbrevVal == newAbbrevVal &&
              DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
                                             pertrans->aggCollation,
                                             oldVal, *newVal)))))
        {
            /* equal to prior, so forget this one */
            if (!pertrans->inputtypeByVal && !*isNull)
                pfree(DatumGetPointer(*newVal));
        }
        else
        {
            advance_transition_function(aggstate, pertrans, pergroupstate);
            /* forget the old value, if any */
            if (!oldIsNull && !pertrans->inputtypeByVal)
                pfree(DatumGetPointer(oldVal));
            /* and remember the new one for subsequent equality checks */
            oldVal = *newVal;
            oldAbbrevVal = newAbbrevVal;
            oldIsNull = *isNull;
            haveOldVal = true;
        }

        MemoryContextSwitchTo(oldContext);
    }

    if (!oldIsNull && !pertrans->inputtypeByVal)
        pfree(DatumGetPointer(oldVal));

    tuplesort_end(pertrans->sortstates[aggstate->current_set]);
    pertrans->sortstates[aggstate->current_set] = NULL;
}

/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with more than one input. This is called after we have completed
 * entering all the input values into the sort object. We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
process_ordered_aggregate_multi(AggState *aggstate,
                                AggStatePerTrans pertrans,
                                AggStatePerGroup pergroupstate)
{
    ExprContext *tmpcontext = aggstate->tmpcontext;
    FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
    TupleTableSlot *slot1 = pertrans->sortslot;
    TupleTableSlot *slot2 = pertrans->uniqslot;
    int         numTransInputs = pertrans->numTransInputs;
    int         numDistinctCols = pertrans->numDistinctCols;
    Datum       newAbbrevVal = (Datum) 0;
    Datum       oldAbbrevVal = (Datum) 0;
    bool        haveOldValue = false;
    TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
    int         i;

    tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

    ExecClearTuple(slot1);
    if (slot2)
        ExecClearTuple(slot2);

    while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
                                  true, true, slot1, &newAbbrevVal))
    {
        CHECK_FOR_INTERRUPTS();

        tmpcontext->ecxt_outertuple = slot1;
        tmpcontext->ecxt_innertuple = slot2;

        if (numDistinctCols == 0 ||
            !haveOldValue ||
            newAbbrevVal != oldAbbrevVal ||
            !ExecQual(pertrans->equalfnMulti, tmpcontext))
        {
            /*
             * Extract the first numTransInputs columns as datums to pass to
             * the transfn.
             */
            slot_getsomeattrs(slot1, numTransInputs);

            /* Load values into fcinfo */
            /* Start from 1, since the 0th arg will be the transition value */
            for (i = 0; i < numTransInputs; i++)
            {
                fcinfo->args[i + 1].value = slot1->tts_values[i];
                fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
            }

            advance_transition_function(aggstate, pertrans, pergroupstate);

            if (numDistinctCols > 0)
            {
                /* swap the slot pointers to retain the current tuple */
                TupleTableSlot *tmpslot = slot2;

                slot2 = slot1;
                slot1 = tmpslot;
                /* avoid ExecQual() calls by reusing abbreviated keys */
                oldAbbrevVal = newAbbrevVal;
                haveOldValue = true;
            }
        }

        /* Reset context each time */
        ResetExprContext(tmpcontext);

        ExecClearTuple(slot1);
    }

    if (slot2)
        ExecClearTuple(slot2);

    tuplesort_end(pertrans->sortstates[aggstate->current_set]);
    pertrans->sortstates[aggstate->current_set] = NULL;

    /* restore previous slot, potentially in use for grouping sets */
    tmpcontext->ecxt_outertuple = save;
}

/*
 * Compute the final value of one aggregate.
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * The finalfn will be run, and the result delivered, in the
 * output-tuple context; caller's CurrentMemoryContext does not matter.
 *
 * The finalfn uses the state as set in the transno. This also might be
 * being used by another aggregate function, so it's important that we do
 * nothing destructive here.
 */
static void
finalize_aggregate(AggState *aggstate,
                   AggStatePerAgg peragg,
                   AggStatePerGroup pergroupstate,
                   Datum *resultVal, bool *resultIsNull)
{
    LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
    bool        anynull = false;
    MemoryContext oldContext;
    int         i;
    ListCell   *lc;
    AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];

    oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);

    /*
     * Evaluate any direct arguments. We do this even if there's no finalfn
     * (which is unlikely anyway), so that side-effects happen as expected.
     * The direct arguments go into arg positions 1 and up, leaving position 0
     * for the transition state value.
     */
    i = 1;
    foreach(lc, peragg->aggdirectargs)
    {
        ExprState  *expr = (ExprState *) lfirst(lc);

        fcinfo->args[i].value = ExecEvalExpr(expr,
                                             aggstate->ss.ps.ps_ExprContext,
                                             &fcinfo->args[i].isnull);
        anynull |= fcinfo->args[i].isnull;
        i++;
    }

    /*
     * Apply the agg's finalfn if one is provided, else return transValue.
     */
    if (OidIsValid(peragg->finalfn_oid))
    {
        int         numFinalArgs = peragg->numFinalArgs;

        /* set up aggstate->curperagg for AggGetAggref() */
        aggstate->curperagg = peragg;

        InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
                                 numFinalArgs,
                                 pertrans->aggCollation,
                                 (void *) aggstate, NULL);

        /* Fill in the transition state value */
        fcinfo->args[0].value =
            MakeExpandedObjectReadOnly(pergroupstate->transValue,
                                       pergroupstate->transValueIsNull,
                                       pertrans->transtypeLen);
        fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
        anynull |= pergroupstate->transValueIsNull;

        /* Fill any remaining argument positions with nulls */
        for (; i < numFinalArgs; i++)
        {
            fcinfo->args[i].value = (Datum) 0;
            fcinfo->args[i].isnull = true;
            anynull = true;
        }

        if (fcinfo->flinfo->fn_strict && anynull)
        {
            /* don't call a strict function with NULL inputs */
            *resultVal = (Datum) 0;
            *resultIsNull = true;
        }
        else
        {
            *resultVal = FunctionCallInvoke(fcinfo);
            *resultIsNull = fcinfo->isnull;
        }
        aggstate->curperagg = NULL;
    }
    else
    {
        /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
        *resultVal = pergroupstate->transValue;
        *resultIsNull = pergroupstate->transValueIsNull;
    }

    /*
     * If result is pass-by-ref, make sure it is in the right context.
     */
    if (!peragg->resulttypeByVal && !*resultIsNull &&
        !MemoryContextContains(CurrentMemoryContext,
                               DatumGetPointer(*resultVal)))
        *resultVal = datumCopy(*resultVal,
                               peragg->resulttypeByVal,
                               peragg->resulttypeLen);

    MemoryContextSwitchTo(oldContext);
}

/*
 * Compute the output value of one partial aggregate.
 *
 * The serialization function will be run, and the result delivered, in the
 * output-tuple context; caller's CurrentMemoryContext does not matter.
 */
static void
finalize_partialaggregate(AggState *aggstate,
                          AggStatePerAgg peragg,
                          AggStatePerGroup pergroupstate,
                          Datum *resultVal, bool *resultIsNull)
{
    AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
    MemoryContext oldContext;

    oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);

    /*
     * serialfn_oid will be set if we must serialize the transvalue before
     * returning it
     */
    if (OidIsValid(pertrans->serialfn_oid))
    {
        /* Don't call a strict serialization function with NULL input. */
        if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
        {
            *resultVal = (Datum) 0;
            *resultIsNull = true;
        }
        else
        {
            FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;

            fcinfo->args[0].value =
                MakeExpandedObjectReadOnly(pergroupstate->transValue,
                                           pergroupstate->transValueIsNull,
                                           pertrans->transtypeLen);
            fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
            fcinfo->isnull = false;

            *resultVal = FunctionCallInvoke(fcinfo);
            *resultIsNull = fcinfo->isnull;
        }
    }
    else
    {
        /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
        *resultVal = pergroupstate->transValue;
        *resultIsNull = pergroupstate->transValueIsNull;
    }

    /* If result is pass-by-ref, make sure it is in the right context. */
    if (!peragg->resulttypeByVal && !*resultIsNull &&
        !MemoryContextContains(CurrentMemoryContext,
                               DatumGetPointer(*resultVal)))
        *resultVal = datumCopy(*resultVal,
                               peragg->resulttypeByVal,
                               peragg->resulttypeLen);

    MemoryContextSwitchTo(oldContext);
}

/*
 * Extract the attributes that make up the grouping key into the
 * hashslot. This is necessary to compute the hash or perform a lookup.
 */
static inline void
prepare_hash_slot(AggStatePerHash perhash,
                  TupleTableSlot *inputslot,
                  TupleTableSlot *hashslot)
{
    int         i;

    /* transfer just the needed columns into hashslot */
    slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
    ExecClearTuple(hashslot);

    for (i = 0; i < perhash->numhashGrpCols; i++)
    {
        int         varNumber = perhash->hashGrpColIdxInput[i] - 1;

        hashslot->tts_values[i] = inputslot->tts_values[varNumber];
        hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
    }
    ExecStoreVirtualTuple(hashslot);
}

/*
 * Prepare to finalize and project based on the specified representative tuple
 * slot and grouping set.
 *
 * In the specified tuple slot, force to null all attributes that should be
 * read as null in the context of the current grouping set. Also stash the
 * current group bitmap where GroupingExpr can get at it.
 *
 * This relies on three conditions:
 *
 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
 * only reference it in evaluations, which will only access individual
 * attributes.
 *
 * 2) No system columns are going to need to be nulled. (If a system column is
 * referenced in a group clause, it is actually projected in the outer plan
 * tlist.)
 *
 * 3) Within a given phase, we never need to recover the value of an attribute
 * once it has been set to null.
 *
 * Poking into the slot this way is a bit ugly, but the consensus is that the
 * alternative was worse.
 */
static void
prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
{
    if (aggstate->phase->grouped_cols)
    {
        Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];

        aggstate->grouped_cols = grouped_cols;

        if (TTS_EMPTY(slot))
        {
            /*
             * Force all values to be NULL if working on an empty input tuple
             * (i.e. an empty grouping set for which no input rows were
             * supplied).
             */
            ExecStoreAllNullTuple(slot);
        }
        else if (aggstate->all_grouped_cols)
        {
            ListCell   *lc;

            /* all_grouped_cols is arranged in desc order */
            slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));

            foreach(lc, aggstate->all_grouped_cols)
            {
                int         attnum = lfirst_int(lc);

                if (!bms_is_member(attnum, grouped_cols))
                    slot->tts_isnull[attnum - 1] = true;
            }
        }
    }
}

/*
 * Compute the final value of all aggregates for one group.
 *
 * This function handles only one grouping set at a time, which the caller must
 * have selected. It's also the caller's responsibility to adjust the supplied
 * pergroup parameter to point to the current set's transvalues.
 *
 * Results are stored in the output econtext aggvalues/aggnulls.
 */
static void
finalize_aggregates(AggState *aggstate,
                    AggStatePerAgg peraggs,
                    AggStatePerGroup pergroup)
{
    ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
    Datum      *aggvalues = econtext->ecxt_aggvalues;
    bool       *aggnulls = econtext->ecxt_aggnulls;
    int         aggno;
    int         transno;

    /*
     * If there were any DISTINCT and/or ORDER BY aggregates, sort their
     * inputs and run the transition functions.
     */
    for (transno = 0; transno < aggstate->numtrans; transno++)
    {
        AggStatePerTrans pertrans = &aggstate->pertrans[transno];
        AggStatePerGroup pergroupstate;

        pergroupstate = &pergroup[transno];

        if (pertrans->numSortCols > 0)
        {
            Assert(aggstate->aggstrategy != AGG_HASHED &&
                   aggstate->aggstrategy != AGG_MIXED);

            if (pertrans->numInputs == 1)
                process_ordered_aggregate_single(aggstate,
                                                 pertrans,
                                                 pergroupstate);
            else
                process_ordered_aggregate_multi(aggstate,
                                                pertrans,
                                                pergroupstate);
        }
    }

    /*
     * Run the final functions.
     */
    for (aggno = 0; aggno < aggstate->numaggs; aggno++)
    {
        AggStatePerAgg peragg = &peraggs[aggno];
        int         transno = peragg->transno;
        AggStatePerGroup pergroupstate;

        pergroupstate = &pergroup[transno];

        if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
            finalize_partialaggregate(aggstate, peragg, pergroupstate,
                                      &aggvalues[aggno], &aggnulls[aggno]);
        else
            finalize_aggregate(aggstate, peragg, pergroupstate,
                               &aggvalues[aggno], &aggnulls[aggno]);
    }
}

/*
 * Project the result of a group (whose aggs have already been calculated by
 * finalize_aggregates). Returns the result slot, or NULL if no row is
 * projected (suppressed by qual).
 */
static TupleTableSlot *
project_aggregates(AggState *aggstate)
{
    ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;

    /*
     * Check the qual (HAVING clause); if the group does not match, ignore it.
     */
    if (ExecQual(aggstate->ss.ps.qual, econtext))
    {
        /*
         * Form and return projection tuple using the aggregate results and
         * the representative input tuple.
         */
        return ExecProject(aggstate->ss.ps.ps_ProjInfo);
    }
    else
        InstrCountFiltered1(aggstate, 1);

    return NULL;
}

/*
 * Walk tlist and qual to find referenced colnos, dividing them into
 * aggregated and unaggregated sets.
 */
static void
find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
{
    Agg        *agg = (Agg *) aggstate->ss.ps.plan;
    FindColsContext context;

    context.is_aggref = false;
    context.aggregated = NULL;
    context.unaggregated = NULL;

    (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
    (void) find_cols_walker((Node *) agg->plan.qual, &context);

    *aggregated = context.aggregated;
    *unaggregated = context.unaggregated;
}

static bool
find_cols_walker(Node *node, FindColsContext *context)
{
    if (node == NULL)
        return false;
    if (IsA(node, Var))
    {
        Var        *var = (Var *) node;

        /* setrefs.c should have set the varno to OUTER_VAR */
        Assert(var->varno == OUTER_VAR);
        Assert(var->varlevelsup == 0);
        if (context->is_aggref)
            context->aggregated = bms_add_member(context->aggregated,
                                                 var->varattno);
        else
            context->unaggregated = bms_add_member(context->unaggregated,
                                                   var->varattno);
        return false;
    }
    if (IsA(node, Aggref))
    {
        Assert(!context->is_aggref);
        context->is_aggref = true;
        expression_tree_walker(node, find_cols_walker, (void *) context);
        context->is_aggref = false;
        return false;
    }
    return expression_tree_walker(node, find_cols_walker,
                                  (void *) context);
}

/*
 * (Re-)initialize the hash table(s) to empty.
 *
 * To implement hashed aggregation, we need a hashtable that stores a
 * representative tuple and an array of AggStatePerGroup structs for each
 * distinct set of GROUP BY column values. We compute the hash key from the
 * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
 * for each entry.
 *
 * We have a separate hashtable and associated perhash data structure for each
 * grouping set for which we're doing hashing.
 *
 * The contents of the hash tables always live in the hashcontext's per-tuple
 * memory context (there is only one of these for all tables together, since
 * they are all reset at the same time).
 */
static void
build_hash_tables(AggState *aggstate)
{
    int         setno;

    for (setno = 0; setno < aggstate->num_hashes; ++setno)
    {
        AggStatePerHash perhash = &aggstate->perhash[setno];
        long        nbuckets;
        Size        memory;

        if (perhash->hashtable != NULL)
        {
            ResetTupleHashTable(perhash->hashtable);
            continue;
        }

        Assert(perhash->aggnode->numGroups > 0);

        memory = aggstate->hash_mem_limit / aggstate->num_hashes;

        /* choose reasonable number of buckets per hashtable */
        nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
                                           perhash->aggnode->numGroups,
                                           memory);

        build_hash_table(aggstate, setno, nbuckets);
    }

    aggstate->hash_ngroups_current = 0;
}

/*
 * Build a single hashtable for this grouping set.
 */
static void
build_hash_table(AggState *aggstate, int setno, long nbuckets)
{
    AggStatePerHash perhash = &aggstate->perhash[setno];
    MemoryContext metacxt = aggstate->hash_metacxt;
    MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
    MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
    Size        additionalsize;

    Assert(aggstate->aggstrategy == AGG_HASHED ||
           aggstate->aggstrategy == AGG_MIXED);

    /*
     * Used to make sure initial hash table allocation does not exceed
     * hash_mem. Note that the estimate does not include space for
     * pass-by-reference transition data values, nor for the representative
     * tuple of each group.
     */
    additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);

    perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
                                                perhash->hashslot->tts_tupleDescriptor,
                                                perhash->numCols,
                                                perhash->hashGrpColIdxHash,
                                                perhash->eqfuncoids,
                                                perhash->hashfunctions,
                                                perhash->aggnode->grpCollations,
                                                nbuckets,
                                                additionalsize,
                                                metacxt,
                                                hashcxt,
                                                tmpcxt,
                                                DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
}

1540 /*
1541  * Compute columns that actually need to be stored in hashtable entries. The
1542  * incoming tuples from the child plan node will contain grouping columns,
1543  * other columns referenced in our targetlist and qual, columns used to
1544  * compute the aggregate functions, and perhaps just junk columns we don't use
1545  * at all. Only columns of the first two types need to be stored in the
1546  * hashtable, and getting rid of the others can make the table entries
1547  * significantly smaller. The hashtable only contains the relevant columns,
1548  * and is packed/unpacked in prepare_hash_slot() / agg_retrieve_hash_table_in_memory()
1549  * into the format of the normal input descriptor.
1550  *
1551  * Additional columns, beyond those grouped by, come from two sources:
1552  * first, functionally dependent columns that we don't need to group by
1553  * themselves; and second, ctids for row-marks.
1554  *
1555  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1556  * then build an array of the columns included in the hashtable. We might
1557  * still have duplicates if the passed-in grpColIdx has them, which can happen
1558  * in edge cases from semijoins/distinct; these can't always be removed,
1559  * because it's not certain that the duplicate cols will be using the same
1560  * hash function.
1561  *
1562  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1563  * the per-query context (unlike the hash table itself).
1564  */
1565 static void
1566 find_hash_columns(AggState *aggstate)
1567 {
1568  Bitmapset *base_colnos;
1569  Bitmapset *aggregated_colnos;
1570  TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1571  List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1572  int numHashes = aggstate->num_hashes;
1573  EState *estate = aggstate->ss.ps.state;
1574  int j;
1575 
1576  /* Find Vars that will be needed in tlist and qual */
1577  find_cols(aggstate, &aggregated_colnos, &base_colnos);
1578  aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
1579  aggstate->max_colno_needed = 0;
1580  aggstate->all_cols_needed = true;
1581 
1582  for (int i = 0; i < scanDesc->natts; i++)
1583  {
1584  int colno = i + 1;
1585  if (bms_is_member(colno, aggstate->colnos_needed))
1586  aggstate->max_colno_needed = colno;
1587  else
1588  aggstate->all_cols_needed = false;
1589  }
1590 
1591  for (j = 0; j < numHashes; ++j)
1592  {
1593  AggStatePerHash perhash = &aggstate->perhash[j];
1594  Bitmapset *colnos = bms_copy(base_colnos);
1595  AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1596  List *hashTlist = NIL;
1597  TupleDesc hashDesc;
1598  int maxCols;
1599  int i;
1600 
1601  perhash->largestGrpColIdx = 0;
1602 
1603  /*
1604  * If we're doing grouping sets, then some Vars might be referenced in
1605  * tlist/qual for the benefit of other grouping sets, but not needed
1606  * when hashing; i.e. prepare_projection_slot will null them out, so
1607  * there'd be no point storing them. Use prepare_projection_slot's
1608  * logic to determine which.
1609  */
1610  if (aggstate->phases[0].grouped_cols)
1611  {
1612  Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1613  ListCell *lc;
1614 
1615  foreach(lc, aggstate->all_grouped_cols)
1616  {
1617  int attnum = lfirst_int(lc);
1618 
1619  if (!bms_is_member(attnum, grouped_cols))
1620  colnos = bms_del_member(colnos, attnum);
1621  }
1622  }
1623 
1624  /*
1625  * Compute maximum number of input columns accounting for possible
1626  * duplications in the grpColIdx array, which can happen in some edge
1627  * cases where HashAggregate was generated as part of a semijoin or a
1628  * DISTINCT.
1629  */
1630  maxCols = bms_num_members(colnos) + perhash->numCols;
1631 
1632  perhash->hashGrpColIdxInput =
1633  palloc(maxCols * sizeof(AttrNumber));
1634  perhash->hashGrpColIdxHash =
1635  palloc(perhash->numCols * sizeof(AttrNumber));
1636 
1637  /* Add all the grouping columns to colnos */
1638  for (i = 0; i < perhash->numCols; i++)
1639  colnos = bms_add_member(colnos, grpColIdx[i]);
1640 
1641  /*
1642  * First build mapping for columns directly hashed. These are the
1643  * first, because they'll be accessed when computing hash values and
1644  * comparing tuples for exact matches. We also build simple mapping
1645  * for execGrouping, so it knows where to find the to-be-hashed /
1646  * compared columns in the input.
1647  */
1648  for (i = 0; i < perhash->numCols; i++)
1649  {
1650  perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1651  perhash->hashGrpColIdxHash[i] = i + 1;
1652  perhash->numhashGrpCols++;
1653  /* delete already mapped columns */
1654  bms_del_member(colnos, grpColIdx[i]);
1655  }
1656 
1657  /* and add the remaining columns */
1658  while ((i = bms_first_member(colnos)) >= 0)
1659  {
1660  perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1661  perhash->numhashGrpCols++;
1662  }
1663 
1664  /* and build a tuple descriptor for the hashtable */
1665  for (i = 0; i < perhash->numhashGrpCols; i++)
1666  {
1667  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1668 
1669  hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1670  perhash->largestGrpColIdx =
1671  Max(varNumber + 1, perhash->largestGrpColIdx);
1672  }
1673 
1674  hashDesc = ExecTypeFromTL(hashTlist);
1675 
1676  execTuplesHashPrepare(perhash->numCols,
1677  perhash->aggnode->grpOperators,
1678  &perhash->eqfuncoids,
1679  &perhash->hashfunctions);
1680  perhash->hashslot =
1681  ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1682  &TTSOpsMinimalTuple);
1683 
1684  list_free(hashTlist);
1685  bms_free(colnos);
1686  }
1687 
1688  bms_free(base_colnos);
1689 }
1690 
1691 /*
1692  * Estimate per-hash-table-entry overhead.
1693  */
1694 Size
1695 hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
1696 {
1697  Size tupleChunkSize;
1698  Size pergroupChunkSize;
1699  Size transitionChunkSize;
1700  Size tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
1701  tupleWidth);
1702  Size pergroupSize = numTrans * sizeof(AggStatePerGroupData);
1703 
1704  tupleChunkSize = CHUNKHDRSZ + tupleSize;
1705 
1706  if (pergroupSize > 0)
1707  pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
1708  else
1709  pergroupChunkSize = 0;
1710 
1711  if (transitionSpace > 0)
1712  transitionChunkSize = CHUNKHDRSZ + transitionSpace;
1713  else
1714  transitionChunkSize = 0;
1715 
1716  return
1717  sizeof(TupleHashEntryData) +
1718  tupleChunkSize +
1719  pergroupChunkSize +
1720  transitionChunkSize;
1721 }
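/*
 * Worked example of the estimate above (illustrative numbers): with
 * numTrans = 2, tupleWidth = 32 and transitionSpace = 0, and given the
 * 16-byte CHUNKHDRSZ, each new group is charged roughly
 *
 *   sizeof(TupleHashEntryData)
 *   + 16 + MAXALIGN(SizeofMinimalTupleHeader) + 32    (tuple chunk)
 *   + 16 + 2 * sizeof(AggStatePerGroupData)           (pergroup chunk)
 *   + 0                                               (transition chunk)
 *
 * so the spill thresholds scale with the number of transition states as
 * well as with the tuple width.
 */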
1722 
1723 /*
1724  * hashagg_recompile_expressions()
1725  *
1726  * Identifies the right phase, compiles the right expression given the
1727  * arguments, and then sets phase->evalfunc to that expression.
1728  *
1729  * Different versions of the compiled expression are needed depending on
1730  * whether hash aggregation has spilled or not, and whether it's reading from
1731  * the outer plan or a tape. Before spilling to disk, the expression reads
1732  * from the outer plan and does not need to perform a NULL check. After
1733  * HashAgg begins to spill, new groups will not be created in the hash table,
1734  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
1735  * pointer check to the expression. Then, when reading spilled data from a
1736  * tape, we change the outer slot type to be a fixed minimal tuple slot.
1737  *
1738  * It would be wasteful to recompile every time, so cache the compiled
1739  * expressions in the AggStatePerPhase, and reuse when appropriate.
1740  */
1741 static void
1742 hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1743 {
1744  AggStatePerPhase phase;
1745  int i = minslot ? 1 : 0;
1746  int j = nullcheck ? 1 : 0;
1747 
1748  Assert(aggstate->aggstrategy == AGG_HASHED ||
1749  aggstate->aggstrategy == AGG_MIXED);
1750 
1751  if (aggstate->aggstrategy == AGG_HASHED)
1752  phase = &aggstate->phases[0];
1753  else /* AGG_MIXED */
1754  phase = &aggstate->phases[1];
1755 
1756  if (phase->evaltrans_cache[i][j] == NULL)
1757  {
1758  const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
1759  bool outerfixed = aggstate->ss.ps.outeropsfixed;
1760  bool dohash = true;
1761  bool dosort;
1762 
1763  dosort = (aggstate->aggstrategy == AGG_MIXED);
1764 
1765  /* temporarily change the outerops while compiling the expression */
1766  if (minslot)
1767  {
1768  aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
1769  aggstate->ss.ps.outeropsfixed = true;
1770  }
1771 
1772  phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
1773  dosort, dohash,
1774  nullcheck);
1775 
1776  /* change back */
1777  aggstate->ss.ps.outerops = outerops;
1778  aggstate->ss.ps.outeropsfixed = outerfixed;
1779  }
1780 
1781  phase->evaltrans = phase->evaltrans_cache[i][j];
1782 }
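/*
 * The cache above forms a 2x2 matrix indexed by (minslot, nullcheck):
 *
 *   evaltrans_cache[0][0]  reading outer plan, no NULL check (not spilled)
 *   evaltrans_cache[0][1]  reading outer plan, NULL check (spill mode)
 *   evaltrans_cache[1][0]  reading a tape's minimal tuples, no NULL check
 *   evaltrans_cache[1][1]  reading a tape, NULL check (refill after spill)
 *
 * so at most four expressions are ever compiled per phase.
 */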
1783 
1784 /*
1785  * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
1786  * number of partitions we expect to create (if we do spill).
1787  *
1788  * There are two limits: a memory limit, and also an ngroups limit. The
1789  * ngroups limit becomes important when we expect transition values to grow
1790  * substantially larger than the initial value.
1791  */
1792 void
1793 hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
1794  Size *mem_limit, uint64 *ngroups_limit,
1795  int *num_partitions)
1796 {
1797  int npartitions;
1798  Size partition_mem;
1799  int hash_mem = get_hash_mem();
1800 
1801  /* if not expected to spill, use all of hash_mem */
1802  if (input_groups * hashentrysize < hash_mem * 1024L)
1803  {
1804  if (num_partitions != NULL)
1805  *num_partitions = 0;
1806  *mem_limit = hash_mem * 1024L;
1807  *ngroups_limit = *mem_limit / hashentrysize;
1808  return;
1809  }
1810 
1811  /*
1812  * Calculate expected memory requirements for spilling, which is the size
1813  * of the buffers needed for all the tapes that need to be open at once.
1814  * Then, subtract that from the memory available for holding hash tables.
1815  */
1816  npartitions = hash_choose_num_partitions(input_groups,
1817  hashentrysize,
1818  used_bits,
1819  NULL);
1820  if (num_partitions != NULL)
1821  *num_partitions = npartitions;
1822 
1823  partition_mem =
1824  HASHAGG_READ_BUFFER_SIZE +
1825  HASHAGG_WRITE_BUFFER_SIZE * npartitions;
1826 
1827  /*
1828  * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
1829  * minimum number of partitions, so we aren't going to dramatically exceed
1830  * work mem anyway.
1831  */
1832  if (hash_mem * 1024L > 4 * partition_mem)
1833  *mem_limit = hash_mem * 1024L - partition_mem;
1834  else
1835  *mem_limit = hash_mem * 1024L * 0.75;
1836 
1837  if (*mem_limit > hashentrysize)
1838  *ngroups_limit = *mem_limit / hashentrysize;
1839  else
1840  *ngroups_limit = 1;
1841 }
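/*
 * Worked example (illustrative figures): with hash_mem = 4096kB and
 * hashentrysize = 100, an estimate of 10,000 input groups needs ~1MB, so
 * no spill is expected: *mem_limit = 4MB and *ngroups_limit = ~41,900.
 * With 1,000,000 expected groups (~100MB) a spill is expected, so
 * partition_mem is carved out of the 4MB first, but never more than a
 * quarter of it.
 */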
1842 
1843 /*
1844  * hash_agg_check_limits
1845  *
1846  * After adding a new group to the hash table, check whether we need to enter
1847  * spill mode. Allocations may happen without adding new groups (for instance,
1848  * if the transition state size grows), so this check is imperfect.
1849  */
1850 static void
1851 hash_agg_check_limits(AggState *aggstate)
1852 {
1853  uint64 ngroups = aggstate->hash_ngroups_current;
1854  Size meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
1855  true);
1856  Size hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
1857  true);
1858 
1859  /*
1860  * Don't spill unless there's at least one group in the hash table so we
1861  * can be sure to make progress even in edge cases.
1862  */
1863  if (aggstate->hash_ngroups_current > 0 &&
1864  (meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
1865  ngroups > aggstate->hash_ngroups_limit))
1866  {
1867  hash_agg_enter_spill_mode(aggstate);
1868  }
1869 }
1870 
1871 /*
1872  * Enter "spill mode", meaning that no new groups are added to any of the hash
1873  * tables. Tuples that would create a new group are instead spilled, and
1874  * processed later.
1875  */
1876 static void
1877 hash_agg_enter_spill_mode(AggState *aggstate)
1878 {
1879  aggstate->hash_spill_mode = true;
1880  hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
1881 
1882  if (!aggstate->hash_ever_spilled)
1883  {
1884  Assert(aggstate->hash_tapeinfo == NULL);
1885  Assert(aggstate->hash_spills == NULL);
1886 
1887  aggstate->hash_ever_spilled = true;
1888 
1889  hashagg_tapeinfo_init(aggstate);
1890 
1891  aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
1892 
1893  for (int setno = 0; setno < aggstate->num_hashes; setno++)
1894  {
1895  AggStatePerHash perhash = &aggstate->perhash[setno];
1896  HashAggSpill *spill = &aggstate->hash_spills[setno];
1897 
1898  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
1899  perhash->aggnode->numGroups,
1900  aggstate->hashentrysize);
1901  }
1902  }
1903 }
1904 
1905 /*
1906  * Update metrics after filling the hash table.
1907  *
1908  * If reading from the outer plan, from_tape should be false; if reading from
1909  * another tape, from_tape should be true.
1910  */
1911 static void
1912 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
1913 {
1914  Size meta_mem;
1915  Size hashkey_mem;
1916  Size buffer_mem;
1917  Size total_mem;
1918 
1919  if (aggstate->aggstrategy != AGG_MIXED &&
1920  aggstate->aggstrategy != AGG_HASHED)
1921  return;
1922 
1923  /* memory for the hash table itself */
1924  meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
1925 
1926  /* memory for the group keys and transition states */
1927  hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
1928 
1929  /* memory for read/write tape buffers, if spilled */
1930  buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
1931  if (from_tape)
1932  buffer_mem += HASHAGG_READ_BUFFER_SIZE;
1933 
1934  /* update peak mem */
1935  total_mem = meta_mem + hashkey_mem + buffer_mem;
1936  if (total_mem > aggstate->hash_mem_peak)
1937  aggstate->hash_mem_peak = total_mem;
1938 
1939  /* update disk usage */
1940  if (aggstate->hash_tapeinfo != NULL)
1941  {
1942  uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
1943 
1944  if (aggstate->hash_disk_used < disk_used)
1945  aggstate->hash_disk_used = disk_used;
1946  }
1947 
1948  /* update hashentrysize estimate based on contents */
1949  if (aggstate->hash_ngroups_current > 0)
1950  {
1951  aggstate->hashentrysize =
1952  sizeof(TupleHashEntryData) +
1953  (hashkey_mem / (double) aggstate->hash_ngroups_current);
1954  }
1955 }
1956 
1957 /*
1958  * Choose a reasonable number of buckets for the initial hash table size.
1959  */
1960 static long
1961 hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
1962 {
1963  long max_nbuckets;
1964  long nbuckets = ngroups;
1965 
1966  max_nbuckets = memory / hashentrysize;
1967 
1968  /*
1969  * Underestimating is better than overestimating. Too many buckets crowd
1970  * out space for group keys and transition state values.
1971  */
1972  max_nbuckets >>= 1;
1973 
1974  if (nbuckets > max_nbuckets)
1975  nbuckets = max_nbuckets;
1976 
1977  return Max(nbuckets, 1);
1978 }
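/*
 * Example (illustrative): with memory = 1MB and hashentrysize = 100,
 * max_nbuckets is 10485, halved to 5242; a planner estimate of 4,000
 * groups keeps nbuckets = 4,000, while an estimate of 1,000,000 groups is
 * clamped down to 5242.
 */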
1979 
1980 /*
1981  * Determine the number of partitions to create when spilling, which will
1982  * always be a power of two. If log2_npartitions is non-NULL, set
1983  * *log2_npartitions to the log2() of the number of partitions.
1984  */
1985 static int
1986 hash_choose_num_partitions(double input_groups, double hashentrysize,
1987  int used_bits, int *log2_npartitions)
1988 {
1989  Size mem_wanted;
1990  int partition_limit;
1991  int npartitions;
1992  int partition_bits;
1993  int hash_mem = get_hash_mem();
1994 
1995  /*
1996  * Avoid creating so many partitions that the memory requirements of the
1997  * open partition files are greater than 1/4 of hash_mem.
1998  */
1999  partition_limit =
2000  (hash_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
2001  HASHAGG_WRITE_BUFFER_SIZE;
2002 
2003  mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
2004 
2005  /* make enough partitions so that each one is likely to fit in memory */
2006  npartitions = 1 + (mem_wanted / (hash_mem * 1024L));
2007 
2008  if (npartitions > partition_limit)
2009  npartitions = partition_limit;
2010 
2011  if (npartitions < HASHAGG_MIN_PARTITIONS)
2012  npartitions = HASHAGG_MIN_PARTITIONS;
2013  if (npartitions > HASHAGG_MAX_PARTITIONS)
2014  npartitions = HASHAGG_MAX_PARTITIONS;
2015 
2016  /* ceil(log2(npartitions)) */
2017  partition_bits = my_log2(npartitions);
2018 
2019  /* make sure that we don't exhaust the hash bits */
2020  if (partition_bits + used_bits >= 32)
2021  partition_bits = 32 - used_bits;
2022 
2023  if (log2_npartitions != NULL)
2024  *log2_npartitions = partition_bits;
2025 
2026  /* number of partitions will be a power of two */
2027  npartitions = 1L << partition_bits;
2028 
2029  return npartitions;
2030 }
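/*
 * Worked example (illustrative, with HASHAGG_PARTITION_FACTOR = 1.5):
 * given hash_mem = 4096kB, input_groups = 1,000,000 and hashentrysize =
 * 100, mem_wanted is ~150MB, so npartitions = 1 + 150MB/4MB = 38, rounded
 * up to the power of two 64 (partition_bits = 6), subject to
 * partition_limit and the HASHAGG_MIN/MAX_PARTITIONS clamps.
 */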
2031 
2032 /*
2033  * Initialize a freshly-created TupleHashEntry.
2034  */
2035 static void
2036 initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable,
2037  TupleHashEntry entry)
2038 {
2039  AggStatePerGroup pergroup;
2040  int transno;
2041 
2042  aggstate->hash_ngroups_current++;
2043  hash_agg_check_limits(aggstate);
2044 
2045  /* no need to allocate or initialize per-group state */
2046  if (aggstate->numtrans == 0)
2047  return;
2048 
2049  pergroup = (AggStatePerGroup)
2050  MemoryContextAlloc(hashtable->tablecxt,
2051  sizeof(AggStatePerGroupData) * aggstate->numtrans);
2052 
2053  entry->additional = pergroup;
2054 
2055  /*
2056  * Initialize aggregates for the new tuple group; lookup_hash_entries()
2057  * has already selected the relevant grouping set.
2058  */
2059  for (transno = 0; transno < aggstate->numtrans; transno++)
2060  {
2061  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
2062  AggStatePerGroup pergroupstate = &pergroup[transno];
2063 
2064  initialize_aggregate(aggstate, pertrans, pergroupstate);
2065  }
2066 }
2067 
2068 /*
2069  * Look up hash entries for the current tuple in all hashed grouping sets,
2070  * returning an array of pergroup pointers suitable for advance_aggregates.
2071  *
2072  * Be aware that lookup_hash_entry can reset the tmpcontext.
2073  *
2074  * Some entries may be left NULL if we are in "spill mode". The same tuple
2075  * will belong to different groups for each grouping set, so it may match a group
2076  * already in memory for one set and match a group not in memory for another
2077  * set. When in "spill mode", the tuple will be spilled for each grouping set
2078  * where it doesn't match a group in memory.
2079  *
2080  * NB: It's possible to spill the same tuple for several different grouping
2081  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
2082  * the tuple multiple times for multiple grouping sets, it can be partitioned
2083  * for each grouping set, making the refilling of the hash table very
2084  * efficient.
2085  */
2086 static void
2087 lookup_hash_entries(AggState *aggstate)
2088 {
2089  AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2090  TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
2091  int setno;
2092 
2093  for (setno = 0; setno < aggstate->num_hashes; setno++)
2094  {
2095  AggStatePerHash perhash = &aggstate->perhash[setno];
2096  TupleHashTable hashtable = perhash->hashtable;
2097  TupleTableSlot *hashslot = perhash->hashslot;
2098  TupleHashEntry entry;
2099  uint32 hash;
2100  bool isnew = false;
2101  bool *p_isnew;
2102 
2103  /* if hash table already spilled, don't create new entries */
2104  p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2105 
2106  select_current_set(aggstate, setno, true);
2107  prepare_hash_slot(perhash,
2108  outerslot,
2109  hashslot);
2110 
2111  entry = LookupTupleHashEntry(hashtable, hashslot,
2112  p_isnew, &hash);
2113 
2114  if (entry != NULL)
2115  {
2116  if (isnew)
2117  initialize_hash_entry(aggstate, hashtable, entry);
2118  pergroup[setno] = entry->additional;
2119  }
2120  else
2121  {
2122  HashAggSpill *spill = &aggstate->hash_spills[setno];
2123  TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
2124 
2125  if (spill->partitions == NULL)
2126  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
2127  perhash->aggnode->numGroups,
2128  aggstate->hashentrysize);
2129 
2130  hashagg_spill_tuple(aggstate, spill, slot, hash);
2131  pergroup[setno] = NULL;
2132  }
2133  }
2134 }
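/*
 * Illustrative walk-through: with GROUPING SETS ((a), (b)) there are two
 * hashtables. In spill mode a tuple may find its (a)-group in memory, so
 * pergroup[0] is set and later advanced, while its not-yet-seen (b)-group
 * is not created; the tuple is written to set 1's spill partition instead
 * and pergroup[1] stays NULL, which the NULL-checking transition
 * expression skips.
 */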
2135 
2136 /*
2137  * ExecAgg -
2138  *
2139  * ExecAgg receives tuples from its outer subplan and aggregates over
2140  * the appropriate attribute for each aggregate function use (Aggref
2141  * node) appearing in the targetlist or qual of the node. The number
2142  * of tuples to aggregate over depends on whether grouped or plain
2143  * aggregation is selected. In grouped aggregation, we produce a result
2144  * row for each group; in plain aggregation there's a single result row
2145  * for the whole query. In either case, the value of each aggregate is
2146  * stored in the expression context to be used when ExecProject evaluates
2147  * the result tuple.
2148  */
2149 static TupleTableSlot *
2150 ExecAgg(PlanState *pstate)
2151 {
2152  AggState *node = castNode(AggState, pstate);
2153  TupleTableSlot *result = NULL;
2154 
2155  CHECK_FOR_INTERRUPTS();
2156 
2157  if (!node->agg_done)
2158  {
2159  /* Dispatch based on strategy */
2160  switch (node->phase->aggstrategy)
2161  {
2162  case AGG_HASHED:
2163  if (!node->table_filled)
2164  agg_fill_hash_table(node);
2165  /* FALLTHROUGH */
2166  case AGG_MIXED:
2167  result = agg_retrieve_hash_table(node);
2168  break;
2169  case AGG_PLAIN:
2170  case AGG_SORTED:
2171  result = agg_retrieve_direct(node);
2172  break;
2173  }
2174 
2175  if (!TupIsNull(result))
2176  return result;
2177  }
2178 
2179  return NULL;
2180 }
2181 
2182 /*
2183  * ExecAgg for non-hashed case
2184  */
2185 static TupleTableSlot *
2186 agg_retrieve_direct(AggState *aggstate)
2187 {
2188  Agg *node = aggstate->phase->aggnode;
2189  ExprContext *econtext;
2190  ExprContext *tmpcontext;
2191  AggStatePerAgg peragg;
2192  AggStatePerGroup *pergroups;
2193  TupleTableSlot *outerslot;
2194  TupleTableSlot *firstSlot;
2195  TupleTableSlot *result;
2196  bool hasGroupingSets = aggstate->phase->numsets > 0;
2197  int numGroupingSets = Max(aggstate->phase->numsets, 1);
2198  int currentSet;
2199  int nextSetSize;
2200  int numReset;
2201  int i;
2202 
2203  /*
2204  * get state info from node
2205  *
2206  * econtext is the per-output-tuple expression context
2207  *
2208  * tmpcontext is the per-input-tuple expression context
2209  */
2210  econtext = aggstate->ss.ps.ps_ExprContext;
2211  tmpcontext = aggstate->tmpcontext;
2212 
2213  peragg = aggstate->peragg;
2214  pergroups = aggstate->pergroups;
2215  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2216 
2217  /*
2218  * We loop retrieving groups until we find one matching
2219  * aggstate->ss.ps.qual
2220  *
2221  * For grouping sets, we have the invariant that aggstate->projected_set
2222  * is either -1 (initial call) or the index (starting from 0) in
2223  * gset_lengths for the group we just completed (either by projecting a
2224  * row or by discarding it in the qual).
2225  */
2226  while (!aggstate->agg_done)
2227  {
2228  /*
2229  * Clear the per-output-tuple context for each group, as well as
2230  * aggcontext (which contains any pass-by-ref transvalues of the old
2231  * group). Some aggregate functions store working state in child
2232  * contexts; those now get reset automatically without us needing to
2233  * do anything special.
2234  *
2235  * We use ReScanExprContext not just ResetExprContext because we want
2236  * any registered shutdown callbacks to be called. That allows
2237  * aggregate functions to ensure they've cleaned up any non-memory
2238  * resources.
2239  */
2240  ReScanExprContext(econtext);
2241 
2242  /*
2243  * Determine how many grouping sets need to be reset at this boundary.
2244  */
2245  if (aggstate->projected_set >= 0 &&
2246  aggstate->projected_set < numGroupingSets)
2247  numReset = aggstate->projected_set + 1;
2248  else
2249  numReset = numGroupingSets;
2250 
2251  /*
2252  * numReset can change on a phase boundary, but that's OK; we want to
2253  * reset the contexts used in _this_ phase, and later, after possibly
2254  * changing phase, initialize the right number of aggregates for the
2255  * _new_ phase.
2256  */
2257 
2258  for (i = 0; i < numReset; i++)
2259  {
2260  ReScanExprContext(aggstate->aggcontexts[i]);
2261  }
2262 
2263  /*
2264  * Check if input is complete and there are no more groups to project
2265  * in this phase; move to next phase or mark as done.
2266  */
2267  if (aggstate->input_done == true &&
2268  aggstate->projected_set >= (numGroupingSets - 1))
2269  {
2270  if (aggstate->current_phase < aggstate->numphases - 1)
2271  {
2272  initialize_phase(aggstate, aggstate->current_phase + 1);
2273  aggstate->input_done = false;
2274  aggstate->projected_set = -1;
2275  numGroupingSets = Max(aggstate->phase->numsets, 1);
2276  node = aggstate->phase->aggnode;
2277  numReset = numGroupingSets;
2278  }
2279  else if (aggstate->aggstrategy == AGG_MIXED)
2280  {
2281  /*
2282  * Mixed mode; we've output all the grouped stuff and have
2283  * full hashtables, so switch to outputting those.
2284  */
2285  initialize_phase(aggstate, 0);
2286  aggstate->table_filled = true;
2287  ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2288  &aggstate->perhash[0].hashiter);
2289  select_current_set(aggstate, 0, true);
2290  return agg_retrieve_hash_table(aggstate);
2291  }
2292  else
2293  {
2294  aggstate->agg_done = true;
2295  break;
2296  }
2297  }
2298 
2299  /*
2300  * Get the number of columns in the next grouping set after the last
2301  * projected one (if any). This is the number of columns to compare to
2302  * see if we reached the boundary of that set too.
2303  */
2304  if (aggstate->projected_set >= 0 &&
2305  aggstate->projected_set < (numGroupingSets - 1))
2306  nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2307  else
2308  nextSetSize = 0;
2309 
2310  /*----------
2311  * If a subgroup for the current grouping set is present, project it.
2312  *
2313  * We have a new group if:
2314  * - we're out of input but haven't projected all grouping sets
2315  * (checked above)
2316  * OR
2317  * - we already projected a row that wasn't from the last grouping
2318  * set
2319  * AND
2320  * - the next grouping set has at least one grouping column (since
2321  * empty grouping sets project only once input is exhausted)
2322  * AND
2323  * - the previous and pending rows differ on the grouping columns
2324  * of the next grouping set
2325  *----------
2326  */
2327  tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
2328  if (aggstate->input_done ||
2329  (node->aggstrategy != AGG_PLAIN &&
2330  aggstate->projected_set != -1 &&
2331  aggstate->projected_set < (numGroupingSets - 1) &&
2332  nextSetSize > 0 &&
2333  !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
2334  tmpcontext)))
2335  {
2336  aggstate->projected_set += 1;
2337 
2338  Assert(aggstate->projected_set < numGroupingSets);
2339  Assert(nextSetSize > 0 || aggstate->input_done);
2340  }
2341  else
2342  {
2343  /*
2344  * We no longer care what group we just projected, the next
2345  * projection will always be the first (or only) grouping set
2346  * (unless the input proves to be empty).
2347  */
2348  aggstate->projected_set = 0;
2349 
2350  /*
2351  * If we don't already have the first tuple of the new group,
2352  * fetch it from the outer plan.
2353  */
2354  if (aggstate->grp_firstTuple == NULL)
2355  {
2356  outerslot = fetch_input_tuple(aggstate);
2357  if (!TupIsNull(outerslot))
2358  {
2359  /*
2360  * Make a copy of the first input tuple; we will use this
2361  * for comparisons (in group mode) and for projection.
2362  */
2363  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2364  }
2365  else
2366  {
2367  /* outer plan produced no tuples at all */
2368  if (hasGroupingSets)
2369  {
2370  /*
2371  * If there was no input at all, we need to project
2372  * rows only if there are grouping sets of size 0.
2373  * Note that this implies that there can't be any
2374  * references to ungrouped Vars, which would otherwise
2375  * cause issues with the empty output slot.
2376  *
2377  * XXX: This is no longer true; we currently deal with
2378  * this in finalize_aggregates().
2379  */
2380  aggstate->input_done = true;
2381 
2382  while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2383  {
2384  aggstate->projected_set += 1;
2385  if (aggstate->projected_set >= numGroupingSets)
2386  {
2387  /*
2388  * We can't set agg_done here because we might
2389  * have more phases to do, even though the
2390  * input is empty. So we need to restart the
2391  * whole outer loop.
2392  */
2393  break;
2394  }
2395  }
2396 
2397  if (aggstate->projected_set >= numGroupingSets)
2398  continue;
2399  }
2400  else
2401  {
2402  aggstate->agg_done = true;
2403  /* If we are grouping, we should produce no tuples either */
2404  if (node->aggstrategy != AGG_PLAIN)
2405  return NULL;
2406  }
2407  }
2408  }
2409 
2410  /*
2411  * Initialize working state for a new input tuple group.
2412  */
2413  initialize_aggregates(aggstate, pergroups, numReset);
2414 
2415  if (aggstate->grp_firstTuple != NULL)
2416  {
2417  /*
2418  * Store the copied first input tuple in the tuple table slot
2419  * reserved for it. The tuple will be deleted when it is
2420  * cleared from the slot.
2421  */
2422  ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
2423  firstSlot, true);
2424  aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
2425 
2426  /* set up for first advance_aggregates call */
2427  tmpcontext->ecxt_outertuple = firstSlot;
2428 
2429  /*
2430  * Process each outer-plan tuple, and then fetch the next one,
2431  * until we exhaust the outer plan or cross a group boundary.
2432  */
2433  for (;;)
2434  {
2435  /*
2436  * During phase 1 only of a mixed agg, we need to update
2437  * hashtables as well in advance_aggregates.
2438  */
2439  if (aggstate->aggstrategy == AGG_MIXED &&
2440  aggstate->current_phase == 1)
2441  {
2442  lookup_hash_entries(aggstate);
2443  }
2444 
2445  /* Advance the aggregates (or combine functions) */
2446  advance_aggregates(aggstate);
2447 
2448  /* Reset per-input-tuple context after each tuple */
2449  ResetExprContext(tmpcontext);
2450 
2451  outerslot = fetch_input_tuple(aggstate);
2452  if (TupIsNull(outerslot))
2453  {
2454  /* no more outer-plan tuples available */
2455 
2456  /* if we built hash tables, finalize any spills */
2457  if (aggstate->aggstrategy == AGG_MIXED &&
2458  aggstate->current_phase == 1)
2459  hashagg_finish_initial_spills(aggstate);
2460 
2461  if (hasGroupingSets)
2462  {
2463  aggstate->input_done = true;
2464  break;
2465  }
2466  else
2467  {
2468  aggstate->agg_done = true;
2469  break;
2470  }
2471  }
2472  /* set up for next advance_aggregates call */
2473  tmpcontext->ecxt_outertuple = outerslot;
2474 
2475  /*
2476  * If we are grouping, check whether we've crossed a group
2477  * boundary.
2478  */
2479  if (node->aggstrategy != AGG_PLAIN)
2480  {
2481  tmpcontext->ecxt_innertuple = firstSlot;
2482  if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2483  tmpcontext))
2484  {
2485  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2486  break;
2487  }
2488  }
2489  }
2490  }
2491 
2492  /*
2493  * Use the representative input tuple for any references to
2494  * non-aggregated input columns in aggregate direct args, the node
2495  * qual, and the tlist. (If we are not grouping, and there are no
2496  * input rows at all, we will come here with an empty firstSlot
2497  * ... but if not grouping, there can't be any references to
2498  * non-aggregated input columns, so no problem.)
2499  */
2500  econtext->ecxt_outertuple = firstSlot;
2501  }
2502 
2503  Assert(aggstate->projected_set >= 0);
2504 
2505  currentSet = aggstate->projected_set;
2506 
2507  prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2508 
2509  select_current_set(aggstate, currentSet, false);
2510 
2511  finalize_aggregates(aggstate,
2512  peragg,
2513  pergroups[currentSet]);
2514 
2515  /*
2516  * If there's no row to project right now, we must continue rather
2517  * than returning a null since there might be more groups.
2518  */
2519  result = project_aggregates(aggstate);
2520  if (result)
2521  return result;
2522  }
2523 
2524  /* No more groups */
2525  return NULL;
2526 }
2527 
2528 /*
2529  * ExecAgg for hashed case: read input and build hash table
2530  */
2531 static void
2532 agg_fill_hash_table(AggState *aggstate)
2533 {
2534  TupleTableSlot *outerslot;
2535  ExprContext *tmpcontext = aggstate->tmpcontext;
2536 
2537  /*
2538  * Process each outer-plan tuple, and then fetch the next one, until we
2539  * exhaust the outer plan.
2540  */
2541  for (;;)
2542  {
2543  outerslot = fetch_input_tuple(aggstate);
2544  if (TupIsNull(outerslot))
2545  break;
2546 
2547  /* set up for lookup_hash_entries and advance_aggregates */
2548  tmpcontext->ecxt_outertuple = outerslot;
2549 
2550  /* Find or build hashtable entries */
2551  lookup_hash_entries(aggstate);
2552 
2553  /* Advance the aggregates (or combine functions) */
2554  advance_aggregates(aggstate);
2555 
2556  /*
2557  * Reset per-input-tuple context after each tuple, but note that the
2558  * hash lookups do this too
2559  */
2560  ResetExprContext(aggstate->tmpcontext);
2561  }
2562 
2563  /* finalize spills, if any */
2564  hashagg_finish_initial_spills(aggstate);
2565 
2566  aggstate->table_filled = true;
2567  /* Initialize to walk the first hash table */
2568  select_current_set(aggstate, 0, true);
2569  ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2570  &aggstate->perhash[0].hashiter);
2571 }
2572 
2573 /*
2574  * If any data was spilled during hash aggregation, reset the hash table and
2575  * reprocess one batch of spilled data. After reprocessing a batch, the hash
2576  * table will again contain data, ready to be consumed by
2577  * agg_retrieve_hash_table_in_memory().
2578  *
2579  * Should only be called after all in-memory hash table entries have been
2580  * finalized and emitted.
2581  *
2582  * Return false when input is exhausted and there's no more work to be done;
2583  * otherwise return true.
2584  */
2585 static bool
2586 agg_refill_hash_table(AggState *aggstate)
2587 {
2588  HashAggBatch *batch;
2589  AggStatePerHash perhash;
2590  HashAggSpill spill;
2591  HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
2592  bool spill_initialized = false;
2593 
2594  if (aggstate->hash_batches == NIL)
2595  return false;
2596 
2597  batch = linitial(aggstate->hash_batches);
2598  aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
2599 
2600  hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
2601  batch->used_bits, &aggstate->hash_mem_limit,
2602  &aggstate->hash_ngroups_limit, NULL);
2603 
2604  /* there could be residual pergroup pointers; clear them */
2605  for (int setoff = 0;
2606  setoff < aggstate->maxsets + aggstate->num_hashes;
2607  setoff++)
2608  aggstate->all_pergroups[setoff] = NULL;
2609 
2610  /* free memory and reset hash tables */
2611  ReScanExprContext(aggstate->hashcontext);
2612  for (int setno = 0; setno < aggstate->num_hashes; setno++)
2613  ResetTupleHashTable(aggstate->perhash[setno].hashtable);
2614 
2615  aggstate->hash_ngroups_current = 0;
2616 
2617  /*
2618  * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
2619  * happens in phase 0. So, we switch to phase 1 when processing a batch,
2620  * and back to phase 0 after the batch is done.
2621  */
2622  Assert(aggstate->current_phase == 0);
2623  if (aggstate->phase->aggstrategy == AGG_MIXED)
2624  {
2625  aggstate->current_phase = 1;
2626  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2627  }
2628 
2629  select_current_set(aggstate, batch->setno, true);
2630 
2631  perhash = &aggstate->perhash[aggstate->current_set];
2632 
2633  /*
2634  * Spilled tuples are always read back as MinimalTuples, which may be
2635  * different from the outer plan, so recompile the aggregate expressions.
2636  *
2637  * We still need the NULL check, because we are only processing one
2638  * grouping set at a time and the rest will be NULL.
2639  */
2640  hashagg_recompile_expressions(aggstate, true, true);
2641 
2642  for (;;)
2643  {
2644  TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
2645  TupleTableSlot *hashslot = perhash->hashslot;
2646  TupleHashEntry entry;
2647  MinimalTuple tuple;
2648  uint32 hash;
2649  bool isnew = false;
2650  bool *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2651 
2652  CHECK_FOR_INTERRUPTS();
2653 
2654  tuple = hashagg_batch_read(batch, &hash);
2655  if (tuple == NULL)
2656  break;
2657 
2658  ExecStoreMinimalTuple(tuple, spillslot, true);
2659  aggstate->tmpcontext->ecxt_outertuple = spillslot;
2660 
2661  prepare_hash_slot(perhash,
2662  aggstate->tmpcontext->ecxt_outertuple,
2663  hashslot);
2664  entry = LookupTupleHashEntryHash(
2665  perhash->hashtable, hashslot, p_isnew, hash);
2666 
2667  if (entry != NULL)
2668  {
2669  if (isnew)
2670  initialize_hash_entry(aggstate, perhash->hashtable, entry);
2671  aggstate->hash_pergroup[batch->setno] = entry->additional;
2672  advance_aggregates(aggstate);
2673  }
2674  else
2675  {
2676  if (!spill_initialized)
2677  {
2678  /*
2679  * Avoid initializing the spill until we actually need it so
2680  * that we don't assign tapes that will never be used.
2681  */
2682  spill_initialized = true;
2683  hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
2684  batch->input_card, aggstate->hashentrysize);
2685  }
2686  /* no memory for a new group, spill */
2687  hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
2688 
2689  aggstate->hash_pergroup[batch->setno] = NULL;
2690  }
2691 
2692  /*
2693  * Reset per-input-tuple context after each tuple, but note that the
2694  * hash lookups do this too
2695  */
2696  ResetExprContext(aggstate->tmpcontext);
2697  }
2698 
2699  hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
2700 
2701  /* change back to phase 0 */
2702  aggstate->current_phase = 0;
2703  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2704 
2705  if (spill_initialized)
2706  {
2707  hashagg_spill_finish(aggstate, &spill, batch->setno);
2708  hash_agg_update_metrics(aggstate, true, spill.npartitions);
2709  }
2710  else
2711  hash_agg_update_metrics(aggstate, true, 0);
2712 
2713  aggstate->hash_spill_mode = false;
2714 
2715  /* prepare to walk the first hash table */
2716  select_current_set(aggstate, batch->setno, true);
2717  ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
2718  &aggstate->perhash[batch->setno].hashiter);
2719 
2720  pfree(batch);
2721 
2722  return true;
2723 }
2724 
2725 /*
2726  * ExecAgg for hashed case: retrieving groups from hash table
2727  *
2728  * After exhausting in-memory tuples, also try refilling the hash table using
2729  * previously-spilled tuples. Only returns NULL after all in-memory and
2730  * spilled tuples are exhausted.
2731  */
2732 static TupleTableSlot *
2733 agg_retrieve_hash_table(AggState *aggstate)
2734 {
2735  TupleTableSlot *result = NULL;
2736 
2737  while (result == NULL)
2738  {
2739  result = agg_retrieve_hash_table_in_memory(aggstate);
2740  if (result == NULL)
2741  {
2742  if (!agg_refill_hash_table(aggstate))
2743  {
2744  aggstate->agg_done = true;
2745  break;
2746  }
2747  }
2748  }
2749 
2750  return result;
2751 }
2752 
2753 /*
2754  * Retrieve the groups from the in-memory hash tables without considering any
2755  * spilled tuples.
2756  */
2757 static TupleTableSlot *
2758 agg_retrieve_hash_table_in_memory(AggState *aggstate)
2759 {
2760  ExprContext *econtext;
2761  AggStatePerAgg peragg;
2762  AggStatePerGroup pergroup;
2763  TupleHashEntryData *entry;
2764  TupleTableSlot *firstSlot;
2765  TupleTableSlot *result;
2766  AggStatePerHash perhash;
2767 
2768  /*
2769  * get state info from node.
2770  *
2771  * econtext is the per-output-tuple expression context.
2772  */
2773  econtext = aggstate->ss.ps.ps_ExprContext;
2774  peragg = aggstate->peragg;
2775  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2776 
2777  /*
2778  * Note that perhash (and therefore anything accessed through it) can
2779  * change inside the loop, as we change between grouping sets.
2780  */
2781  perhash = &aggstate->perhash[aggstate->current_set];
2782 
2783  /*
2784  * We loop retrieving groups until we find one satisfying
2785  * aggstate->ss.ps.qual
2786  */
2787  for (;;)
2788  {
2789  TupleTableSlot *hashslot = perhash->hashslot;
2790  int i;
2791 
2792  CHECK_FOR_INTERRUPTS();
2793 
2794  /*
2795  * Find the next entry in the hash table
2796  */
2797  entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2798  if (entry == NULL)
2799  {
2800  int nextset = aggstate->current_set + 1;
2801 
2802  if (nextset < aggstate->num_hashes)
2803  {
2804  /*
2805  * Switch to next grouping set, reinitialize, and restart the
2806  * loop.
2807  */
2808  select_current_set(aggstate, nextset, true);
2809 
2810  perhash = &aggstate->perhash[aggstate->current_set];
2811 
2812  ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2813 
2814  continue;
2815  }
2816  else
2817  {
2818  return NULL;
2819  }
2820  }
2821 
2822  /*
2823  * Clear the per-output-tuple context for each group
2824  *
2825  * We intentionally don't use ReScanExprContext here; if any aggs have
2826  * registered shutdown callbacks, they mustn't be called yet, since we
2827  * might not be done with that agg.
2828  */
2829  ResetExprContext(econtext);
2830 
2831  /*
2832  * Transform representative tuple back into one with the right
2833  * columns.
2834  */
2835  ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2836  slot_getallattrs(hashslot);
2837 
2838  ExecClearTuple(firstSlot);
2839  memset(firstSlot->tts_isnull, true,
2840  firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2841 
2842  for (i = 0; i < perhash->numhashGrpCols; i++)
2843  {
2844  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2845 
2846  firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2847  firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2848  }
2849  ExecStoreVirtualTuple(firstSlot);
2850 
2851  pergroup = (AggStatePerGroup) entry->additional;
2852 
2853  /*
2854  * Use the representative input tuple for any references to
2855  * non-aggregated input columns in the qual and tlist.
2856  */
2857  econtext->ecxt_outertuple = firstSlot;
2858 
2859  prepare_projection_slot(aggstate,
2860  econtext->ecxt_outertuple,
2861  aggstate->current_set);
2862 
2863  finalize_aggregates(aggstate, peragg, pergroup);
2864 
2865  result = project_aggregates(aggstate);
2866  if (result)
2867  return result;
2868  }
2869 
2870  /* No more groups */
2871  return NULL;
2872 }
2873 
2874 /*
2875  * Initialize HashTapeInfo
2876  */
2877 static void
2878 hashagg_tapeinfo_init(AggState *aggstate)
2879 {
2880  HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
2881  int init_tapes = 16; /* expanded dynamically */
2882 
2883  tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
2884  tapeinfo->ntapes = init_tapes;
2885  tapeinfo->nfreetapes = init_tapes;
2886  tapeinfo->freetapes_alloc = init_tapes;
2887  tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
2888  for (int i = 0; i < init_tapes; i++)
2889  tapeinfo->freetapes[i] = i;
2890 
2891  aggstate->hash_tapeinfo = tapeinfo;
2892 }
2893 
2894 /*
2895  * Assign unused tapes to spill partitions, extending the tape set if
2896  * necessary.
2897  */
2898 static void
2899 hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
2900  int npartitions)
2901 {
2902  int partidx = 0;
2903 
2904  /* use free tapes if available */
2905  while (partidx < npartitions && tapeinfo->nfreetapes > 0)
2906  partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
2907 
2908  if (partidx < npartitions)
2909  {
2910  LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
2911 
2912  while (partidx < npartitions)
2913  partitions[partidx++] = tapeinfo->ntapes++;
2914  }
2915 }
2916 
2917 /*
2918  * After a tape has already been written to and then read, this function
2919  * rewinds it for writing and adds it to the free list.
2920  */
2921 static void
2922 hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
2923 {
2924  /* rewinding frees the buffer while not in use */
2925  LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
2926  if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
2927  {
2928  tapeinfo->freetapes_alloc <<= 1;
2929  tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
2930  tapeinfo->freetapes_alloc * sizeof(int));
2931  }
2932  tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
2933 }
2934 
2935 /*
2936  * hashagg_spill_init
2937  *
2938  * Called after we determined that spilling is necessary. Chooses the number
2939  * of partitions to create, and initializes them.
2940  */
2941 static void
2942 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
2943  double input_groups, double hashentrysize)
2944 {
2945  int npartitions;
2946  int partition_bits;
2947 
2948  npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
2949  used_bits, &partition_bits);
2950 
2951  spill->partitions = palloc0(sizeof(int) * npartitions);
2952  spill->ntuples = palloc0(sizeof(int64) * npartitions);
2953  spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
2954 
2955  hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
2956 
2957  spill->tapeset = tapeinfo->tapeset;
2958  spill->shift = 32 - used_bits - partition_bits;
2959  spill->mask = (npartitions - 1) << spill->shift;
2960  spill->npartitions = npartitions;
2961 
2962  for (int i = 0; i < npartitions; i++)
2963  initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
2964 }
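/*
 * Example of the mask/shift arithmetic (illustrative): at the first spill
 * level used_bits = 0, so npartitions = 8 gives partition_bits = 3,
 * shift = 29 and mask = 7 << 29; a tuple's partition is then
 * (hash & mask) >> shift, i.e. the top three bits of its 32-bit hash.
 * Recursive spills consume the next lower bits, since used_bits grows at
 * each level.
 */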
2965 
2966 /*
2967  * hashagg_spill_tuple
2968  *
2969  * No room for new groups in the hash table. Save for later in the appropriate
2970  * partition.
2971  */
2972 static Size
2973 hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
2974  TupleTableSlot *inputslot, uint32 hash)
2975 {
2976  LogicalTapeSet *tapeset = spill->tapeset;
2977  TupleTableSlot *spillslot;
2978  int partition;
2979  MinimalTuple tuple;
2980  int tapenum;
2981  int total_written = 0;
2982  bool shouldFree;
2983 
2984  Assert(spill->partitions != NULL);
2985 
2986  /* spill only attributes that we actually need */
2987  if (!aggstate->all_cols_needed)
2988  {
2989  spillslot = aggstate->hash_spill_wslot;
2990  slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
2991  ExecClearTuple(spillslot);
2992  for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
2993  {
2994  if (bms_is_member(i + 1, aggstate->colnos_needed))
2995  {
2996  spillslot->tts_values[i] = inputslot->tts_values[i];
2997  spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
2998  }
2999  else
3000  spillslot->tts_isnull[i] = true;
3001  }
3002  ExecStoreVirtualTuple(spillslot);
3003  }
3004  else
3005  spillslot = inputslot;
3006 
3007  tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
3008 
3009  partition = (hash & spill->mask) >> spill->shift;
3010  spill->ntuples[partition]++;
3011 
3012  /*
3013  * All hash values destined for a given partition have some bits in
3014  * common, which causes bad HLL cardinality estimates. Hash the hash to
3015  * get a more uniform distribution.
3016  */
3017  addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
3018 
3019  tapenum = spill->partitions[partition];
3020 
3021  LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
3022  total_written += sizeof(uint32);
3023 
3024  LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
3025  total_written += tuple->t_len;
3026 
3027  if (shouldFree)
3028  pfree(tuple);
3029 
3030  return total_written;
3031 }
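/*
 * On-tape record layout written above and read back by
 * hashagg_batch_read():
 *
 *   uint32 hash | MinimalTuple (t_len, header, attribute data)
 *
 * Storing the hash avoids recomputing it when the batch is reloaded, and
 * lets recursive spills repartition on the remaining hash bits.
 */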
3032 
3033 /*
3034  * hashagg_batch_new
3035  *
3036  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
3037  * be done.
3038  */
3039 static HashAggBatch *
3040 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
3041  int64 input_tuples, double input_card, int used_bits)
3042 {
3043  HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
3044 
3045  batch->setno = setno;
3046  batch->used_bits = used_bits;
3047  batch->tapeset = tapeset;
3048  batch->input_tapenum = tapenum;
3049  batch->input_tuples = input_tuples;
3050  batch->input_card = input_card;
3051 
3052  return batch;
3053 }
3054 
3055 /*
3056  * hashagg_batch_read
3057  * read the next tuple from a batch's tape. Return NULL if no more.
3058  */
3059 static MinimalTuple
3060 hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
3061 {
3062  LogicalTapeSet *tapeset = batch->tapeset;
3063  int tapenum = batch->input_tapenum;
3064  MinimalTuple tuple;
3065  uint32 t_len;
3066  size_t nread;
3067  uint32 hash;
3068 
3069  nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
3070  if (nread == 0)
3071  return NULL;
3072  if (nread != sizeof(uint32))
3073  ereport(ERROR,
3074  (errcode_for_file_access(),
3075  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3076  tapenum, sizeof(uint32), nread)));
3077  if (hashp != NULL)
3078  *hashp = hash;
3079 
3080  nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
3081  if (nread != sizeof(uint32))
3082  ereport(ERROR,
3083  (errcode_for_file_access(),
3084  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3085  tapenum, sizeof(uint32), nread)));
3086 
3087  tuple = (MinimalTuple) palloc(t_len);
3088  tuple->t_len = t_len;
3089 
3090  nread = LogicalTapeRead(tapeset, tapenum,
3091  (void *) ((char *) tuple + sizeof(uint32)),
3092  t_len - sizeof(uint32));
3093  if (nread != t_len - sizeof(uint32))
3094  ereport(ERROR,
3095  (errcode_for_file_access(),
3096  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3097  tapenum, t_len - sizeof(uint32), nread)));
3098 
3099  return tuple;
3100 }
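/*
 * Note on the reads above: t_len is the first field of a MinimalTuple, so
 * after it has been read and stored, the rest of the tuple is read
 * directly into place starting at offset sizeof(uint32).
 */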
3101 
3102 /*
3103  * hashagg_finish_initial_spills
3104  *
3105  * After a HashAggBatch has been processed, it may have spilled tuples to
3106  * disk. If so, turn the spilled partitions into new batches that must later
3107  * be executed.
3108  */
3109 static void
3110 hashagg_finish_initial_spills(AggState *aggstate)
3111 {
3112  int setno;
3113  int total_npartitions = 0;
3114 
3115  if (aggstate->hash_spills != NULL)
3116  {
3117  for (setno = 0; setno < aggstate->num_hashes; setno++)
3118  {
3119  HashAggSpill *spill = &aggstate->hash_spills[setno];
3120 
3121  total_npartitions += spill->npartitions;
3122  hashagg_spill_finish(aggstate, spill, setno);
3123  }
3124 
3125  /*
3126  * We're not processing tuples from the outer plan any more; only
3127  * processing batches of spilled tuples. The initial spill structures
3128  * are no longer needed.
3129  */
3130  pfree(aggstate->hash_spills);
3131  aggstate->hash_spills = NULL;
3132  }
3133 
3134  hash_agg_update_metrics(aggstate, false, total_npartitions);
3135  aggstate->hash_spill_mode = false;
3136 }
3137 
3138 /*
3139  * hashagg_spill_finish
3140  *
3141  * Transform spill partitions into new batches.
3142  */
3143 static void
3144 hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3145 {
3146  int i;
3147  int used_bits = 32 - spill->shift;
3148 
3149  if (spill->npartitions == 0)
3150  return; /* didn't spill */
3151 
3152  for (i = 0; i < spill->npartitions; i++)
3153  {
3154  LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
3155  int tapenum = spill->partitions[i];
3156  HashAggBatch *new_batch;
3157  double cardinality;
3158 
3159  /* if the partition is empty, don't create a new batch of work */
3160  if (spill->ntuples[i] == 0)
3161  continue;
3162 
3163  cardinality = estimateHyperLogLog(&spill->hll_card[i]);
3164  freeHyperLogLog(&spill->hll_card[i]);
3165 
3166  /* rewinding frees the buffer while not in use */
3167  LogicalTapeRewindForRead(tapeset, tapenum,
3168  HASHAGG_READ_BUFFER_SIZE);
3169 
3170  new_batch = hashagg_batch_new(tapeset, tapenum, setno,
3171  spill->ntuples[i], cardinality,
3172  used_bits);
3173  aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
3174  aggstate->hash_batches_used++;
3175  }
3176 
3177  pfree(spill->ntuples);
3178  pfree(spill->hll_card);
3179  pfree(spill->partitions);
3180 }
3181 
3182 /*
3183  * Free resources related to a spilled HashAgg.
3184  */
3185 static void
3186 hashagg_reset_spill_state(AggState *aggstate)
3187 {
3188  ListCell *lc;
3189 
3190  /* free spills from initial pass */
3191  if (aggstate->hash_spills != NULL)
3192  {
3193  int setno;
3194 
3195  for (setno = 0; setno < aggstate->num_hashes; setno++)
3196  {
3197  HashAggSpill *spill = &aggstate->hash_spills[setno];
3198 
3199  pfree(spill->ntuples);
3200  pfree(spill->partitions);
3201  }
3202  pfree(aggstate->hash_spills);
3203  aggstate->hash_spills = NULL;
3204  }
3205 
3206  /* free batches */
3207  foreach(lc, aggstate->hash_batches)
3208  {
3209  HashAggBatch *batch = (HashAggBatch *) lfirst(lc);
3210 
3211  pfree(batch);
3212  }
3213  list_free(aggstate->hash_batches);
3214  aggstate->hash_batches = NIL;
3215 
3216  /* close tape set */
3217  if (aggstate->hash_tapeinfo != NULL)
3218  {
3219  HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
3220 
3221  LogicalTapeSetClose(tapeinfo->tapeset);
3222  pfree(tapeinfo->freetapes);
3223  pfree(tapeinfo);
3224  aggstate->hash_tapeinfo = NULL;
3225  }
3226 }
3227 
3228 
3229 /* -----------------
3230  * ExecInitAgg
3231  *
3232  * Creates the run-time information for the agg node produced by the
3233  * planner and initializes its outer subtree.
3234  *
3235  * -----------------
3236  */
3237 AggState *
3238 ExecInitAgg(Agg *node, EState *estate, int eflags)
3239 {
3240  AggState *aggstate;
3241  AggStatePerAgg peraggs;
3242  AggStatePerTrans pertransstates;
3243  AggStatePerGroup *pergroups;
3244  Plan *outerPlan;
3245  ExprContext *econtext;
3246  TupleDesc scanDesc;
3247  int numaggs,
3248  transno,
3249  aggno;
3250  int phase;
3251  int phaseidx;
3252  ListCell *l;
3253  Bitmapset *all_grouped_cols = NULL;
3254  int numGroupingSets = 1;
3255  int numPhases;
3256  int numHashes;
3257  int i = 0;
3258  int j = 0;
3259  bool use_hashing = (node->aggstrategy == AGG_HASHED ||
3260  node->aggstrategy == AGG_MIXED);
3261 
3262  /* check for unsupported flags */
3263  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
3264 
3265  /*
3266  * create state structure
3267  */
3268  aggstate = makeNode(AggState);
3269  aggstate->ss.ps.plan = (Plan *) node;
3270  aggstate->ss.ps.state = estate;
3271  aggstate->ss.ps.ExecProcNode = ExecAgg;
3272 
3273  aggstate->aggs = NIL;
3274  aggstate->numaggs = 0;
3275  aggstate->numtrans = 0;
3276  aggstate->aggstrategy = node->aggstrategy;
3277  aggstate->aggsplit = node->aggsplit;
3278  aggstate->maxsets = 0;
3279  aggstate->projected_set = -1;
3280  aggstate->current_set = 0;
3281  aggstate->peragg = NULL;
3282  aggstate->pertrans = NULL;
3283  aggstate->curperagg = NULL;
3284  aggstate->curpertrans = NULL;
3285  aggstate->input_done = false;
3286  aggstate->agg_done = false;
3287  aggstate->pergroups = NULL;
3288  aggstate->grp_firstTuple = NULL;
3289  aggstate->sort_in = NULL;
3290  aggstate->sort_out = NULL;
3291 
3292  /*
3293  * phases[0] always exists, but is dummy in sorted/plain mode
3294  */
3295  numPhases = (use_hashing ? 1 : 2);
3296  numHashes = (use_hashing ? 1 : 0);
3297 
3298  /*
3299  * Calculate the maximum number of grouping sets in any phase; this
3300  * determines the size of some allocations. Also calculate the number of
3301  * phases, since all hashed/mixed nodes contribute to only a single phase.
3302  */
3303  if (node->groupingSets)
3304  {
3305  numGroupingSets = list_length(node->groupingSets);
3306 
3307  foreach(l, node->chain)
3308  {
3309  Agg *agg = lfirst(l);
3310 
3311  numGroupingSets = Max(numGroupingSets,
3312  list_length(agg->groupingSets));
3313 
3314  /*
3315  * additional AGG_HASHED aggs become part of phase 0, but all
3316  * others add an extra phase.
3317  */
3318  if (agg->aggstrategy != AGG_HASHED)
3319  ++numPhases;
3320  else
3321  ++numHashes;
3322  }
3323  }
3324 
3325  aggstate->maxsets = numGroupingSets;
3326  aggstate->numphases = numPhases;
3327 
3328  aggstate->aggcontexts = (ExprContext **)
3329  palloc0(sizeof(ExprContext *) * numGroupingSets);
3330 
3331  /*
3332  * Create expression contexts. We need three or more, one for
3333  * per-input-tuple processing, one for per-output-tuple processing, one
3334  * for all the hashtables, and one for each grouping set. The per-tuple
3335  * memory context of the per-grouping-set ExprContexts (aggcontexts)
3336  * replaces the standalone memory context formerly used to hold transition
3337  * values. We cheat a little by using ExecAssignExprContext() to build
3338  * all of them.
3339  *
3340  * NOTE: the details of what is stored in aggcontexts and what is stored
3341  * in the regular per-query memory context are driven by a simple
3342  * decision: we want to reset the aggcontext at group boundaries (if not
3343  * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
3344  */
3345  ExecAssignExprContext(estate, &aggstate->ss.ps);
3346  aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
3347 
3348  for (i = 0; i < numGroupingSets; ++i)
3349  {
3350  ExecAssignExprContext(estate, &aggstate->ss.ps);
3351  aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
3352  }
3353 
3354  if (use_hashing)
3355  aggstate->hashcontext = CreateWorkExprContext(estate);
3356 
3357  ExecAssignExprContext(estate, &aggstate->ss.ps);
3358 
3359  /*
3360  * Initialize child nodes.
3361  *
3362  * If we are doing a hashed aggregation then the child plan does not need
3363  * to handle REWIND efficiently; see ExecReScanAgg.
3364  */
3365  if (node->aggstrategy == AGG_HASHED)
3366  eflags &= ~EXEC_FLAG_REWIND;
3367  outerPlan = outerPlan(node);
3368  outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
3369 
3370  /*
3371  * initialize source tuple type.
3372  */
3373  aggstate->ss.ps.outerops =
3374  ExecGetResultSlotOps(outerPlanState(&aggstate->ss.ps),
3375  &aggstate->ss.ps.outeropsfixed);
3376  aggstate->ss.ps.outeropsset = true;
3377 
3378  ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
3379  aggstate->ss.ps.outerops);
3380  scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
3381 
3382  /*
3383  * If there are more than two phases (including a potential dummy phase
3384  * 0), input will be resorted using tuplesort. Need a slot for that.
3385  */
3386  if (numPhases > 2)
3387  {
3388  aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
3389  &TTSOpsMinimalTuple);
3390 
3391  /*
3392  * The output of the tuplesort, and the output from the outer child
3393  * might not use the same type of slot. In most cases the child will
3394  * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
3395  * input can also be presorted due to an index, in which case it could be
3396  * a different type of slot.
3397  *
3398  * XXX: For efficiency it would be good to instead/additionally
3399  * generate expressions with corresponding settings of outerops* for
3400  * the individual phases - deforming is often a bottleneck for
3401  * aggregations with lots of rows per group. If there are multiple
3402  * sorts, we know that all but the first use TTSOpsMinimalTuple (via
3403  * the nodeAgg.c internal tuplesort).
3404  */
3405  if (aggstate->ss.ps.outeropsfixed &&
3406  aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
3407  aggstate->ss.ps.outeropsfixed = false;
3408  }
3409 
3410  /*
3411  * Initialize result type, slot and projection.
3412  */
3413  ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
3414  ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
3415 
3416  /*
3417  * initialize child expressions
3418  *
3419  * We expect the parser to have checked that no aggs contain other agg
3420  * calls in their arguments (and just to be sure, we verify it again while
3421  * initializing the plan node). This would make no sense under SQL
3422  * semantics, and it's forbidden by the spec. Since that is guaranteed, we
3423  * don't need to worry about evaluating the aggs in any particular order.
3424  *
3425  * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
3426  * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs
3427  * in the targetlist are found during ExecAssignProjectionInfo, below.
3428  */
3429  aggstate->ss.ps.qual =
3430  ExecInitQual(node->plan.qual, (PlanState *) aggstate);
3431 
3432  /*
3433  * We should now have found all Aggrefs in the targetlist and quals.
3434  */
3435  numaggs = aggstate->numaggs;
3436  Assert(numaggs == list_length(aggstate->aggs));
3437 
3438  /*
3439  * For each phase, prepare grouping set data and fmgr lookup data for
3440  * compare functions. Accumulate all_grouped_cols in passing.
3441  */
3442  aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
3443 
3444  aggstate->num_hashes = numHashes;
3445  if (numHashes)
3446  {
3447  aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
3448  aggstate->phases[0].numsets = 0;
3449  aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
3450  aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
3451  }
3452 
3453  phase = 0;
3454  for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3455  {
3456  Agg *aggnode;
3457  Sort *sortnode;
3458 
3459  if (phaseidx > 0)
3460  {
3461  aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
3462  sortnode = castNode(Sort, aggnode->plan.lefttree);
3463  }
3464  else
3465  {
3466  aggnode = node;
3467  sortnode = NULL;
3468  }
3469 
3470  Assert(phase <= 1 || sortnode);
3471 
3472  if (aggnode->aggstrategy == AGG_HASHED
3473  || aggnode->aggstrategy == AGG_MIXED)
3474  {
3475  AggStatePerPhase phasedata = &aggstate->phases[0];
3476  AggStatePerHash perhash;
3477  Bitmapset *cols = NULL;
3478 
3479  Assert(phase == 0);
3480  i = phasedata->numsets++;
3481  perhash = &aggstate->perhash[i];
3482 
3483  /* phase 0 always points to the "real" Agg in the hash case */
3484  phasedata->aggnode = node;
3485  phasedata->aggstrategy = node->aggstrategy;
3486 
3487  /* but the actual Agg node representing this hash is saved here */
3488  perhash->aggnode = aggnode;
3489 
3490  phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
3491 
3492  for (j = 0; j < aggnode->numCols; ++j)
3493  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3494 
3495  phasedata->grouped_cols[i] = cols;
3496 
3497  all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3498  continue;
3499  }
3500  else
3501  {
3502  AggStatePerPhase phasedata = &aggstate->phases[++phase];
3503  int num_sets;
3504 
3505  phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
3506 
3507  if (num_sets)
3508  {
3509  phasedata->gset_lengths = palloc(num_sets * sizeof(int));
3510  phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
3511 
3512  i = 0;
3513  foreach(l, aggnode->groupingSets)
3514  {
3515  int current_length = list_length(lfirst(l));
3516  Bitmapset *cols = NULL;
3517 
3518  /* planner forces this to be correct */
3519  for (j = 0; j < current_length; ++j)
3520  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3521 
3522  phasedata->grouped_cols[i] = cols;
3523  phasedata->gset_lengths[i] = current_length;
3524 
3525  ++i;
3526  }
3527 
3528  all_grouped_cols = bms_add_members(all_grouped_cols,
3529  phasedata->grouped_cols[0]);
3530  }
3531  else
3532  {
3533  Assert(phaseidx == 0);
3534 
3535  phasedata->gset_lengths = NULL;
3536  phasedata->grouped_cols = NULL;
3537  }
3538 
3539  /*
3540  * If we are grouping, precompute fmgr lookup data for inner loop.
3541  */
3542  if (aggnode->aggstrategy == AGG_SORTED)
3543  {
3544  int i = 0;
3545 
3546  Assert(aggnode->numCols > 0);
3547 
3548  /*
3549  * Build a separate function for each subset of columns that
3550  * need to be compared.
3551  */
3552  phasedata->eqfunctions =
3553  (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
3554 
3555  /* for each grouping set */
3556  for (i = 0; i < phasedata->numsets; i++)
3557  {
3558  int length = phasedata->gset_lengths[i];
3559 
3560  if (phasedata->eqfunctions[length - 1] != NULL)
3561  continue;
3562 
3563  phasedata->eqfunctions[length - 1] =
3564  execTuplesMatchPrepare(scanDesc,
3565  length,
3566  aggnode->grpColIdx,
3567  aggnode->grpOperators,
3568  aggnode->grpCollations,
3569  (PlanState *) aggstate);
3570  }
3571 
3572  /* and for all grouped columns, unless already computed */
3573  if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
3574  {
3575  phasedata->eqfunctions[aggnode->numCols - 1] =
3576  execTuplesMatchPrepare(scanDesc,
3577  aggnode->numCols,
3578  aggnode->grpColIdx,
3579  aggnode->grpOperators,
3580  aggnode->grpCollations,
3581  (PlanState *) aggstate);
3582  }
3583  }
3584 
3585  phasedata->aggnode = aggnode;
3586  phasedata->aggstrategy = aggnode->aggstrategy;
3587  phasedata->sortnode = sortnode;
3588  }
3589  }
3590 
3591  /*
3592  * Convert all_grouped_cols to a descending-order list.
3593  */
3594  i = -1;
3595  while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3596  aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3597 
3598  /*
3599  * Set up aggregate-result storage in the output expr context, and also
3600  * allocate my private per-agg working storage
3601  */
3602  econtext = aggstate->ss.ps.ps_ExprContext;
3603  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3604  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3605 
3606  peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3607  pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
3608 
3609  aggstate->peragg = peraggs;
3610  aggstate->pertrans = pertransstates;
3611 
3612 
3613  aggstate->all_pergroups =
3614  (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
3615  * (numGroupingSets + numHashes));
3616  pergroups = aggstate->all_pergroups;
3617 
3618  if (node->aggstrategy != AGG_HASHED)
3619  {
3620  for (i = 0; i < numGroupingSets; i++)
3621  {
3622  pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3623  * numaggs);
3624  }
3625 
3626  aggstate->pergroups = pergroups;
3627  pergroups += numGroupingSets;
3628  }
3629 
3630  /*
3631  * Hashing can only appear in the initial phase.
3632  */
3633  if (use_hashing)
3634  {
3635  Plan *outerplan = outerPlan(node);
3636  uint64 totalGroups = 0;
3637  int i;
3638 
3639  aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
3640  "HashAgg meta context",
3641  ALLOCSET_DEFAULT_SIZES);
3642  aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
3643  &TTSOpsMinimalTuple);
3644  aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
3645  &TTSOpsVirtual);
3646 
3647  /* this is an array of pointers, not structures */
3648  aggstate->hash_pergroup = pergroups;
3649 
3650  aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
3651  outerplan->plan_width,
3652  node->transitionSpace);
3653 
3654  /*
3655  * Consider all of the grouping sets together when setting the limits
3656  * and estimating the number of partitions. This can be inaccurate
3657  * when there is more than one grouping set, but should still be
3658  * reasonable.
3659  */
3660  for (i = 0; i < aggstate->num_hashes; i++)
3661  totalGroups += aggstate->perhash[i].aggnode->numGroups;
3662 
3663  hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
3664  &aggstate->hash_mem_limit,
3665  &aggstate->hash_ngroups_limit,
3666  &aggstate->hash_planned_partitions);
3667  find_hash_columns(aggstate);
3668  build_hash_tables(aggstate);
3669  aggstate->table_filled = false;
3670 
3671  /* Initialize this to 1, meaning nothing has spilled yet */
3672  aggstate->hash_batches_used = 1;
3673  }
3674 
3675  /*
3676  * Initialize current phase-dependent values to initial phase. The initial
3677  * phase is 1 (first sort pass) for all strategies that use sorting (if
3678  * hashing is being done too, then phase 0 is processed last); but if only
3679  * hashing is being done, then phase 0 is all there is.
3680  */
3681  if (node->aggstrategy == AGG_HASHED)
3682  {
3683  aggstate->current_phase = 0;
3684  initialize_phase(aggstate, 0);
3685  select_current_set(aggstate, 0, true);
3686  }
3687  else
3688  {
3689  aggstate->current_phase = 1;
3690  initialize_phase(aggstate, 1);
3691  select_current_set(aggstate, 0, false);
3692  }
3693 
3694  /* -----------------
3695  * Perform lookups of aggregate function info, and initialize the
3696  * unchanging fields of the per-agg and per-trans data.
3697  *
3698  * We try to optimize by detecting duplicate aggregate functions so that
3699  * their state and final values are re-used, rather than needlessly being
3700  * re-calculated independently. We also detect aggregates that are not
3701  * the same, but which can share the same transition state.
3702  *
3703  * Scenarios:
3704  *
3705  * 1. Identical aggregate function calls appear in the query:
3706  *
3707  * SELECT SUM(x) FROM ... HAVING SUM(x) > 0
3708  *
3709  * Since these aggregates are identical, we only need to calculate
3710  * the value once. Both aggregates will share the same 'aggno' value.
3711  *
3712  * 2. Two different aggregate functions appear in the query, but the
3713  * aggregates have the same arguments, transition functions and
3714  * initial values (and, presumably, different final functions):
3715  *
3716  * SELECT AVG(x), STDDEV(x) FROM ...
3717  *
3718  * In this case we must create a new peragg for the varying aggregate,
3719  * and we need to call the final functions separately, but we need
3720  * only run the transition function once. (This requires that the
3721  * final functions be nondestructive of the transition state, but
3722  * that's required anyway for other reasons.)
3723  *
3724  * For either of these optimizations to be valid, all aggregate properties
3725  * used in the transition phase must be the same, including any modifiers
3726  * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
3727  * contain any volatile functions.
3728  * -----------------
3729  */
3730  aggno = -1;
3731  transno = -1;
3732  foreach(l, aggstate->aggs)
3733  {
3734  AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
3735  Aggref *aggref = aggrefstate->aggref;
3736  AggStatePerAgg peragg;
3737  AggStatePerTrans pertrans;
3738  int existing_aggno;
3739  int existing_transno;
3740  List *same_input_transnos;
3741  Oid inputTypes[FUNC_MAX_ARGS];
3742  int numArguments;
3743  int numDirectArgs;
3744  HeapTuple aggTuple;
3745  Form_pg_aggregate aggform;
3746  AclResult aclresult;
3747  Oid transfn_oid,
3748  finalfn_oid;
3749  bool shareable;
3750  Oid serialfn_oid,
3751  deserialfn_oid;
3752  Expr *finalfnexpr;
3753  Oid aggtranstype;
3754  Datum textInitVal;
3755  Datum initValue;
3756  bool initValueIsNull;
3757 
3758  /* Planner should have assigned aggregate to correct level */
3759  Assert(aggref->agglevelsup == 0);
3760  /* ... and the split mode should match */
3761  Assert(aggref->aggsplit == aggstate->aggsplit);
3762 
3763  /* 1. Check for already processed aggs which can be re-used */
3764  existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
3765  &same_input_transnos);
3766  if (existing_aggno != -1)
3767  {
3768  /*
3769  * Existing compatible agg found, so just point the Aggref to the
3770  * same per-agg struct.
3771  */
3772  aggrefstate->aggno = existing_aggno;
3773  continue;
3774  }
3775 
3776  /* Mark Aggref state node with assigned index in the result array */
3777  peragg = &peraggs[++aggno];
3778  peragg->aggref = aggref;
3779  aggrefstate->aggno = aggno;
3780 
3781  /* Fetch the pg_aggregate row */
3782  aggTuple = SearchSysCache1(AGGFNOID,
3783  ObjectIdGetDatum(aggref->aggfnoid));
3784  if (!HeapTupleIsValid(aggTuple))
3785  elog(ERROR, "cache lookup failed for aggregate %u",
3786  aggref->aggfnoid);
3787  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3788 
3789  /* Check permission to call aggregate function */
3790  aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3791  ACL_EXECUTE);
3792  if (aclresult != ACLCHECK_OK)
3793  aclcheck_error(aclresult, OBJECT_AGGREGATE,
3794  get_func_name(aggref->aggfnoid));
3795  InvokeFunctionExecuteHook(aggref->aggfnoid);
3796 
3797  /* planner recorded transition state type in the Aggref itself */
3798  aggtranstype = aggref->aggtranstype;
3799  Assert(OidIsValid(aggtranstype));
3800 
3801  /*
3802  * If this aggregation is performing state combines, then instead of
3803  * using the transition function, we'll use the combine function
3804  */
3805  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3806  {
3807  transfn_oid = aggform->aggcombinefn;
3808 
3809  /* If not set then the planner messed up */
3810  if (!OidIsValid(transfn_oid))
3811  elog(ERROR, "combinefn not set for aggregate function");
3812  }
3813  else
3814  transfn_oid = aggform->aggtransfn;
3815 
3816  /* Final function only required if we're finalizing the aggregates */
3817  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3818  peragg->finalfn_oid = finalfn_oid = InvalidOid;
3819  else
3820  peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3821 
3822  /*
3823  * If finalfn is marked read-write, we can't share transition states;
3824  * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also,
3825  * if we're not executing the finalfn here, we can share regardless.
3826  */
3827  shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
3828  (finalfn_oid == InvalidOid);
3829  peragg->shareable = shareable;
3830 
3831  serialfn_oid = InvalidOid;
3832  deserialfn_oid = InvalidOid;
3833 
3834  /*
3835  * Check if serialization/deserialization is required. We only do it
3836  * for aggregates that have transtype INTERNAL.
3837  */
3838  if (aggtranstype == INTERNALOID)
3839  {
3840  /*
3841  * The planner should only have generated a serialize agg node if
3842  * every aggregate with an INTERNAL state has a serialization
3843  * function. Verify that.
3844  */
3845  if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3846  {
3847  /* serialization only valid when not running finalfn */
3848  Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3849 
3850  if (!OidIsValid(aggform->aggserialfn))
3851  elog(ERROR, "serialfunc not provided for serialization aggregation");
3852  serialfn_oid = aggform->aggserialfn;
3853  }
3854 
3855  /* Likewise for deserialization functions */
3856  if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3857  {
3858  /* deserialization only valid when combining states */
3859  Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3860 
3861  if (!OidIsValid(aggform->aggdeserialfn))
3862  elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3863  deserialfn_oid = aggform->aggdeserialfn;
3864  }
3865  }
3866 
3867  /* Check that aggregate owner has permission to call component fns */
3868  {
3869  HeapTuple procTuple;
3870  Oid aggOwner;
3871 
3872  procTuple = SearchSysCache1(PROCOID,
3873  ObjectIdGetDatum(aggref->aggfnoid));
3874  if (!HeapTupleIsValid(procTuple))
3875  elog(ERROR, "cache lookup failed for function %u",
3876  aggref->aggfnoid);
3877  aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3878  ReleaseSysCache(procTuple);
3879 
3880  aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3881  ACL_EXECUTE);
3882  if (aclresult != ACLCHECK_OK)
3883  aclcheck_error(aclresult, OBJECT_FUNCTION,
3884  get_func_name(transfn_oid));
3885  InvokeFunctionExecuteHook(transfn_oid);
3886  if (OidIsValid(finalfn_oid))
3887  {
3888  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3889  ACL_EXECUTE);
3890  if (aclresult != ACLCHECK_OK)
3891  aclcheck_error(aclresult, OBJECT_FUNCTION,
3892  get_func_name(finalfn_oid));
3893  InvokeFunctionExecuteHook(finalfn_oid);
3894  }
3895  if (OidIsValid(serialfn_oid))
3896  {
3897  aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3898  ACL_EXECUTE);
3899  if (aclresult != ACLCHECK_OK)
3900  aclcheck_error(aclresult, OBJECT_FUNCTION,
3901  get_func_name(serialfn_oid));
3902  InvokeFunctionExecuteHook(serialfn_oid);
3903  }
3904  if (OidIsValid(deserialfn_oid))
3905  {
3906  aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3907  ACL_EXECUTE);
3908  if (aclresult != ACLCHECK_OK)
3909  aclcheck_error(aclresult, OBJECT_FUNCTION,
3910  get_func_name(deserialfn_oid));
3911  InvokeFunctionExecuteHook(deserialfn_oid);
3912  }
3913  }
3914 
3915  /*
3916  * Get actual datatypes of the (nominal) aggregate inputs. These
3917  * could be different from the agg's declared input types, when the
3918  * agg accepts ANY or a polymorphic type.
3919  */
3920  numArguments = get_aggregate_argtypes(aggref, inputTypes);
3921 
3922  /* Count the "direct" arguments, if any */
3923  numDirectArgs = list_length(aggref->aggdirectargs);
3924 
3925  /* Detect how many arguments to pass to the finalfn */
3926  if (aggform->aggfinalextra)
3927  peragg->numFinalArgs = numArguments + 1;
3928  else
3929  peragg->numFinalArgs = numDirectArgs + 1;
3930 
3931  /* Initialize any direct-argument expressions */
3932  peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3933  (PlanState *) aggstate);
3934 
3935  /*
3936  * build expression trees using actual argument & result types for the
3937  * finalfn, if it exists and is required.
3938  */
3939  if (OidIsValid(finalfn_oid))
3940  {
3941  build_aggregate_finalfn_expr(inputTypes,
3942  peragg->numFinalArgs,
3943  aggtranstype,
3944  aggref->aggtype,
3945  aggref->inputcollid,
3946  finalfn_oid,
3947  &finalfnexpr);
3948  fmgr_info(finalfn_oid, &peragg->finalfn);
3949  fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3950  }
3951 
3952  /* get info about the output value's datatype */
3953  get_typlenbyval(aggref->aggtype,
3954  &peragg->resulttypeLen,
3955  &peragg->resulttypeByVal);
3956 
3957  /*
3958  * initval is potentially null, so don't try to access it as a struct
3959  * field. Must do it the hard way with SysCacheGetAttr.
3960  */
3961  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3962  Anum_pg_aggregate_agginitval,
3963  &initValueIsNull);
3964  if (initValueIsNull)
3965  initValue = (Datum) 0;
3966  else
3967  initValue = GetAggInitVal(textInitVal, aggtranstype);
3968 
3969  /*
3970  * 2. Build working state for invoking the transition function, or
3971  * look up previously initialized working state, if we can share it.
3972  *
3973  * find_compatible_peragg() already collected a list of shareable
3974  * per-Trans's with the same inputs. Check if any of them have the
3975  * same transition function and initial value.
3976  */
3977  existing_transno = find_compatible_pertrans(aggstate, aggref,
3978  shareable,
3979  transfn_oid, aggtranstype,
3980  serialfn_oid, deserialfn_oid,
3981  initValue, initValueIsNull,
3982  same_input_transnos);
3983  if (existing_transno != -1)
3984  {
3985  /*
3986  * Existing compatible trans found, so just point the 'peragg' to
3987  * the same per-trans struct, and mark the trans state as shared.
3988  */
3989  pertrans = &pertransstates[existing_transno];
3990  pertrans->aggshared = true;
3991  peragg->transno = existing_transno;
3992  }
3993  else
3994  {
3995  pertrans = &pertransstates[++transno];
3996  build_pertrans_for_aggref(pertrans, aggstate, estate,
3997  aggref, transfn_oid, aggtranstype,
3998  serialfn_oid, deserialfn_oid,
3999  initValue, initValueIsNull,
4000  inputTypes, numArguments);
4001  peragg->transno = transno;
4002  }
4003  ReleaseSysCache(aggTuple);
4004  }
4005 
4006  /*
4007  * Update aggstate->numaggs to be the number of unique aggregates found.
4008  * Also set numtrans to the number of unique transition states found.
4009  */
4010  aggstate->numaggs = aggno + 1;
4011  aggstate->numtrans = transno + 1;
4012 
4013  /*
4014  * Last, check whether any more aggregates got added onto the node while
4015  * we processed the expressions for the aggregate arguments (including not
4016  * only the regular arguments and FILTER expressions handled immediately
4017  * above, but any direct arguments we might've handled earlier). If so,
4018  * we have nested aggregate functions, which is semantically nonsensical,
4019  * so complain. (This should have been caught by the parser, so we don't
4020  * need to work hard on a helpful error message; but we defend against it
4021  * here anyway, just to be sure.)
4022  */
4023  if (numaggs != list_length(aggstate->aggs))
4024  ereport(ERROR,
4025  (errcode(ERRCODE_GROUPING_ERROR),
4026  errmsg("aggregate function calls cannot be nested")));
4027 
4028  /*
4029  * Build expressions doing all the transition work at once. We build a
4030  * different one for each phase, as the number of transition function
4031  * invocations can differ between phases. Note this'll work both for
4032  * transition and combination functions (although there'll only be one
4033  * phase in the latter case).
4034  */
4035  for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
4036  {
4037  AggStatePerPhase phase = &aggstate->phases[phaseidx];
4038  bool dohash = false;
4039  bool dosort = false;
4040 
4041  /* phase 0 doesn't necessarily exist */
4042  if (!phase->aggnode)
4043  continue;
4044 
4045  if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
4046  {
4047  /*
4048  * Phase one, and only phase one, in a mixed agg performs both
4049  * sorting and aggregation.
4050  */
4051  dohash = true;
4052  dosort = true;
4053  }
4054  else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
4055  {
4056  /*
4057  * No need to compute a transition function for an AGG_MIXED phase
4058  * 0 - the contents of the hashtables will have been computed
4059  * during phase 1.
4060  */
4061  continue;
4062  }
4063  else if (phase->aggstrategy == AGG_PLAIN ||
4064  phase->aggstrategy == AGG_SORTED)
4065  {
4066  dohash = false;
4067  dosort = true;
4068  }
4069  else if (phase->aggstrategy == AGG_HASHED)
4070  {
4071  dohash = true;
4072  dosort = false;
4073  }
4074  else
4075  Assert(false);
4076 
4077  phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
4078  false);
4079 
4080  /* cache compiled expression for outer slot without NULL check */
4081  phase->evaltrans_cache[0][0] = phase->evaltrans;
4082  }
4083 
4084  return aggstate;
4085 }
4086 
4087 /*
4088  * Build the state needed to calculate a state value for an aggregate.
4089  *
4090  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
4091  * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
4092  * of the arguments could be calculated from 'aggref', but the caller has
4093  * calculated them already, so might as well pass them.
4094  */
4095 static void
4096 build_pertrans_for_aggref(AggStatePerTrans pertrans,
4097  AggState *aggstate, EState *estate,
4098  Aggref *aggref,
4099  Oid aggtransfn, Oid aggtranstype,
4100  Oid aggserialfn, Oid aggdeserialfn,
4101  Datum initValue, bool initValueIsNull,
4102  Oid *inputTypes, int numArguments)
4103 {
4104  int numGroupingSets = Max(aggstate->maxsets, 1);
4105  Expr *serialfnexpr = NULL;
4106  Expr *deserialfnexpr = NULL;
4107  ListCell *lc;
4108  int numInputs;
4109  int numDirectArgs;
4110  List *sortlist;
4111  int numSortCols;
4112  int numDistinctCols;
4113  int i;
4114 
4115  /* Begin filling in the pertrans data */
4116  pertrans->aggref = aggref;
4117  pertrans->aggshared = false;
4118  pertrans->aggCollation = aggref->inputcollid;
4119  pertrans->transfn_oid = aggtransfn;
4120  pertrans->serialfn_oid = aggserialfn;
4121  pertrans->deserialfn_oid = aggdeserialfn;
4122  pertrans->initValue = initValue;
4123  pertrans->initValueIsNull = initValueIsNull;
4124 
4125  /* Count the "direct" arguments, if any */
4126  numDirectArgs = list_length(aggref->aggdirectargs);
4127 
4128  /* Count the number of aggregated input columns */
4129  pertrans->numInputs = numInputs = list_length(aggref->args);
4130 
4131  pertrans->aggtranstype = aggtranstype;
4132 
4133  /*
4134  * When combining states, we have no use at all for the aggregate
4135  * function's transfn. Instead we use the combinefn. In this case, the
4136  * transfn and transfn_oid fields of pertrans refer to the combine
4137  * function rather than the transition function.
4138  */
4139  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
4140  {
4141  Expr *combinefnexpr;
4142  size_t numTransArgs;
4143 
4144  /*
4145  * When combining there's only one input: the to-be-combined
4146  * transition value from below (this node's transition value is
4147  * counted separately).
4148  */
4149  pertrans->numTransInputs = 1;
4150 
4151  /* account for the current transition state */
4152  numTransArgs = pertrans->numTransInputs + 1;
4153 
4154  build_aggregate_combinefn_expr(aggtranstype,
4155  aggref->inputcollid,
4156  aggtransfn,
4157  &combinefnexpr);
4158  fmgr_info(aggtransfn, &pertrans->transfn);
4159  fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
4160 
4161  pertrans->transfn_fcinfo =
4162  (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
4163  InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
4164  &pertrans->transfn,
4165  numTransArgs,
4166  pertrans->aggCollation,
4167  (void *) aggstate, NULL);
4168 
4169  /*
4170  * Ensure that a combine function to combine INTERNAL states is not
4171  * strict. This should have been checked during CREATE AGGREGATE, but
4172  * the strict property could have been changed since then.
4173  */
4174  if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
4175  ereport(ERROR,
4176  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4177  errmsg("combine function with transition type %s must not be declared STRICT",
4178  format_type_be(aggtranstype))));
4179  }
4180  else
4181  {
4182  Expr *transfnexpr;
4183  size_t numTransArgs;
4184 
4185  /* Detect how many arguments to pass to the transfn */
4186  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4187  pertrans->numTransInputs = numInputs;
4188  else
4189  pertrans->numTransInputs = numArguments;
4190 
4191  /* account for the current transition state */
4192  numTransArgs = pertrans->numTransInputs + 1;
4193 
4194  /*
4195  * Set up infrastructure for calling the transfn. Note that
4196  * invtransfn is not needed here.
4197  */
4198  build_aggregate_transfn_expr(inputTypes,
4199  numArguments,
4200  numDirectArgs,
4201  aggref->aggvariadic,
4202  aggtranstype,
4203  aggref->inputcollid,
4204  aggtransfn,
4205  InvalidOid,
4206  &transfnexpr,
4207  NULL);
4208  fmgr_info(aggtransfn, &pertrans->transfn);
4209  fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
4210 
4211  pertrans->transfn_fcinfo =
4212  (FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
4213  InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
4214  &pertrans->transfn,
4215  numTransArgs,
4216  pertrans->aggCollation,
4217  (void *) aggstate, NULL);
4218 
4219  /*
4220  * If the transfn is strict and the initval is NULL, make sure input
4221  * type and transtype are the same (or at least binary-compatible), so
4222  * that it's OK to use the first aggregated input value as the initial
4223  * transValue. This should have been checked at agg definition time,
4224  * but we must check again in case the transfn's strictness property
4225  * has been changed.
4226  */
4227  if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
4228  {
4229  if (numArguments <= numDirectArgs ||
4230  !IsBinaryCoercible(inputTypes[numDirectArgs],
4231  aggtranstype))
4232  ereport(ERROR,
4233  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4234  errmsg("aggregate %u needs to have compatible input type and transition type",
4235  aggref->aggfnoid)));
4236  }
4237  }
4238 
4239  /* get info about the state value's datatype */
4240  get_typlenbyval(aggtranstype,
4241  &pertrans->transtypeLen,
4242  &pertrans->transtypeByVal);
4243 
4244  if (OidIsValid(aggserialfn))
4245  {
4246  build_aggregate_serialfn_expr(aggserialfn,
4247  &serialfnexpr);
4248  fmgr_info(aggserialfn, &pertrans->serialfn);
4249  fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
4250 
4251  pertrans->serialfn_fcinfo =
4252  (FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
4253  InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
4254  &pertrans->serialfn,
4255  1,
4256  InvalidOid,
4257  (void *) aggstate, NULL);
4258  }
4259 
4260  if (OidIsValid(aggdeserialfn))
4261  {
4262  build_aggregate_deserialfn_expr(aggdeserialfn,
4263  &deserialfnexpr);
4264  fmgr_info(aggdeserialfn, &pertrans->deserialfn);
4265  fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
4266 
4267  pertrans->deserialfn_fcinfo =
4268  (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
4269  InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
4270  &pertrans->deserialfn,
4271  2,
4272  InvalidOid,
4273  (void *) aggstate, NULL);
4274 
4275  }
4276 
4277  /*
4278  * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
4279  * have a list of SortGroupClause nodes; fish out the data in them and
4280  * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
4281  * however; the agg's transfn and finalfn are responsible for that.
4282  *
4283  * Note that by construction, if there is a DISTINCT clause then the ORDER
4284  * BY clause is a prefix of it (see transformDistinctClause).
4285  */
4286  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4287  {
4288  sortlist = NIL;
4289  numSortCols = numDistinctCols = 0;
4290  }
4291  else if (aggref->aggdistinct)
4292  {
4293  sortlist = aggref->aggdistinct;
4294  numSortCols = numDistinctCols = list_length(sortlist);
4295  Assert(numSortCols >= list_length(aggref->aggorder));
4296  }
4297  else
4298  {
4299  sortlist = aggref->aggorder;
4300  numSortCols = list_length(sortlist);
4301  numDistinctCols = 0;
4302  }
4303 
4304  pertrans->numSortCols = numSortCols;
4305  pertrans->numDistinctCols = numDistinctCols;
4306 
4307  /*
4308  * If we have either sorting or filtering to do, create a tupledesc and
4309  * slot corresponding to the aggregated inputs (including sort
4310  * expressions) of the agg.
4311  */
4312  if (numSortCols > 0 || aggref->aggfilter)
4313  {
4314  pertrans->sortdesc = ExecTypeFromTL(aggref->args);
4315  pertrans->sortslot =
4316  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4317  &TTSOpsMinimalTuple);
4318  }
4319 
4320  if (numSortCols > 0)
4321  {
4322  /*
4323  * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
4324  * (yet)
4325  */
4326  Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
4327 
4328  /* If we have only one input, we need its len/byval info. */
4329  if (numInputs == 1)
4330  {
4331  get_typlenbyval(inputTypes[numDirectArgs],
4332  &pertrans->inputtypeLen,
4333  &pertrans->inputtypeByVal);
4334  }
4335  else if (numDistinctCols > 0)
4336  {
4337  /* we will need an extra slot to store prior values */
4338  pertrans->uniqslot =
4339  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4340  &TTSOpsMinimalTuple);
4341  }
4342 
4343  /* Extract the sort information for use later */
4344  pertrans->sortColIdx =
4345  (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
4346  pertrans->sortOperators =
4347  (Oid *) palloc(numSortCols * sizeof(Oid));
4348  pertrans->sortCollations =
4349  (Oid *) palloc(numSortCols * sizeof(Oid));
4350  pertrans->sortNullsFirst =
4351  (bool *) palloc(numSortCols * sizeof(bool));
4352 
4353  i = 0;
4354  foreach(lc, sortlist)
4355  {
4356  SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
4357  TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
4358 
4359  /* the parser should have made sure of this */
4360  Assert(OidIsValid(sortcl->sortop));
4361 
4362  pertrans->sortColIdx[i] = tle->resno;
4363  pertrans->sortOperators[i] = sortcl->sortop;
4364  pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
4365  pertrans->sortNullsFirst[i] = sortcl->nulls_first;
4366  i++;
4367  }
4368  Assert(i == numSortCols);
4369  }
4370 
4371  if (aggref->aggdistinct)
4372  {
4373  Oid *ops;
4374 
4375  Assert(numArguments > 0);
4376  Assert(list_length(aggref->aggdistinct) == numDistinctCols);
4377 
4378  ops = palloc(numDistinctCols * sizeof(Oid));
4379 
4380  i = 0;
4381  foreach(lc, aggref->aggdistinct)
4382  ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
4383 
4384  /* lookup / build the necessary comparators */
4385  if (numDistinctCols == 1)
4386  fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
4387  else
4388  pertrans->equalfnMulti =
4389  execTuplesMatchPrepare(pertrans->sortdesc,
4390  numDistinctCols,
4391  pertrans->sortColIdx,
4392  ops,
4393  pertrans->sortCollations,
4394  &aggstate->ss.ps);
4395  pfree(ops);
4396  }
4397 
4398  pertrans->sortstates = (Tuplesortstate **)
4399  palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
4400 }
4401 
4402 
4403 static Datum
4404 GetAggInitVal(Datum textInitVal, Oid transtype)
4405 {
4406  Oid typinput,
4407  typioparam;
4408  char *strInitVal;
4409  Datum initVal;
4410 
4411  getTypeInputInfo(transtype, &typinput, &typioparam);
4412  strInitVal = TextDatumGetCString(textInitVal);
4413  initVal = OidInputFunctionCall(typinput, strInitVal,
4414  typioparam, -1);
4415  pfree(strInitVal);
4416  return initVal;
4417 }
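A sketch of what the conversion above amounts to, assuming a transition
type of int8 with initcond "0" (both values are illustrative, not taken
from any particular aggregate):

/* hypothetical: feed an initcond string through the type's input function */
Datum textInitVal = CStringGetTextDatum("0");
Datum initVal = GetAggInitVal(textInitVal, INT8OID);
/* initVal now holds Int64GetDatum(0), ready to seed a transition value */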
4418 
4419 /*
4420  * find_compatible_peragg - search for a previously initialized per-Agg struct
4421  *
4422  * Searches the previously looked at aggregates to find one which is compatible
4423  * with this one, with the same input parameters. If no compatible aggregate
4424  * can be found, returns -1.
4425  *
4426  * As a side-effect, this also collects a list of existing, shareable per-Trans
4427  * structs with matching inputs. If no identical Aggref is found, the list is
4428  * passed later to find_compatible_pertrans, to see if we can at least reuse
4429  * the state value of another aggregate.
4430  */
4431 static int
4432 find_compatible_peragg(Aggref *newagg, AggState *aggstate,
4433  int lastaggno, List **same_input_transnos)
4434 {
4435  int aggno;
4436  AggStatePerAgg peraggs;
4437 
4438  *same_input_transnos = NIL;
4439 
4440  /* we mustn't reuse the aggref if it contains volatile function calls */
4441  if (contain_volatile_functions((Node *) newagg))
4442  return -1;
4443 
4444  peraggs = aggstate->peragg;
4445 
4446  /*
4447  * Search through the list of already seen aggregates. If we find an
4448  * existing identical aggregate call, then we can re-use that one. While
4449  * searching, we'll also collect a list of Aggrefs with the same input
4450  * parameters. If no matching Aggref is found, the caller can potentially
4451  * still re-use the transition state of one of them. (At this stage we
4452  * just compare the parsetrees; whether different aggregates share the
4453  * same transition function will be checked later.)
4454  */
4455  for (aggno = 0; aggno <= lastaggno; aggno++)
4456  {
4457  AggStatePerAgg peragg;
4458  Aggref *existingRef;
4459 
4460  peragg = &peraggs[aggno];
4461  existingRef = peragg->aggref;
4462 
4463  /* all of the following must be the same or it's no match */
4464  if (newagg->inputcollid != existingRef->inputcollid ||
4465  newagg->aggtranstype != existingRef->aggtranstype ||
4466  newagg->aggstar != existingRef->aggstar ||
4467  newagg->aggvariadic != existingRef->aggvariadic ||
4468  newagg->aggkind != existingRef->aggkind ||
4469  !equal(newagg->args, existingRef->args) ||
4470  !equal(newagg->aggorder, existingRef->aggorder) ||
4471  !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
4472  !equal(newagg->aggfilter, existingRef->aggfilter))
4473  continue;
4474 
4475  /* if it's the same aggregate function then report exact match */
4476  if (newagg->aggfnoid == existingRef->aggfnoid &&
4477  newagg->aggtype == existingRef->aggtype &&
4478  newagg->aggcollid == existingRef->aggcollid &&
4479  equal(newagg->aggdirectargs, existingRef->aggdirectargs))
4480  {
4481  list_free(*same_input_transnos);
4482  *same_input_transnos = NIL;
4483  return aggno;
4484  }
4485 
4486  /*
4487  * Not identical, but it had the same inputs. If the final function
4488  * permits sharing, return its transno to the caller, in case we can
4489  * re-use its per-trans state. (If there's already sharing going on,
4490  * we might report a transno more than once. find_compatible_pertrans
4491  * is cheap enough that it's not worth spending cycles to avoid that.)
4492  */
4493  if (peragg->shareable)
4494  *same_input_transnos = lappend_int(*same_input_transnos,
4495  peragg->transno);
4496  }
4497 
4498  return -1;
4499 }
4500 
4501 /*
4502  * find_compatible_pertrans - search for a previously initialized per-Trans
4503  * struct
4504  *
4505  * Searches the list of transnos for a per-Trans struct with the same
4506  * transition function and initial condition. (The inputs have already been
4507  * verified to match.)
4508  */
4509 static int
4510 find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
4511  Oid aggtransfn, Oid aggtranstype,
4512  Oid aggserialfn, Oid aggdeserialfn,
4513  Datum initValue, bool initValueIsNull,
4514  List *transnos)
4515 {
4516  ListCell *lc;
4517 
4518  /* If this aggregate can't share transition states, give up */
4519  if (!shareable)
4520  return -1;
4521 
4522  foreach(lc, transnos)
4523  {
4524  int transno = lfirst_int(lc);
4525  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
4526 
4527  /*
4528  * if the transfns or transition state types are not the same then the
4529  * state can't be shared.
4530  */
4531  if (aggtransfn != pertrans->transfn_oid ||
4532  aggtranstype != pertrans->aggtranstype)
4533  continue;
4534 
4535  /*
4536  * The serialization and deserialization functions must match, if
4537  * present, as we're unable to share the trans state for aggregates
4538  * which will serialize or deserialize into different formats.
4539  * Remember that these will be InvalidOid if they're not required for
4540  * this agg node.
4541  */
4542  if (aggserialfn != pertrans->serialfn_oid ||
4543  aggdeserialfn != pertrans->deserialfn_oid)
4544  continue;
4545 
4546  /*
4547  * Check that the initial condition matches, too.
4548  */
4549  if (initValueIsNull && pertrans->initValueIsNull)
4550  return transno;
4551 
4552  if (!initValueIsNull && !pertrans->initValueIsNull &&
4553  datumIsEqual(initValue, pertrans->initValue,
4554  pertrans->transtypeByVal, pertrans->transtypeLen))
4555  return transno;
4556  }
4557  return -1;
4558 }
4559 
4560 void
4561 ExecEndAgg(AggState *node)
4562 {
4563  PlanState *outerPlan;
4564  int transno;
4565  int numGroupingSets = Max(node->maxsets, 1);
4566  int setno;
4567 
4568  /*
4569  * When ending a parallel worker, copy the statistics gathered by the
4570  * worker back into shared memory so that it can be picked up by the main
4571  * process to report in EXPLAIN ANALYZE.
4572  */
4573  if (node->shared_info && IsParallelWorker())
4574  {
4575  AggregateInstrumentation *si;
4576 
4577  Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
4578  si = &node->shared_info->sinstrument[ParallelWorkerNumber];
4579  si->hash_batches_used = node->hash_batches_used;
4580  si->hash_disk_used = node->hash_disk_used;
4581  si->hash_mem_peak = node->hash_mem_peak;
4582  }
4583 
4584  /* Make sure we have closed any open tuplesorts */
4585 
4586  if (node->sort_in)
4587  tuplesort_end(node->sort_in);
4588  if (node->sort_out)
4589  tuplesort_end(node->sort_out);
4590 
4591  hashagg_reset_spill_state(node);
4592 
4593  if (node->hash_metacxt != NULL)
4594  {
4595  MemoryContextDelete(node->hash_metacxt);
4596  node->hash_metacxt = NULL;
4597  }
4598 
4599  for (transno = 0; transno < node->numtrans; transno++)
4600  {
4601  AggStatePerTrans pertrans = &node->pertrans[transno];
4602 
4603  for (setno = 0; setno < numGroupingSets; setno++)
4604  {
4605  if (pertrans->sortstates[setno])
4606  tuplesort_end(pertrans->sortstates[setno]);
4607  }
4608  }
4609 
4610  /* And ensure any agg shutdown callbacks have been called */
4611  for (setno = 0; setno < numGroupingSets; setno++)
4612  ReScanExprContext(node->aggcontexts[setno]);
4613  if (node->hashcontext)
4614  ReScanExprContext(node->hashcontext);
4615 
4616  /*
4617  * We don't actually free any ExprContexts here (see comment in
4618  * ExecFreeExprContext), just unlinking the output one from the plan node
4619  * suffices.
4620  */
4621  ExecFreeExprContext(&node->ss.ps);
4622 
4623  /* clean up tuple table */
4624  ExecClearTuple(node->ss.ss_ScanTupleSlot);
4625 
4626  outerPlan = outerPlanState(node);
4627  ExecEndNode(outerPlan);
4628 }
4629 
4630 void
4631 ExecReScanAgg(AggState *node)
4632 {
4633  ExprContext *econtext = node->ss.ps.ps_ExprContext;
4634  PlanState *outerPlan = outerPlanState(node);
4635  Agg *aggnode = (Agg *) node->ss.ps.plan;
4636  int transno;
4637  int numGroupingSets = Max(node->maxsets, 1);
4638  int setno;
4639 
4640  node->agg_done = false;
4641 
4642  if (node->aggstrategy == AGG_HASHED)
4643  {
4644  /*
4645  * In the hashed case, if we haven't yet built the hash table then we
4646  * can just return; nothing done yet, so nothing to undo. If subnode's
4647  * chgParam is not NULL then it will be re-scanned by ExecProcNode,
4648  * else no reason to re-scan it at all.
4649  */
4650  if (!node->table_filled)
4651  return;
4652 
4653  /*
4654  * If we do have the hash table, and it never spilled, and the subplan
4655  * does not have any parameter changes, and none of our own parameter
4656  * changes affect input expressions of the aggregated functions, then
4657  * we can just rescan the existing hash table; no need to build it
4658  * again.
4659  */
4660  if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
4661  !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
4662  {
4663  ResetTupleHashIterator(node->perhash[0].hashtable,
4664  &node->perhash[0].hashiter);
4665  select_current_set(node, 0, true);
4666  return;
4667  }
4668  }
4669 
4670  /* Make sure we have closed any open tuplesorts */
4671  for (transno = 0; transno < node->numtrans; transno++)
4672  {
4673  for (setno = 0; setno < numGroupingSets; setno++)
4674  {
4675  AggStatePerTrans pertrans = &node->pertrans[transno];
4676 
4677  if (pertrans->sortstates[setno])
4678  {
4679  tuplesort_end(pertrans->sortstates[setno]);
4680  pertrans->sortstates[setno] = NULL;
4681  }
4682  }
4683  }
4684 
4685  /*
4686  * We don't need to ReScanExprContext the output tuple context here;
4687  * ExecReScan already did it. But we do need to reset our per-grouping-set
4688  * contexts, which may have transvalues stored in them. (We use rescan
4689  * rather than just reset because transfns may have registered callbacks
4690  * that need to be run now.) For the AGG_HASHED case, see below.
4691  */
4692 
4693  for (setno = 0; setno < numGroupingSets; setno++)
4694  {
4695  ReScanExprContext(node->aggcontexts[setno]);
4696  }
4697 
4698  /* Release first tuple of group, if we have made a copy */
4699  if (node->grp_firstTuple != NULL)
4700  {
4701  heap_freetuple(node->grp_firstTuple);
4702  node->grp_firstTuple = NULL;
4703  }
4704  ExecClearTuple(node->ss.ss_ScanTupleSlot);
4705 
4706  /* Forget current agg values */
4707  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
4708  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
4709 
4710  /*
4711  * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
4712  * the hashcontext. This used to be an issue, but now, resetting a context
4713  * automatically deletes sub-contexts too.
4714  */
4715  if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
4716  {
4717  hashagg_reset_spill_state(node);
4718 
4719  node->hash_ever_spilled = false;
4720  node->hash_spill_mode = false;
4721  node->hash_ngroups_current = 0;
4722 
4723  ReScanExprContext(node->hashcontext);
4724  /* Rebuild an empty hash table */
4725  build_hash_tables(node);
4726  node->table_filled = false;
4727  /* iterator will be reset when the table is filled */
4728 
4729  hashagg_recompile_expressions(node, false, false);
4730  }
4731 
4732  if (node->aggstrategy != AGG_HASHED)
4733  {
4734  /*
4735  * Reset the per-group state (in particular, mark transvalues null)
4736  */
4737  for (setno = 0; setno < numGroupingSets; setno++)
4738  {
4739  MemSet(node->pergroups[setno], 0,
4740  sizeof(AggStatePerGroupData) * node->numaggs);
4741  }
4742 
4743  /* reset to phase 1 */
4744  initialize_phase(node, 1);
4745 
4746  node->input_done = false;
4747  node->projected_set = -1;
4748  }
4749 
4750  if (outerPlan->chgParam == NULL)
4751  ExecReScan(outerPlan);
4752 }
4753 
4754 
4755 /***********************************************************************
4756  * API exposed to aggregate functions
4757  ***********************************************************************/
4758 
4759 
4760 /*
4761  * AggCheckCallContext - test if a SQL function is being called as an aggregate
4762  *
4763  * The transition and/or final functions of an aggregate may want to verify
4764  * that they are being called as aggregates, rather than as plain SQL
4765  * functions. They should use this function to do so. The return value
4766  * is nonzero if being called as an aggregate, or zero if not. (Specific
4767  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4768  * values could conceivably appear in future.)
4769  *
4770  * If aggcontext isn't NULL, the function also stores at *aggcontext the
4771  * identity of the memory context that aggregate transition values are being
4772  * stored in. Note that the same aggregate call site (flinfo) may be called
4773  * interleaved on different transition values in different contexts, so it's
4774  * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4775  * cache it in the transvalue itself (for internal-type transvalues).
4776  */
4777 int
4778 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
4779 {
4780  if (fcinfo->context && IsA(fcinfo->context, AggState))
4781  {
4782  if (aggcontext)
4783  {
4784  AggState *aggstate = ((AggState *) fcinfo->context);
4785  ExprContext *cxt = aggstate->curaggcontext;
4786 
4787  *aggcontext = cxt->ecxt_per_tuple_memory;
4788  }
4789  return AGG_CONTEXT_AGGREGATE;
4790  }
4791  if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4792  {
4793  if (aggcontext)
4794  *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4795  return AGG_CONTEXT_WINDOW;
4796  }
4797 
4798  /* this is just to prevent "uninitialized variable" warnings */
4799  if (aggcontext)
4800  *aggcontext = NULL;
4801  return 0;
4802 }
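For illustration, a sketch of how a C-language transition function would
typically use AggCheckCallContext; my_transfn and MyState are hypothetical
and not part of nodeAgg.c:

typedef struct MyState { int64 count; } MyState;   /* hypothetical state */

Datum
my_transfn(PG_FUNCTION_ARGS)
{
    MemoryContext aggcontext;
    MyState   *state;

    if (!AggCheckCallContext(fcinfo, &aggcontext))
        elog(ERROR, "my_transfn called in non-aggregate context");

    /* long-lived state must live in aggcontext, not per-tuple memory */
    if (PG_ARGISNULL(0))
        state = (MyState *) MemoryContextAllocZero(aggcontext,
                                                   sizeof(MyState));
    else
        state = (MyState *) PG_GETARG_POINTER(0);

    state->count++;
    PG_RETURN_POINTER(state);
}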
4803 
4804 /*
4805  * AggGetAggref - allow an aggregate support function to get its Aggref
4806  *
4807  * If the function is being called as an aggregate support function,
4808  * return the Aggref node for the aggregate call. Otherwise, return NULL.
4809  *
4810  * Aggregates sharing the same inputs and transition functions can get
4811  * merged into a single transition calculation. If the transition function
4812  * calls AggGetAggref, it will get one of the Aggrefs for which it is
4813  * executing. It must therefore not pay attention to the Aggref fields that
4814  * relate to the final function, as those are indeterminate. But if a final
4815  * function calls AggGetAggref, it will get a precise result.
4816  *
4817  * Note that if an aggregate is being used as a window function, this will
4818  * return NULL. We could provide a similar function to return the relevant
4819  * WindowFunc node in such cases, but it's not needed yet.
4820  */
4821 Aggref *
4822 AggGetAggref(FunctionCallInfo fcinfo)
4823 {
4824  if (fcinfo->context && IsA(fcinfo->context, AggState))
4825  {
4826  AggState *aggstate = (AggState *) fcinfo->context;
4827  AggStatePerAgg curperagg;
4828  AggStatePerTrans curpertrans;
4829 
4830  /* check curperagg (valid when in a final function) */
4831  curperagg = aggstate->curperagg;
4832 
4833  if (curperagg)
4834  return curperagg->aggref;
4835 
4836  /* check curpertrans (valid when in a transition function) */
4837  curpertrans = aggstate->curpertrans;
4838 
4839  if (curpertrans)
4840  return curpertrans->aggref;
4841  }
4842  return NULL;
4843 }
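As a sketch of the caveat above, a support function should restrict itself
to Aggref fields that are valid in its context; the helper below (purely
hypothetical) reads only aggref->args, which must be identical across any
aggregates merged into one transition calculation, so it is safe from
either a transition or a final function:

static int
my_num_aggregated_args(FunctionCallInfo fcinfo)
{
    Aggref *aggref = AggGetAggref(fcinfo);

    if (aggref == NULL)         /* not being called as an aggregate */
        return -1;
    return list_length(aggref->args);
}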
4844 
4845 /*
4846  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4847  *
4848  * This is useful in agg final functions; the context returned is one that
4849  * the final function can safely reset as desired. This isn't useful for
4850  * transition functions, since the context returned MAY (we don't promise)
4851  * be the same as the context those are called in.
4852  *
4853  * As above, this is currently not useful for aggs called as window functions.
4854  */
4855 MemoryContext
4856 AggGetTempMemoryContext(FunctionCallInfo fcinfo)
4857 {
4858  if (fcinfo->context && IsA(fcinfo->context, AggState))
4859  {
4860  AggState *aggstate = (AggState *) fcinfo->context;
4861 
4862  return aggstate->tmpcontext->ecxt_per_tuple_memory;
4863  }
4864  return NULL;
4865 }
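A sketch of the intended use in a final function (hypothetical fragment):

MemoryContext tmpcxt = AggGetTempMemoryContext(fcinfo);

if (tmpcxt != NULL)
{
    /* ... perform per-call scratch allocations in tmpcxt ... */
    MemoryContextReset(tmpcxt); /* allowed for a finalfn, per the above */
}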
4866 
4867 /*
4868  * AggStateIsShared - find out whether transition state is shared
4869  *
4870  * If the function is being called as an aggregate support function,
4871  * return true if the aggregate's transition state is shared across
4872  * multiple aggregates, false if it is not.
4873  *
4874  * Returns true if not called as an aggregate support function.
4875  * This is intended as a conservative answer, ie "no you'd better not
4876  * scribble on your input". In particular, will return true if the
4877  * aggregate is being used as a window function, which is a scenario
4878  * in which changing the transition state is a bad idea. We might
4879  * want to refine the behavior for the window case in future.
4880  */
4881 bool
4882 AggStateIsShared(FunctionCallInfo fcinfo)
4883 {
4884  if (fcinfo->context && IsA(fcinfo->context, AggState))
4885  {
4886  AggState *aggstate = (AggState *) fcinfo->context;
4887  AggStatePerAgg curperagg;
4888  AggStatePerTrans curpertrans;
4889 
4890  /* check curperagg (valid when in a final function) */
4891  curperagg = aggstate->curperagg;
4892 
4893  if (curperagg)
4894  return aggstate->pertrans[curperagg->transno].aggshared;
4895 
4896  /* check curpertrans (valid when in a transition function) */
4897  curpertrans = aggstate->curpertrans;
4898 
4899  if (curpertrans)
4900  return curpertrans->aggshared;
4901  }
4902  return true;
4903 }
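Sketched use in a hypothetical final function that wants to modify the
transition state in place (my_copy_state is an assumed helper, not a real
routine):

/* conservative: copy unless the state is provably unshared */
if (AggStateIsShared(fcinfo))
    state = my_copy_state(state);   /* my_copy_state is hypothetical */
/* now it is safe to scribble on 'state' */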
4904 
4905 /*
4906  * AggRegisterCallback - register a cleanup callback for an aggregate
4907  *
4908  * This is useful for aggs to register shutdown callbacks, which will ensure
4909  * that non-memory resources are freed. The callback will occur just before
4910  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4911  * either between groups or as a result of rescanning the query. The callback
4912  * will NOT be called on error paths. The typical use-case is for freeing of
4913  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4914  * created by the agg functions. (The callback will not be called until after
4915  * the result of the finalfn is no longer needed, so it's safe for the finalfn
4916  * to return data that will be freed by the callback.)
4917  *
4918  * As above, this is currently not useful for aggs called as window functions.
4919  */
4920 void
4921 AggRegisterCallback(FunctionCallInfo fcinfo,
4922  ExprContextCallbackFunction func,
4923  Datum arg)
4924 {
4925  if (fcinfo->context && IsA(fcinfo->context, AggState))
4926  {
4927  AggState *aggstate = (AggState *) fcinfo->context;
4928  ExprContext *cxt = aggstate->curaggcontext;
4929 
4930  RegisterExprContextCallback(cxt, func, arg);
4931 
4932  return;
4933  }
4934  elog(ERROR, "aggregate function cannot register a callback in this context");
4935 }
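Sketch of the typical pattern, modelled on the ordered-set aggregates'
shutdown callbacks (the names here are hypothetical):

static void
my_shutdown(Datum arg)
{
    Tuplesortstate *sorter = (Tuplesortstate *) DatumGetPointer(arg);

    tuplesort_end(sorter);      /* release the sort's memory and temp files */
}

/* ... in the transition function, after creating 'sorter': */
AggRegisterCallback(fcinfo, my_shutdown, PointerGetDatum(sorter));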
4936 
4937 
4938 /*
4939  * aggregate_dummy - dummy execution routine for aggregate functions
4940  *
4941  * This function is listed as the implementation (prosrc field) of pg_proc
4942  * entries for aggregate functions. Its only purpose is to throw an error
4943  * if someone mistakenly executes such a function in the normal way.
4944  *
4945  * Perhaps someday we could assign real meaning to the prosrc field of
4946  * an aggregate?
4947  */
4948 Datum
4949 aggregate_dummy(PG_FUNCTION_ARGS)
4950 {
4951  elog(ERROR, "aggregate function %u called as normal function",
4952  fcinfo->flinfo->fn_oid);
4953  return (Datum) 0; /* keep compiler quiet */
4954 }
4955 
4956 /* ----------------------------------------------------------------
4957  * Parallel Query Support
4958  * ----------------------------------------------------------------
4959  */
4960 
4961  /* ----------------------------------------------------------------
4962  * ExecAggEstimate
4963  *
4964  * Estimate space required to propagate aggregate statistics.
4965  * ----------------------------------------------------------------
4966  */
4967 void
4968 ExecAggEstimate(AggState *node, ParallelContext *pcxt)
4969 {
4970  Size size;
4971 
4972  /* don't need this if not instrumenting or no workers */
4973  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4974  return;
4975 
4976  size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
4977  size = add_size(size, offsetof(SharedAggInfo, sinstrument));
4978  shm_toc_estimate_chunk(&pcxt->estimator, size);
4979  shm_toc_estimate_keys(&pcxt->estimator, 1);
4980 }
4981 
4982 /* ----------------------------------------------------------------
4983  * ExecAggInitializeDSM
4984  *
4985  * Initialize DSM space for aggregate statistics.
4986  * ----------------------------------------------------------------
4987  */
4988 void
4989 ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
4990 {
4991  Size size;
4992 
4993  /* don't need this if not instrumenting or no workers */
4994  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4995  return;
4996 
4997  size = offsetof(SharedAggInfo, sinstrument)
4998  + pcxt->nworkers * sizeof(AggregateInstrumentation);
4999  node->shared_info = shm_toc_allocate(pcxt->toc, size);
5000  /* ensure any unfilled slots will contain zeroes */
5001  memset(node->shared_info, 0, size);
5002  node->shared_info->num_workers = pcxt->nworkers;
5003  shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
5004  node->shared_info);
5005 }
5006 
5007 /* ----------------------------------------------------------------
5008  * ExecAggInitializeWorker
5009  *
5010  * Attach worker to DSM space for aggregate statistics.
5011  * ----------------------------------------------------------------
5012  */
5013 void
5014 ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
5015 {
5016  node->shared_info =
5017  shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
5018 }
5019 
5020 /* ----------------------------------------------------------------
5021  * ExecAggRetrieveInstrumentation
5022  *
5023  * Transfer aggregate statistics from DSM to private memory.
5024  * ----------------------------------------------------------------
5025  */
5026 void
5027 ExecAggRetrieveInstrumentation(AggState *node)
5028 {
5029  Size size;
5030  SharedAggInfo *si;
5031 
5032  if (node->shared_info == NULL)
5033  return;
5034 
5035  size = offsetof(SharedAggInfo, sinstrument)
5036  + node->shared_info->num_workers * sizeof(AggregateInstrumentation);
5037  si = palloc(size);
5038  memcpy(si, node->shared_info, size);
5039  node->shared_info = si;
5040 }
Definition: nodeAgg.c:2532
Bitmapset * colnos_needed
Definition: execnodes.h:2159
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
Definition: execUtils.c:504
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
List * lcons_int(int datum, List *list)
Definition: list.c:471
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:1546
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3032
int numaggs
Definition: execnodes.h:2135
int nfreetapes
Definition: nodeAgg.c:331
Oid GetUserId(void)
Definition: miscinit.c:476
bool agg_done
Definition: execnodes.h:2153
#define castNode(_type_, nodeptr)
Definition: nodes.h:597
Oid * grpCollations
Definition: plannodes.h:824
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:543
#define TTS_EMPTY(slot)
Definition: tuptable.h:97
TupleTableSlot * sort_slot
Definition: execnodes.h:2167
List * all_grouped_cols
Definition: execnodes.h:2158
Tuplesortstate * sort_out
Definition: execnodes.h:2166
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1662
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void finalize_partialaggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1167
ScanState ss
Definition: execnodes.h:2133
FmgrInfo equalfnOne
Definition: nodeAgg.h:110
ExprContext * ps_ExprContext
Definition: execnodes.h:971
MinimalTuple firstTuple
Definition: execnodes.h:679
shm_toc_estimator estimator
Definition: parallel.h:42
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:234
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
Definition: nodeAgg.c:3144
ExprState * evaltrans
Definition: nodeAgg.h:283
#define SizeForFunctionCallInfo(nargs)
Definition: fmgr.h:102
int64 input_tuples
Definition: nodeAgg.c:370
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1043
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
bool datumIsEqual(Datum value1, Datum value2, bool typByVal, int typLen)
Definition: datum.c:222
int plan_node_id
Definition: plannodes.h:141
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct HashTapeInfo HashTapeInfo
Oid inputcollid
Definition: primnodes.h:315
int current_phase
Definition: execnodes.h:2141
static void hashagg_finish_initial_spills(AggState *aggstate)
Definition: nodeAgg.c:3110
static void slot_getsomeattrs(TupleTableSlot *slot, int attnum)
Definition: tuptable.h:341
Definition: nodes.h:528
AggSplit aggsplit
Definition: execnodes.h:2138
static TupleTableSlot * ExecAgg(PlanState *pstate)
Definition: nodeAgg.c:2150
bool * nullsFirst
Definition: plannodes.h:774
int errcode(int sqlerrcode)
Definition: elog.c:610
List * args
Definition: primnodes.h:319
#define MemSet(start, val, len)
Definition: c.h:949
AttrNumber varattno
Definition: primnodes.h:186
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
fmNodePtr context
Definition: fmgr.h:88
Datum * tts_values
Definition: tuptable.h:126
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1322
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
void build_aggregate_deserialfn_expr(Oid deserialfn_oid, Expr **deserialfnexpr)
Definition: parse_agg.c:2013
static void finalize_aggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1064
bool all_cols_needed
Definition: execnodes.h:2161
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition: parse_agg.c:2037
AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2109
TupleTableSlot * hash_spill_rslot
Definition: execnodes.h:2179
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:724
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable, Oid aggtransfn, Oid aggtranstype, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, List *transnos)
Definition: nodeAgg.c:4510
AggStatePerTrans pertrans
Definition: execnodes.h:2143
EState * state
Definition: execnodes.h:934
int projected_set
Definition: execnodes.h:2154
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1152
bool aggstar
Definition: primnodes.h:323
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
uint32 hash_bytes_uint32(uint32 k)
Definition: hashfn.c:610
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:370
HeapTuple grp_firstTuple
Definition: execnodes.h:2171
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
Definition: primnodes.h:181
Aggref * aggref
Definition: nodeAgg.h:187
static TupleTableSlot * project_aggregates(AggState *aggstate)
Definition: nodeAgg.c:1380
static void advance_aggregates(AggState *aggstate)
Definition: nodeAgg.c:846
int current_set
Definition: execnodes.h:2156
uint32 mask
Definition: nodeAgg.c:350
#define OidIsValid(objectId)
Definition: c.h:651
#define DO_AGGSPLIT_COMBINE(as)
Definition: nodes.h:790
FunctionCallInfo transfn_fcinfo
Definition: nodeAgg.h:162
TupleHashEntry LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew, uint32 *hash)
Definition: execGrouping.c:304
void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4989
struct HashAggBatch HashAggBatch
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:655
Datum ExecAggTransReparent(AggState *aggstate, AggStatePerTrans pertrans, Datum newValue, bool newValueIsNull, Datum oldValue, bool oldValueIsNull)
int numtrans
Definition: execnodes.h:2136
void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1439
static void hash_agg_enter_spill_mode(AggState *aggstate)
Definition: nodeAgg.c:1877
TupleDesc sortdesc
Definition: nodeAgg.h:138
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, double input_groups, double hashentrysize)
Definition: nodeAgg.c:2942
Oid * sortOperators
Definition: plannodes.h:772
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:951
void execTuplesHashPrepare(int numCols, const Oid *eqOperators, Oid **eqFuncOids, FmgrInfo **hashFunctions)
Definition: execGrouping.c:96
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:209
void ResetTupleHashTable(TupleHashTable hashtable)
Definition: execGrouping.c:283
ExprContext * tmpcontext
Definition: execnodes.h:2146
FmgrInfo transfn
Definition: nodeAgg.h:81
#define HASHAGG_PARTITION_FACTOR
Definition: nodeAgg.c:287
static void build_hash_table(AggState *aggstate, int setno, long nbuckets)
Definition: nodeAgg.c:1506
int max_colno_needed
Definition: execnodes.h:2160
static void prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
Definition: nodeAgg.c:1272
bool hash_spill_mode
Definition: execnodes.h:2183
#define FUNC_MAX_ARGS
static void hashagg_tapeinfo_init(AggState *aggstate)
Definition: nodeAgg.c:2878
List * hash_batches
Definition: execnodes.h:2181
Aggref * aggref
Definition: nodeAgg.h:44
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:878
#define linitial_int(l)
Definition: pg_list.h:195
Bitmapset ** grouped_cols
Definition: nodeAgg.h:277
PlanState ps
Definition: execnodes.h:1319
LogicalTapeSet * tapeset
Definition: nodeAgg.c:368
int maxsets
Definition: execnodes.h:2163
#define true
Definition: c.h:328
static bool agg_refill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2586
static bool find_cols_walker(Node *node, FindColsContext *context)
Definition: nodeAgg.c:1423
Size hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
Definition: nodeAgg.c:1695
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
void initHyperLogLog(hyperLogLogState *cState, uint8 bwidth)
Definition: hyperloglog.c:66
#define DO_AGGSPLIT_SERIALIZE(as)
Definition: nodes.h:792
#define HASHAGG_MIN_PARTITIONS
Definition: nodeAgg.c:288
void pfree(void *pointer)
Definition: mcxt.c:1057
MemoryContext es_query_cxt
Definition: execnodes.h:552
AggStrategy aggstrategy
Definition: plannodes.h:819
AggState * ExecInitAgg(Agg *node, EState *estate, int eflags)
Definition: nodeAgg.c:3238
#define linitial(l)
Definition: pg_list.h:194
bool table_filled
Definition: execnodes.h:2173
AggStrategy aggstrategy
Definition: execnodes.h:2137
#define HASHAGG_HLL_BIT_WIDTH
Definition: nodeAgg.c:306
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:775
static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2733
static void find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
Definition: nodeAgg.c:1406
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
bool fn_strict
Definition: fmgr.h:61
#define lfirst_int(lc)
Definition: pg_list.h:190
static void * list_nth(const List *list, int n)
Definition: pg_list.h:286
char * get_func_name(Oid funcid)
Definition: lsyscache.c:1520
MemoryContext hash_metacxt
Definition: execnodes.h:2175
NullableDatum args[FLEXIBLE_ARRAY_MEMBER]
Definition: fmgr.h:95
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
static TupleTableSlot * agg_retrieve_direct(AggState *aggstate)
Definition: nodeAgg.c:2186
#define AGG_CONTEXT_AGGREGATE
Definition: fmgr.h:736
struct TupleHashEntryData TupleHashEntryData
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
static void find_hash_columns(AggState *aggstate)
Definition: nodeAgg.c:1566
ExprState * equalfnMulti
Definition: nodeAgg.h:111
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
Tuplesortstate * sort_in
Definition: execnodes.h:2165
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define outerPlanState(node)
Definition: execnodes.h:1026
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Definition: tuplesort.c:2389
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:646
static void finalize_aggregates(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup)
Definition: nodeAgg.c:1317
bool AggStateIsShared(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4882
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, int ndest)
Definition: nodeAgg.c:2899
Aggref * aggref
Definition: execnodes.h:749
#define list_nth_node(type, list, n)
Definition: pg_list.h:314
Tuplesortstate ** sortstates
Definition: nodeAgg.h:154
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:172
Bitmapset * aggParams
Definition: plannodes.h:827
static int initValue(long lng_val)
Definition: informix.c:677
MemoryContext tablecxt
Definition: execnodes.h:701
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:540
bool * tts_isnull
Definition: tuptable.h:128
int npartitions
Definition: nodeAgg.c:347
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:290
MinimalTupleData * MinimalTuple
Definition: htup.h:27
static void process_ordered_aggregate_multi(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:970
List * aggorder
Definition: primnodes.h:320
void ExecAggEstimate(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4968
int errcode_for_file_access(void)
Definition: elog.c:633
#define fmgr_info_set_expr(expr, finfo)
Definition: fmgr.h:135
AttrNumber resno
Definition: primnodes.h:1411
#define DatumGetBool(X)
Definition: postgres.h:393
int ParallelWorkerNumber
Definition: parallel.c:112
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
Definition: nodeAgg.c:2973
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:228
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:318
#define MakeExpandedObjectReadOnly(d, isnull, typlen)
Index agglevelsup
Definition: primnodes.h:327
int used_bits
Definition: nodeAgg.c:367
struct AggregateInstrumentation AggregateInstrumentation
Bitmapset * unaggregated
Definition: nodeAgg.c:379
#define TupIsNull(slot)
Definition: tuptable.h:292
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
unsigned int uint32
Definition: c.h:374
List * aggdirectargs
Definition: primnodes.h:318
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
Definition: nodeAgg.c:4404
AggStatePerAgg curperagg
Definition: execnodes.h:2149
AttrNumber * sortColIdx
Definition: nodeAgg.h:100
struct AggStatePerGroupData AggStatePerGroupData
struct HashTapeInfo * hash_tapeinfo
Definition: execnodes.h:2176
AggStatePerHash perhash
Definition: execnodes.h:2196
bool outeropsset
Definition: execnodes.h:1013
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void initialize_aggregates(AggState *aggstate, AggStatePerGroup *pergroups, int numReset)
Definition: nodeAgg.c:693
AggStrategy aggstrategy
Definition: nodeAgg.h:274
ExprState * evaltrans_cache[2][2]
Definition: nodeAgg.h:291
#define InstrCountFiltered1(node, delta)
Definition: execnodes.h:1034
#define EXEC_FLAG_REWIND
Definition: executor.h:57
hyperLogLogState * hll_card
Definition: nodeAgg.c:352
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2751
void build_aggregate_combinefn_expr(Oid agg_state_type, Oid agg_input_collation, Oid combinefn_oid, Expr **combinefnexpr)
Definition: parse_agg.c:1961
Datum value
Definition: postgres.h:378
Bitmapset * grouped_cols
Definition: execnodes.h:2157
#define IsParallelWorker()
Definition: parallel.h:61
Datum datumCopy(Datum value, bool typByVal, int typLen)
Definition: datum.c:131
TupleTableSlot * ExecAllocTableSlot(List **tupleTable, TupleDesc desc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1141
void ExecAggRetrieveInstrumentation(AggState *node)
Definition: nodeAgg.c:5027
int hash_batches_used
Definition: execnodes.h:2194
MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4856
List * lappend_int(List *list, int datum)
Definition: list.c:339
Bitmapset * chgParam
Definition: execnodes.h:964
#define InvokeFunctionExecuteHook(objectId)
Definition: objectaccess.h:191
int ntapes
Definition: nodeAgg.c:329
bool IsBinaryCoercible(Oid srctype, Oid targettype)
int my_log2(long num)
Definition: dynahash.c:1730
double input_card
Definition: nodeAgg.c:371
#define outerPlan(node)
Definition: plannodes.h:172
List * lappend(List *list, void *datum)
Definition: list.c:321
Bitmapset * aggregated
Definition: nodeAgg.c:378
TupleHashIterator hashiter
Definition: nodeAgg.h:304
int numCols
Definition: plannodes.h:770
Index varno
Definition: primnodes.h:184
static void initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:606
int num_hashes
Definition: execnodes.h:2174
Plan plan
Definition: plannodes.h:818
AttrNumber * hashGrpColIdxHash
Definition: nodeAgg.h:312
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
bool input_done
Definition: execnodes.h:2152
#define SizeofMinimalTupleHeader
Definition: htup_details.h:649
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
ExprContext * curaggcontext
Definition: execnodes.h:2148
ExprContext * hashcontext
Definition: execnodes.h:2144
bool * ecxt_aggnulls
Definition: execnodes.h:247
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:397
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define TextDatumGetCString(d)
Definition: builtins.h:87
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastaggno, List **same_input_transnos)
Definition: nodeAgg.c:4432
List * es_tupleTable
Definition: execnodes.h:554
#define HASHAGG_READ_BUFFER_SIZE
Definition: nodeAgg.c:297
AggStatePerPhase phase
Definition: execnodes.h:2139
void * palloc0(Size size)
Definition: mcxt.c:981
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:938
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164