PostgreSQL Source Code  git master
nodeAgg.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeAgg.c
4  * Routines to handle aggregate nodes.
5  *
6  * ExecAgg normally evaluates each aggregate in the following steps:
7  *
8  * transvalue = initcond
9  * foreach input_tuple do
10  * transvalue = transfunc(transvalue, input_value(s))
11  * result = finalfunc(transvalue, direct_argument(s))
12  *
13  * If a finalfunc is not supplied then the result is just the ending
14  * value of transvalue.
15  *
16  * Other behaviors can be selected by the "aggsplit" mode, which exists
17  * to support partial aggregation. It is possible to:
18  * * Skip running the finalfunc, so that the output is always the
19  * final transvalue state.
20  * * Substitute the combinefunc for the transfunc, so that transvalue
21  * states (propagated up from a child partial-aggregation step) are merged
22  * rather than processing raw input rows. (The statements below about
23  * the transfunc apply equally to the combinefunc, when it's selected.)
24  * * Apply the serializefunc to the output values (this only makes sense
25  * when skipping the finalfunc, since the serializefunc works on the
26  * transvalue data type).
27  * * Apply the deserializefunc to the input values (this only makes sense
28  * when using the combinefunc, for similar reasons).
29  * It is the planner's responsibility to connect up Agg nodes using these
30  * alternate behaviors in a way that makes sense, with partial aggregation
31  * results being fed to nodes that expect them.
32  *
33  * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34  * input tuples and eliminate duplicates (if required) before performing
35  * the above-depicted process. (However, we don't do that for ordered-set
36  * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37  * so far as this module is concerned.) Note that partial aggregation
38  * is not supported in these cases, since we couldn't ensure global
39  * ordering or distinctness of the inputs.
40  *
41  * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42  * then the first non-NULL input_value is assigned directly to transvalue,
43  * and transfunc isn't applied until the second non-NULL input_value.
44  * The agg's first input type and transtype must be the same in this case!
45  *
46  * If transfunc is marked "strict" then NULL input_values are skipped,
47  * keeping the previous transvalue. If transfunc is not strict then it
48  * is called for every input tuple and must deal with NULL initcond
49  * or NULL input_values for itself.
50  *
51  * If finalfunc is marked "strict" then it is not called when the
52  * ending transvalue is NULL, instead a NULL result is created
53  * automatically (this is just the usual handling of strict functions,
54  * of course). A non-strict finalfunc can make its own choice of
55  * what to return for a NULL ending transvalue.
56  *
57  * Ordered-set aggregates are treated specially in one other way: we
58  * evaluate any "direct" arguments and pass them to the finalfunc along
59  * with the transition value.
60  *
61  * A finalfunc can have additional arguments beyond the transvalue and
62  * any "direct" arguments, corresponding to the input arguments of the
63  * aggregate. These are always just passed as NULL. Such arguments may be
64  * needed to allow resolution of a polymorphic aggregate's result type.
65  *
66  * We compute aggregate input expressions and run the transition functions
67  * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68  * once per input tuple, so when the transvalue datatype is
69  * pass-by-reference, we have to be careful to copy it into a longer-lived
70  * memory context, and free the prior value to avoid memory leakage. We
71  * store transvalues in another set of econtexts, aggstate->aggcontexts
72  * (one per grouping set, see below), which are also used for the hashtable
73  * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74  * reset, at group boundaries so that aggregate transition functions can
75  * register shutdown callbacks via AggRegisterCallback.
76  *
77  * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78  * run finalize functions and compute the output tuple; this context can be
79  * reset once per output tuple.
80  *
81  * The executor's AggState node is passed as the fmgr "context" value in
82  * all transfunc and finalfunc calls. It is not recommended that the
83  * transition functions look at the AggState node directly, but they can
84  * use AggCheckCallContext() to verify that they are being called by
85  * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86  * transition function might want to know this is so that it can avoid
87  * palloc'ing a fixed-size pass-by-ref transition value on every call:
88  * it can instead just scribble on and return its left input. Ordinarily
89  * it is completely forbidden for functions to modify pass-by-ref inputs,
90  * but in the aggregate case we know the left input is either the initial
91  * transition value or a previous function result, and in either case its
92  * value need not be preserved. See int8inc() for an example. Notice that
93  * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94  * the previous transition value pointer is returned. It is also possible
95  * to avoid repeated data copying when the transition value is an expanded
96  * object: to do that, the transition function must take care to return
97  * an expanded object that is in a child context of the memory context
98  * returned by AggCheckCallContext(). Also, some transition functions want
99  * to store working state in addition to the nominal transition value; they
100  * can use the memory context returned by AggCheckCallContext() to do that.
101  *
102  * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103  * AggState is available as context in earlier releases (back to 8.1),
104  * but direct examination of the node is needed to use it before 9.0.
105  *
106  * As of 9.4, aggregate transition functions can also use AggGetAggref()
107  * to get hold of the Aggref expression node for their aggregate call.
108  * This is mainly intended for ordered-set aggregates, which are not
109  * supported as window functions. (A regular aggregate function would
110  * need some fallback logic to use this, since there's no Aggref node
111  * for a window function.)
112  *
113  * Grouping sets:
114  *
115  * A list of grouping sets which is structurally equivalent to a ROLLUP
116  * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117  * ordered data. We do this by keeping a separate set of transition values
118  * for each grouping set being concurrently processed; for each input tuple
119  * we update them all, and on group boundaries we reset those states
120  * (starting at the front of the list) whose grouping values have changed
121  * (the list of grouping sets is ordered from most specific to least
122  * specific).
123  *
124  * Where more complex grouping sets are used, we break them down into
125  * "phases", where each phase has a different sort order (except phase 0
126  * which is reserved for hashing). During each phase but the last, the
127  * input tuples are additionally stored in a tuplesort which is keyed to the
128  * next phase's sort order; during each phase but the first, the input
129  * tuples are drawn from the previously sorted data. (The sorting of the
130  * data for the first phase is handled by the planner, as it might be
131  * satisfied by underlying nodes.)
132  *
133  * Hashing can be mixed with sorted grouping. To do this, we have an
134  * AGG_MIXED strategy that populates the hashtables during the first sorted
135  * phase, and switches to reading them out after completing all sort phases.
136  * We can also support AGG_HASHED with multiple hash tables and no sorting
137  * at all.
138  *
139  * From the perspective of aggregate transition and final functions, the
140  * only issue regarding grouping sets is this: a single call site (flinfo)
141  * of an aggregate function may be used for updating several different
142  * transition values in turn. So the function must not cache in the flinfo
143  * anything which logically belongs as part of the transition value (most
144  * importantly, the memory context in which the transition value exists).
145  * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146  * sensitive to the grouping set for which the aggregate function is
147  * currently being called.
148  *
149  * Plan structure:
150  *
151  * What we get from the planner is actually one "real" Agg node which is
152  * part of the plan tree proper, but which optionally has an additional list
153  * of Agg nodes hung off the side via the "chain" field. This is because an
154  * Agg node happens to be a convenient representation of all the data we
155  * need for grouping sets.
156  *
157  * For many purposes, we treat the "real" node as if it were just the first
158  * node in the chain. The chain must be ordered such that hashed entries
159  * come before sorted/plain entries; the real node is marked AGG_MIXED if
160  * there are both types present (in which case the real node describes one
161  * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162  * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163  * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164  * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165  * chained nodes.
166  *
167  * We collect all hashed nodes into a single "phase", numbered 0, and create
168  * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169  * Phase 0 is allocated even if there are no hashes, but remains unused in
170  * that case.
171  *
172  * AGG_HASHED nodes actually refer to only a single grouping set each,
173  * because for each hashed grouping we need a separate grpColIdx and
174  * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175  * grouping sets that share a sort order. Each AGG_SORTED node other than
176  * the first one has an associated Sort node which describes the sort order
177  * to be used; the first sorted node takes its input from the outer subtree,
178  * which the planner has already arranged to provide ordered data.
179  *
180  * Memory and ExprContext usage:
181  *
182  * Because we're accumulating aggregate values across input rows, we need to
183  * use more memory contexts than just simple input/output tuple contexts.
184  * In fact, for a rollup, we need a separate context for each grouping set
185  * so that we can reset the inner (finer-grained) aggregates on their group
186  * boundaries while continuing to accumulate values for outer
187  * (coarser-grained) groupings. On top of this, we might be simultaneously
188  * populating hashtables; however, we only need one context for all the
189  * hashtables.
190  *
191  * So we create an array, aggcontexts, with an ExprContext for each grouping
192  * set in the largest rollup that we're going to process, and use the
193  * per-tuple memory context of those ExprContexts to store the aggregate
194  * transition values. hashcontext is the single context created to support
195  * all hash tables.
196  *
197  * Spilling To Disk
198  *
199  * When performing hash aggregation, if the hash table memory exceeds the
200  * limit (see hash_agg_check_limits()), we enter "spill mode". In spill
201  * mode, we advance the transition states only for groups already in the
202  * hash table. For tuples that would need to create new hash table
203  * entries (and initialize new transition states), we instead spill them to
204  * disk to be processed later. The tuples are spilled in a partitioned
205  * manner, so that subsequent batches are smaller and less likely to exceed
206  * hash_mem (if a batch does exceed hash_mem, it must be spilled
207  * recursively).
208  *
209  * Spilled data is written to logical tapes. These provide better control
210  * over memory usage, disk space, and the number of files than if we were
211  * to use a BufFile for each spill.
212  *
213  * Note that it's possible for transition states to start small but then
214  * grow very large; for instance in the case of ARRAY_AGG. In such cases,
215  * it's still possible to significantly exceed hash_mem. We try to avoid
216  * this situation by estimating what will fit in the available memory, and
217  * imposing a limit on the number of groups separately from the amount of
218  * memory consumed.
219  *
220  * Transition / Combine function invocation:
221  *
222  * For performance reasons transition functions, including combine
223  * functions, aren't invoked one-by-one from nodeAgg.c after computing
224  * arguments using the expression evaluation engine. Instead
225  * ExecBuildAggTrans() builds one large expression that does both argument
226  * evaluation and transition function invocation. That avoids performance
227  * issues due to repeated uses of expression evaluation, complications due
228  * to filter expressions having to be evaluated early, and allows to JIT
229  * the entire expression into one native function.
230  *
231  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
232  * Portions Copyright (c) 1994, Regents of the University of California
233  *
234  * IDENTIFICATION
235  * src/backend/executor/nodeAgg.c
236  *
237  *-------------------------------------------------------------------------
238  */
239 
240 #include "postgres.h"
241 
242 #include "access/htup_details.h"
243 #include "access/parallel.h"
244 #include "catalog/objectaccess.h"
245 #include "catalog/pg_aggregate.h"
246 #include "catalog/pg_proc.h"
247 #include "catalog/pg_type.h"
248 #include "common/hashfn.h"
249 #include "executor/execExpr.h"
250 #include "executor/executor.h"
251 #include "executor/nodeAgg.h"
252 #include "lib/hyperloglog.h"
253 #include "miscadmin.h"
254 #include "nodes/makefuncs.h"
255 #include "nodes/nodeFuncs.h"
256 #include "optimizer/optimizer.h"
257 #include "parser/parse_agg.h"
258 #include "parser/parse_coerce.h"
259 #include "utils/acl.h"
260 #include "utils/builtins.h"
261 #include "utils/datum.h"
262 #include "utils/dynahash.h"
263 #include "utils/expandeddatum.h"
264 #include "utils/logtape.h"
265 #include "utils/lsyscache.h"
266 #include "utils/memutils.h"
267 #include "utils/syscache.h"
268 #include "utils/tuplesort.h"
269 
/*
 * Control how many partitions are created when spilling HashAgg to
 * disk.
 *
 * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
 * partitions needed such that each partition will fit in memory. The factor
 * is set higher than one because there's not a high cost to having a few too
 * many partitions, and it makes it less likely that a partition will need to
 * be spilled recursively. Another benefit of having more, smaller partitions
 * is that small hash tables may perform better than large ones due to memory
 * caching effects.
 *
 * We also specify a min and max number of partitions per spill. Too few might
 * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
 * many will result in lots of memory wasted buffering the spill files (which
 * could instead be spent on a larger hash table).
 */
#define HASHAGG_PARTITION_FACTOR 1.50
#define HASHAGG_MIN_PARTITIONS 4
#define HASHAGG_MAX_PARTITIONS 1024

/*
 * For reading from tapes, the buffer size must be a multiple of
 * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
 * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
 * tape always uses a buffer of size BLCKSZ.
 */
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ

/*
 * HyperLogLog is used for estimating the cardinality of the spilled tuples in
 * a given partition. 5 bits corresponds to a size of about 32 bytes and a
 * worst-case error of around 18%. That's effective enough to choose a
 * reasonable number of partitions when recursing.
 */
#define HASHAGG_HLL_BIT_WIDTH 5

/*
 * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
 * improved?
 */
#define CHUNKHDRSZ 16
313 
314 /*
315  * Track all tapes needed for a HashAgg that spills. We don't know the maximum
316  * number of tapes needed at the start of the algorithm (because it can
317  * recurse), so one tape set is allocated and extended as needed for new
318  * tapes. When a particular tape is already read, rewind it for write mode and
319  * put it in the free list.
320  *
321  * Tapes' buffers can take up substantial memory when many tapes are open at
322  * once. We only need one tape open at a time in read mode (using a buffer
323  * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
324  * requiring a buffer of size BLCKSZ) for each partition.
325  */
326 typedef struct HashTapeInfo
327 {
329  int ntapes;
330  int *freetapes;
333 } HashTapeInfo;
334 
335 /*
336  * Represents partitioned spill data for a single hashtable. Contains the
337  * necessary information to route tuples to the correct partition, and to
338  * transform the spilled data into new batches.
339  *
340  * The high bits are used for partition selection (when recursing, we ignore
341  * the bits that have already been used for partition selection at an earlier
342  * level).
343  */
344 typedef struct HashAggSpill
345 {
346  LogicalTapeSet *tapeset; /* borrowed reference to tape set */
347  int npartitions; /* number of partitions */
348  int *partitions; /* spill partition tape numbers */
349  int64 *ntuples; /* number of tuples in each partition */
350  uint32 mask; /* mask to find partition from hash value */
351  int shift; /* after masking, shift by this amount */
352  hyperLogLogState *hll_card; /* cardinality estimate for contents */
353 } HashAggSpill;
354 
355 /*
356  * Represents work to be done for one pass of hash aggregation (with only one
357  * grouping set).
358  *
359  * Also tracks the bits of the hash already used for partition selection by
360  * earlier iterations, so that this batch can use new bits. If all bits have
361  * already been used, no partitioning will be done (any spilled data will go
362  * to a single output tape).
363  */
364 typedef struct HashAggBatch
365 {
366  int setno; /* grouping set */
367  int used_bits; /* number of bits of hash already used */
368  LogicalTapeSet *tapeset; /* borrowed reference to tape set */
369  int input_tapenum; /* input partition tape */
370  int64 input_tuples; /* number of tuples in this batch */
371  double input_card; /* estimated group cardinality */
372 } HashAggBatch;
373 
374 /* used to find referenced colnos */
375 typedef struct FindColsContext
376 {
377  bool is_aggref; /* is under an aggref */
378  Bitmapset *aggregated; /* column references under an aggref */
379  Bitmapset *unaggregated; /* other column references */
381 
382 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
383 static void initialize_phase(AggState *aggstate, int newphase);
384 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
385 static void initialize_aggregates(AggState *aggstate,
386  AggStatePerGroup *pergroups,
387  int numReset);
388 static void advance_transition_function(AggState *aggstate,
389  AggStatePerTrans pertrans,
390  AggStatePerGroup pergroupstate);
391 static void advance_aggregates(AggState *aggstate);
392 static void process_ordered_aggregate_single(AggState *aggstate,
393  AggStatePerTrans pertrans,
394  AggStatePerGroup pergroupstate);
395 static void process_ordered_aggregate_multi(AggState *aggstate,
396  AggStatePerTrans pertrans,
397  AggStatePerGroup pergroupstate);
398 static void finalize_aggregate(AggState *aggstate,
399  AggStatePerAgg peragg,
400  AggStatePerGroup pergroupstate,
401  Datum *resultVal, bool *resultIsNull);
402 static void finalize_partialaggregate(AggState *aggstate,
403  AggStatePerAgg peragg,
404  AggStatePerGroup pergroupstate,
405  Datum *resultVal, bool *resultIsNull);
406 static inline void prepare_hash_slot(AggStatePerHash perhash,
407  TupleTableSlot *inputslot,
408  TupleTableSlot *hashslot);
409 static void prepare_projection_slot(AggState *aggstate,
410  TupleTableSlot *slot,
411  int currentSet);
412 static void finalize_aggregates(AggState *aggstate,
413  AggStatePerAgg peragg,
414  AggStatePerGroup pergroup);
415 static TupleTableSlot *project_aggregates(AggState *aggstate);
416 static void find_cols(AggState *aggstate, Bitmapset **aggregated,
417  Bitmapset **unaggregated);
418 static bool find_cols_walker(Node *node, FindColsContext *context);
419 static void build_hash_tables(AggState *aggstate);
420 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
421 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
422  bool nullcheck);
423 static long hash_choose_num_buckets(double hashentrysize,
424  long estimated_nbuckets,
425  Size memory);
426 static int hash_choose_num_partitions(double input_groups,
427  double hashentrysize,
428  int used_bits,
429  int *log2_npartittions);
430 static void initialize_hash_entry(AggState *aggstate,
431  TupleHashTable hashtable,
432  TupleHashEntry entry);
433 static void lookup_hash_entries(AggState *aggstate);
434 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
435 static void agg_fill_hash_table(AggState *aggstate);
436 static bool agg_refill_hash_table(AggState *aggstate);
439 static void hash_agg_check_limits(AggState *aggstate);
440 static void hash_agg_enter_spill_mode(AggState *aggstate);
441 static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
442  int npartitions);
443 static void hashagg_finish_initial_spills(AggState *aggstate);
444 static void hashagg_reset_spill_state(AggState *aggstate);
446  int input_tapenum, int setno,
447  int64 input_tuples, double input_card,
448  int used_bits);
449 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
450 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
451  int used_bits, double input_groups,
452  double hashentrysize);
453 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
454  TupleTableSlot *slot, uint32 hash);
455 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
456  int setno);
457 static void hashagg_tapeinfo_init(AggState *aggstate);
458 static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
459  int ndest);
460 static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
461 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
462 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
463  AggState *aggstate, EState *estate,
464  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
465  Oid aggserialfn, Oid aggdeserialfn,
466  Datum initValue, bool initValueIsNull,
467  Oid *inputTypes, int numArguments);
468 
469 
470 /*
471  * Select the current grouping set; affects current_set and
472  * curaggcontext.
473  */
474 static void
475 select_current_set(AggState *aggstate, int setno, bool is_hash)
476 {
477  /*
478  * When changing this, also adapt ExecAggPlainTransByVal() and
479  * ExecAggPlainTransByRef().
480  */
481  if (is_hash)
482  aggstate->curaggcontext = aggstate->hashcontext;
483  else
484  aggstate->curaggcontext = aggstate->aggcontexts[setno];
485 
486  aggstate->current_set = setno;
487 }
488 
489 /*
490  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
491  * current_phase + 1. Juggle the tuplesorts accordingly.
492  *
493  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
494  * case, so when entering phase 0, all we need to do is drop open sorts.
495  */
496 static void
497 initialize_phase(AggState *aggstate, int newphase)
498 {
499  Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
500 
501  /*
502  * Whatever the previous state, we're now done with whatever input
503  * tuplesort was in use.
504  */
505  if (aggstate->sort_in)
506  {
507  tuplesort_end(aggstate->sort_in);
508  aggstate->sort_in = NULL;
509  }
510 
511  if (newphase <= 1)
512  {
513  /*
514  * Discard any existing output tuplesort.
515  */
516  if (aggstate->sort_out)
517  {
518  tuplesort_end(aggstate->sort_out);
519  aggstate->sort_out = NULL;
520  }
521  }
522  else
523  {
524  /*
525  * The old output tuplesort becomes the new input one, and this is the
526  * right time to actually sort it.
527  */
528  aggstate->sort_in = aggstate->sort_out;
529  aggstate->sort_out = NULL;
530  Assert(aggstate->sort_in);
531  tuplesort_performsort(aggstate->sort_in);
532  }
533 
534  /*
535  * If this isn't the last phase, we need to sort appropriately for the
536  * next phase in sequence.
537  */
538  if (newphase > 0 && newphase < aggstate->numphases - 1)
539  {
540  Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
541  PlanState *outerNode = outerPlanState(aggstate);
542  TupleDesc tupDesc = ExecGetResultType(outerNode);
543 
544  aggstate->sort_out = tuplesort_begin_heap(tupDesc,
545  sortnode->numCols,
546  sortnode->sortColIdx,
547  sortnode->sortOperators,
548  sortnode->collations,
549  sortnode->nullsFirst,
550  work_mem,
551  NULL, false);
552  }
553 
554  aggstate->current_phase = newphase;
555  aggstate->phase = &aggstate->phases[newphase];
556 }
557 
558 /*
559  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
560  * populated by the previous phase. Copy it to the sorter for the next phase
561  * if any.
562  *
563  * Callers cannot rely on memory for tuple in returned slot remaining valid
564  * past any subsequently fetched tuple.
565  */
566 static TupleTableSlot *
568 {
569  TupleTableSlot *slot;
570 
571  if (aggstate->sort_in)
572  {
573  /* make sure we check for interrupts in either path through here */
575  if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
576  aggstate->sort_slot, NULL))
577  return NULL;
578  slot = aggstate->sort_slot;
579  }
580  else
581  slot = ExecProcNode(outerPlanState(aggstate));
582 
583  if (!TupIsNull(slot) && aggstate->sort_out)
584  tuplesort_puttupleslot(aggstate->sort_out, slot);
585 
586  return slot;
587 }
588 
589 /*
590  * (Re)Initialize an individual aggregate.
591  *
592  * This function handles only one grouping set, already set in
593  * aggstate->current_set.
594  *
595  * When called, CurrentMemoryContext should be the per-query context.
596  */
597 static void
599  AggStatePerGroup pergroupstate)
600 {
601  /*
602  * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
603  */
604  if (pertrans->numSortCols > 0)
605  {
606  /*
607  * In case of rescan, maybe there could be an uncompleted sort
608  * operation? Clean it up if so.
609  */
610  if (pertrans->sortstates[aggstate->current_set])
611  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
612 
613 
614  /*
615  * We use a plain Datum sorter when there's a single input column;
616  * otherwise sort the full tuple. (See comments for
617  * process_ordered_aggregate_single.)
618  */
619  if (pertrans->numInputs == 1)
620  {
621  Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
622 
623  pertrans->sortstates[aggstate->current_set] =
624  tuplesort_begin_datum(attr->atttypid,
625  pertrans->sortOperators[0],
626  pertrans->sortCollations[0],
627  pertrans->sortNullsFirst[0],
628  work_mem, NULL, false);
629  }
630  else
631  pertrans->sortstates[aggstate->current_set] =
632  tuplesort_begin_heap(pertrans->sortdesc,
633  pertrans->numSortCols,
634  pertrans->sortColIdx,
635  pertrans->sortOperators,
636  pertrans->sortCollations,
637  pertrans->sortNullsFirst,
638  work_mem, NULL, false);
639  }
640 
641  /*
642  * (Re)set transValue to the initial value.
643  *
644  * Note that when the initial value is pass-by-ref, we must copy it (into
645  * the aggcontext) since we will pfree the transValue later.
646  */
647  if (pertrans->initValueIsNull)
648  pergroupstate->transValue = pertrans->initValue;
649  else
650  {
651  MemoryContext oldContext;
652 
654  pergroupstate->transValue = datumCopy(pertrans->initValue,
655  pertrans->transtypeByVal,
656  pertrans->transtypeLen);
657  MemoryContextSwitchTo(oldContext);
658  }
659  pergroupstate->transValueIsNull = pertrans->initValueIsNull;
660 
661  /*
662  * If the initial value for the transition state doesn't exist in the
663  * pg_aggregate table then we will let the first non-NULL value returned
664  * from the outer procNode become the initial value. (This is useful for
665  * aggregates like max() and min().) The noTransValue flag signals that we
666  * still need to do this.
667  */
668  pergroupstate->noTransValue = pertrans->initValueIsNull;
669 }
670 
671 /*
672  * Initialize all aggregate transition states for a new group of input values.
673  *
674  * If there are multiple grouping sets, we initialize only the first numReset
675  * of them (the grouping sets are ordered so that the most specific one, which
676  * is reset most often, is first). As a convenience, if numReset is 0, we
677  * reinitialize all sets.
678  *
679  * NB: This cannot be used for hash aggregates, as for those the grouping set
680  * number has to be specified from further up.
681  *
682  * When called, CurrentMemoryContext should be the per-query context.
683  */
684 static void
686  AggStatePerGroup *pergroups,
687  int numReset)
688 {
689  int transno;
690  int numGroupingSets = Max(aggstate->phase->numsets, 1);
691  int setno = 0;
692  int numTrans = aggstate->numtrans;
693  AggStatePerTrans transstates = aggstate->pertrans;
694 
695  if (numReset == 0)
696  numReset = numGroupingSets;
697 
698  for (setno = 0; setno < numReset; setno++)
699  {
700  AggStatePerGroup pergroup = pergroups[setno];
701 
702  select_current_set(aggstate, setno, false);
703 
704  for (transno = 0; transno < numTrans; transno++)
705  {
706  AggStatePerTrans pertrans = &transstates[transno];
707  AggStatePerGroup pergroupstate = &pergroup[transno];
708 
709  initialize_aggregate(aggstate, pertrans, pergroupstate);
710  }
711  }
712 }
713 
714 /*
715  * Given new input value(s), advance the transition function of one aggregate
716  * state within one grouping set only (already set in aggstate->current_set)
717  *
718  * The new values (and null flags) have been preloaded into argument positions
719  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
720  * pass to the transition function. We also expect that the static fields of
721  * the fcinfo are already initialized; that was done by ExecInitAgg().
722  *
723  * It doesn't matter which memory context this is called in.
724  */
725 static void
/* NOTE(review): the function-name line (source line 726,
 * advance_transition_function per the header comment above and the calls at
 * embedded lines 928/1011) was dropped by the extraction; gaps in the
 * embedded numbering below mark other elided lines. */
727  AggStatePerTrans pertrans,
728  AggStatePerGroup pergroupstate)
729 {
730  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
731  MemoryContext oldContext;
732  Datum newVal;
733 
734  if (pertrans->transfn.fn_strict)
735  {
736  /*
737  * For a strict transfn, nothing happens when there's a NULL input; we
738  * just keep the prior transValue.
739  */
740  int numTransInputs = pertrans->numTransInputs;
741  int i;
742 
743  for (i = 1; i <= numTransInputs; i++)
744  {
745  if (fcinfo->args[i].isnull)
746  return;
747  }
748  if (pergroupstate->noTransValue)
749  {
750  /*
751  * transValue has not been initialized. This is the first non-NULL
752  * input value. We use it as the initial value for transValue. (We
753  * already checked that the agg's input type is binary-compatible
754  * with its transtype, so straight copy here is OK.)
755  *
756  * We must copy the datum into aggcontext if it is pass-by-ref. We
757  * do not need to pfree the old transValue, since it's NULL.
758  */
/* NOTE(review): source line 759 is missing from this extract; the
 * MemoryContextSwitchTo() back to oldContext at line 765 pairs with a
 * context switch that line must have performed before the datumCopy(). */
760  pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
761  pertrans->transtypeByVal,
762  pertrans->transtypeLen);
763  pergroupstate->transValueIsNull = false;
764  pergroupstate->noTransValue = false;
765  MemoryContextSwitchTo(oldContext);
766  return;
767  }
768  if (pergroupstate->transValueIsNull)
769  {
770  /*
771  * Don't call a strict function with NULL inputs. Note it is
772  * possible to get here despite the above tests, if the transfn is
773  * strict *and* returned a NULL on a prior cycle. If that happens
774  * we will propagate the NULL all the way to the end.
775  */
776  return;
777  }
778  }
779 
780  /* We run the transition functions in per-input-tuple memory context */
781  oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
782 
783  /* set up aggstate->curpertrans for AggGetAggref() */
784  aggstate->curpertrans = pertrans;
785 
786  /*
787  * OK to call the transition function
788  */
789  fcinfo->args[0].value = pergroupstate->transValue;
790  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
791  fcinfo->isnull = false; /* just in case transfn doesn't set it */
792 
793  newVal = FunctionCallInvoke(fcinfo);
794 
795  aggstate->curpertrans = NULL;
796 
797  /*
798  * If pass-by-ref datatype, must copy the new value into aggcontext and
799  * free the prior transValue. But if transfn returned a pointer to its
800  * first input, we don't need to do anything. Also, if transfn returned a
801  * pointer to a R/W expanded object that is already a child of the
802  * aggcontext, assume we can adopt that value without copying it.
803  *
804  * It's safe to compare newVal with pergroup->transValue without regard
805  * for either being NULL, because ExecAggTransReparent() takes care to set
806  * transValue to 0 when NULL. Otherwise we could end up accidentally not
807  * reparenting, when the transValue has the same numerical value as
808  * newValue, despite being NULL. This is a somewhat hot path, making it
809  * undesirable to instead solve this with another branch for the common
810  * case of the transition function returning its (modified) input
811  * argument.
812  */
813  if (!pertrans->transtypeByVal &&
814  DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
815  newVal = ExecAggTransReparent(aggstate, pertrans,
816  newVal, fcinfo->isnull,
817  pergroupstate->transValue,
818  pergroupstate->transValueIsNull);
819 
820  pergroupstate->transValue = newVal;
821  pergroupstate->transValueIsNull = fcinfo->isnull;
822 
823  MemoryContextSwitchTo(oldContext);
824 }
825 
826 /*
827  * Advance each aggregate transition state for one input tuple. The input
828  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
829  * accessible to ExecEvalExpr.
830  *
831  * We have two sets of transition states to handle: one for sorted aggregation
832  * and one for hashed; we do them both here, to avoid multiple evaluation of
833  * the inputs.
834  *
835  * When called, CurrentMemoryContext should be the per-query context.
836  */
837 static void
/* NOTE(review): source line 838 (the function-name/parameter line,
 * advance_aggregates per the header comment above) is missing from this
 * extract, as is line 842 — evidently the call whose trailing arguments
 * appear at lines 843-844 and whose boolean result lands in dummynull. */
839 {
840  bool dummynull;
841 
843  aggstate->tmpcontext,
844  &dummynull);
845 }
846 
847 /*
848  * Run the transition function for a DISTINCT or ORDER BY aggregate
849  * with only one input. This is called after we have completed
850  * entering all the input values into the sort object. We complete the
851  * sort, read out the values in sorted order, and run the transition
852  * function on each value (applying DISTINCT if appropriate).
853  *
854  * Note that the strictness of the transition function was checked when
855  * entering the values into the sort, so we don't check it again here;
856  * we just apply standard SQL DISTINCT logic.
857  *
858  * The one-input case is handled separately from the multi-input case
859  * for performance reasons: for single by-value inputs, such as the
860  * common case of count(distinct id), the tuplesort_getdatum code path
861  * is around 300% faster. (The speedup for by-reference types is less
862  * but still noticeable.)
863  *
864  * This function handles only one grouping set (already set in
865  * aggstate->current_set).
866  *
867  * When called, CurrentMemoryContext should be the per-query context.
868  */
869 static void
/* NOTE(review): source line 870 (the function-name line; the one-input
 * DISTINCT/ORDER BY routine described in the comment block above) is missing
 * from this extract. */
871  AggStatePerTrans pertrans,
872  AggStatePerGroup pergroupstate)
873 {
874  Datum oldVal = (Datum) 0;
875  bool oldIsNull = true;
876  bool haveOldVal = false;
877  MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
878  MemoryContext oldContext;
879  bool isDistinct = (pertrans->numDistinctCols > 0);
880  Datum newAbbrevVal = (Datum) 0;
881  Datum oldAbbrevVal = (Datum) 0;
882  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
883  Datum *newVal;
884  bool *isNull;
885 
886  Assert(pertrans->numDistinctCols < 2);
887 
888  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
889 
890  /* Load the column into argument 1 (arg 0 will be transition value) */
891  newVal = &fcinfo->args[1].value;
892  isNull = &fcinfo->args[1].isnull;
893 
894  /*
895  * Note: if input type is pass-by-ref, the datums returned by the sort are
896  * freshly palloc'd in the per-query context, so we must be careful to
897  * pfree them when they are no longer needed.
898  */
899 
900  while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
901  true, newVal, isNull, &newAbbrevVal))
902  {
903  /*
904  * Clear and select the working context for evaluation of the equality
905  * function and transition function.
906  */
907  MemoryContextReset(workcontext);
908  oldContext = MemoryContextSwitchTo(workcontext);
909 
910  /*
911  * If DISTINCT mode, and not distinct from prior, skip it.
912  */
913  if (isDistinct &&
914  haveOldVal &&
915  ((oldIsNull && *isNull) ||
916  (!oldIsNull && !*isNull &&
917  oldAbbrevVal == newAbbrevVal &&
/* NOTE(review): source line 918 is missing here — it must be the invocation
 * of the equality function whose trailing arguments (collation, oldVal,
 * *newVal) follow on lines 919-920; the abbreviated-key compare on line 917
 * is only a cheap pre-filter for that full comparison. */
919  pertrans->aggCollation,
920  oldVal, *newVal)))))
921  {
922  /* equal to prior, so forget this one */
923  if (!pertrans->inputtypeByVal && !*isNull)
924  pfree(DatumGetPointer(*newVal));
925  }
926  else
927  {
928  advance_transition_function(aggstate, pertrans, pergroupstate);
929  /* forget the old value, if any */
930  if (!oldIsNull && !pertrans->inputtypeByVal)
931  pfree(DatumGetPointer(oldVal));
932  /* and remember the new one for subsequent equality checks */
933  oldVal = *newVal;
934  oldAbbrevVal = newAbbrevVal;
935  oldIsNull = *isNull;
936  haveOldVal = true;
937  }
938 
939  MemoryContextSwitchTo(oldContext);
940  }
941 
942  if (!oldIsNull && !pertrans->inputtypeByVal)
943  pfree(DatumGetPointer(oldVal));
944 
945  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
946  pertrans->sortstates[aggstate->current_set] = NULL;
947 }
948 
949 /*
950  * Run the transition function for a DISTINCT or ORDER BY aggregate
951  * with more than one input. This is called after we have completed
952  * entering all the input values into the sort object. We complete the
953  * sort, read out the values in sorted order, and run the transition
954  * function on each value (applying DISTINCT if appropriate).
955  *
956  * This function handles only one grouping set (already set in
957  * aggstate->current_set).
958  *
959  * When called, CurrentMemoryContext should be the per-query context.
960  */
961 static void
/* NOTE(review): source line 962 (the function-name line; the multi-input
 * DISTINCT/ORDER BY routine described in the comment block above) is missing
 * from this extract. */
963  AggStatePerTrans pertrans,
964  AggStatePerGroup pergroupstate)
965 {
966  ExprContext *tmpcontext = aggstate->tmpcontext;
967  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
968  TupleTableSlot *slot1 = pertrans->sortslot;
969  TupleTableSlot *slot2 = pertrans->uniqslot;
970  int numTransInputs = pertrans->numTransInputs;
971  int numDistinctCols = pertrans->numDistinctCols;
972  Datum newAbbrevVal = (Datum) 0;
973  Datum oldAbbrevVal = (Datum) 0;
974  bool haveOldValue = false;
975  TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
976  int i;
977 
978  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
979 
980  ExecClearTuple(slot1);
981  if (slot2)
982  ExecClearTuple(slot2);
983 
984  while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
985  true, true, slot1, &newAbbrevVal))
986  {
/* NOTE(review): source line 987 is missing from this extract (a single
 * statement at the top of the loop body). */
988 
989  tmpcontext->ecxt_outertuple = slot1;
990  tmpcontext->ecxt_innertuple = slot2;
991 
992  if (numDistinctCols == 0 ||
993  !haveOldValue ||
994  newAbbrevVal != oldAbbrevVal ||
995  !ExecQual(pertrans->equalfnMulti, tmpcontext))
996  {
997  /*
998  * Extract the first numTransInputs columns as datums to pass to
999  * the transfn.
1000  */
1001  slot_getsomeattrs(slot1, numTransInputs);
1002 
1003  /* Load values into fcinfo */
1004  /* Start from 1, since the 0th arg will be the transition value */
1005  for (i = 0; i < numTransInputs; i++)
1006  {
1007  fcinfo->args[i + 1].value = slot1->tts_values[i];
1008  fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
1009  }
1010 
1011  advance_transition_function(aggstate, pertrans, pergroupstate);
1012 
1013  if (numDistinctCols > 0)
1014  {
1015  /* swap the slot pointers to retain the current tuple */
1016  TupleTableSlot *tmpslot = slot2;
1017 
1018  slot2 = slot1;
1019  slot1 = tmpslot;
1020  /* avoid ExecQual() calls by reusing abbreviated keys */
1021  oldAbbrevVal = newAbbrevVal;
1022  haveOldValue = true;
1023  }
1024  }
1025 
1026  /* Reset context each time */
1027  ResetExprContext(tmpcontext);
1028 
1029  ExecClearTuple(slot1);
1030  }
1031 
1032  if (slot2)
1033  ExecClearTuple(slot2);
1034 
1035  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1036  pertrans->sortstates[aggstate->current_set] = NULL;
1037 
1038  /* restore previous slot, potentially in use for grouping sets */
1039  tmpcontext->ecxt_outertuple = save;
1040 }
1041 
1042 /*
1043  * Compute the final value of one aggregate.
1044  *
1045  * This function handles only one grouping set (already set in
1046  * aggstate->current_set).
1047  *
1048  * The finalfn will be run, and the result delivered, in the
1049  * output-tuple context; caller's CurrentMemoryContext does not matter.
1050  *
1051  * The finalfn uses the state as set in the transno. This also might be
1052  * being used by another aggregate function, so it's important that we do
1053  * nothing destructive here.
1054  */
1055 static void
/* NOTE(review): source line 1056 (the function-name line, finalize_aggregate
 * per the header comment above and the call at embedded line 1361) is missing
 * from this extract. */
1057  AggStatePerAgg peragg,
1058  AggStatePerGroup pergroupstate,
1059  Datum *resultVal, bool *resultIsNull)
1060 {
1061  LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1062  bool anynull = false;
1063  MemoryContext oldContext;
1064  int i;
1065  ListCell *lc;
1066  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1067 
/* NOTE(review): source line 1068 is missing here — the switch back to
 * oldContext at line 1149 pairs with a MemoryContextSwitchTo() that this
 * line must have performed (the header comment says the finalfn runs in the
 * output-tuple context). */
1069 
1070  /*
1071  * Evaluate any direct arguments. We do this even if there's no finalfn
1072  * (which is unlikely anyway), so that side-effects happen as expected.
1073  * The direct arguments go into arg positions 1 and up, leaving position 0
1074  * for the transition state value.
1075  */
1076  i = 1;
1077  foreach(lc, peragg->aggdirectargs)
1078  {
1079  ExprState *expr = (ExprState *) lfirst(lc);
1080 
1081  fcinfo->args[i].value = ExecEvalExpr(expr,
1082  aggstate->ss.ps.ps_ExprContext,
1083  &fcinfo->args[i].isnull);
1084  anynull |= fcinfo->args[i].isnull;
1085  i++;
1086  }
1087 
1088  /*
1089  * Apply the agg's finalfn if one is provided, else return transValue.
1090  */
1091  if (OidIsValid(peragg->finalfn_oid))
1092  {
1093  int numFinalArgs = peragg->numFinalArgs;
1094 
1095  /* set up aggstate->curperagg for AggGetAggref() */
1096  aggstate->curperagg = peragg;
1097 
1098  InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
1099  numFinalArgs,
1100  pertrans->aggCollation,
1101  (void *) aggstate, NULL);
1102 
1103  /* Fill in the transition state value */
1104  fcinfo->args[0].value =
1105  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1106  pergroupstate->transValueIsNull,
1107  pertrans->transtypeLen);
1108  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1109  anynull |= pergroupstate->transValueIsNull;
1110 
1111  /* Fill any remaining argument positions with nulls */
1112  for (; i < numFinalArgs; i++)
1113  {
1114  fcinfo->args[i].value = (Datum) 0;
1115  fcinfo->args[i].isnull = true;
1116  anynull = true;
1117  }
1118 
1119  if (fcinfo->flinfo->fn_strict && anynull)
1120  {
1121  /* don't call a strict function with NULL inputs */
1122  *resultVal = (Datum) 0;
1123  *resultIsNull = true;
1124  }
1125  else
1126  {
1127  *resultVal = FunctionCallInvoke(fcinfo);
1128  *resultIsNull = fcinfo->isnull;
1129  }
1130  aggstate->curperagg = NULL;
1131  }
1132  else
1133  {
1134  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1135  *resultVal = pergroupstate->transValue;
1136  *resultIsNull = pergroupstate->transValueIsNull;
1137  }
1138 
1139  /*
1140  * If result is pass-by-ref, make sure it is in the right context.
1141  */
1142  if (!peragg->resulttypeByVal && !*resultIsNull &&
/* NOTE(review): source line 1143 is missing here — a further condition of
 * this if-test whose closing parentheses appear on line 1144; it evidently
 * tests something about DatumGetPointer(*resultVal) that decides whether the
 * datumCopy() below is needed. */
1144  DatumGetPointer(*resultVal)))
1145  *resultVal = datumCopy(*resultVal,
1146  peragg->resulttypeByVal,
1147  peragg->resulttypeLen);
1148 
1149  MemoryContextSwitchTo(oldContext);
1150 }
1151 
1152 /*
1153  * Compute the output value of one partial aggregate.
1154  *
1155  * The serialization function will be run, and the result delivered, in the
1156  * output-tuple context; caller's CurrentMemoryContext does not matter.
1157  */
1158 static void
/* NOTE(review): source line 1159 (the function-name line,
 * finalize_partialaggregate per the header comment above and the call at
 * embedded line 1358) is missing from this extract. */
1160  AggStatePerAgg peragg,
1161  AggStatePerGroup pergroupstate,
1162  Datum *resultVal, bool *resultIsNull)
1163 {
1164  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1165  MemoryContext oldContext;
1166 
/* NOTE(review): source line 1167 is missing here — the switch back to
 * oldContext at line 1211 pairs with a MemoryContextSwitchTo() that this
 * line must have performed. */
1168 
1169  /*
1170  * serialfn_oid will be set if we must serialize the transvalue before
1171  * returning it
1172  */
1173  if (OidIsValid(pertrans->serialfn_oid))
1174  {
1175  /* Don't call a strict serialization function with NULL input. */
1176  if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1177  {
1178  *resultVal = (Datum) 0;
1179  *resultIsNull = true;
1180  }
1181  else
1182  {
1183  FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1184 
1185  fcinfo->args[0].value =
1186  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1187  pergroupstate->transValueIsNull,
1188  pertrans->transtypeLen);
1189  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1190  fcinfo->isnull = false;
1191 
1192  *resultVal = FunctionCallInvoke(fcinfo);
1193  *resultIsNull = fcinfo->isnull;
1194  }
1195  }
1196  else
1197  {
1198  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1199  *resultVal = pergroupstate->transValue;
1200  *resultIsNull = pergroupstate->transValueIsNull;
1201  }
1202 
1203  /* If result is pass-by-ref, make sure it is in the right context. */
1204  if (!peragg->resulttypeByVal && !*resultIsNull &&
/* NOTE(review): source line 1205 is missing here — a further condition of
 * this if-test whose closing parentheses appear on line 1206, parallel to
 * the same construct in the preceding finalize function. */
1206  DatumGetPointer(*resultVal)))
1207  *resultVal = datumCopy(*resultVal,
1208  peragg->resulttypeByVal,
1209  peragg->resulttypeLen);
1210 
1211  MemoryContextSwitchTo(oldContext);
1212 }
1213 
1214 /*
1215  * Extract the attributes that make up the grouping key into the
1216  * hashslot. This is necessary to compute the hash or perform a lookup.
1217  */
1218 static inline void
/* NOTE(review): source line 1219 (the function-name line; per the header
 * comment above, this extracts the grouping-key columns into hashslot) is
 * missing from this extract. */
1220  TupleTableSlot *inputslot,
1221  TupleTableSlot *hashslot)
1222 {
1223  int i;
1224 
1225  /* transfer just the needed columns into hashslot */
1226  slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1227  ExecClearTuple(hashslot);
1228 
1229  for (i = 0; i < perhash->numhashGrpCols; i++)
1230  {
/* hashGrpColIdxInput holds 1-based input attnums; convert to 0-based index */
1231  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1232 
1233  hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1234  hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1235  }
1236  ExecStoreVirtualTuple(hashslot);
1237 }
1238 
1239 /*
1240  * Prepare to finalize and project based on the specified representative tuple
1241  * slot and grouping set.
1242  *
1243  * In the specified tuple slot, force to null all attributes that should be
1244  * read as null in the context of the current grouping set. Also stash the
1245  * current group bitmap where GroupingExpr can get at it.
1246  *
1247  * This relies on three conditions:
1248  *
1249  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1250  * only reference it in evaluations, which will only access individual
1251  * attributes.
1252  *
1253  * 2) No system columns are going to need to be nulled. (If a system column is
1254  * referenced in a group clause, it is actually projected in the outer plan
1255  * tlist.)
1256  *
1257  * 3) Within a given phase, we never need to recover the value of an attribute
1258  * once it has been set to null.
1259  *
1260  * Poking into the slot this way is a bit ugly, but the consensus is that the
1261  * alternative was worse.
1262  */
1263 static void
1264 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1265 {
1266  if (aggstate->phase->grouped_cols)
1267  {
1268  Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1269 
/* stash the current group bitmap where GroupingExpr can get at it */
1270  aggstate->grouped_cols = grouped_cols;
1271 
1272  if (TTS_EMPTY(slot))
1273  {
1274  /*
1275  * Force all values to be NULL if working on an empty input tuple
1276  * (i.e. an empty grouping set for which no input rows were
1277  * supplied).
1278  */
1279  ExecStoreAllNullTuple(slot);
1280  }
1281  else if (aggstate->all_grouped_cols)
1282  {
1283  ListCell *lc;
1284 
1285  /* all_grouped_cols is arranged in desc order */
/* NOTE(review): source line 1286 is missing from this extract — a single
 * statement here; the direct tts_isnull[] writes below require the slot's
 * attributes to have been deformed first. */
1287 
1288  foreach(lc, aggstate->all_grouped_cols)
1289  {
1290  int attnum = lfirst_int(lc);
1291 
/* null out every column not grouped by in the current grouping set */
1292  if (!bms_is_member(attnum, grouped_cols))
1293  slot->tts_isnull[attnum - 1] = true;
1294  }
1295  }
1296  }
1297 }
1298 
1299 /*
1300  * Compute the final value of all aggregates for one group.
1301  *
1302  * This function handles only one grouping set at a time, which the caller must
1303  * have selected. It's also the caller's responsibility to adjust the supplied
1304  * pergroup parameter to point to the current set's transvalues.
1305  *
1306  * Results are stored in the output econtext aggvalues/aggnulls.
1307  */
1308 static void
/* NOTE(review): source line 1309 (the function-name line, finalize_aggregates
 * per the comment block above) is missing from this extract. */
1310  AggStatePerAgg peraggs,
1311  AggStatePerGroup pergroup)
1312 {
1313  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1314  Datum *aggvalues = econtext->ecxt_aggvalues;
1315  bool *aggnulls = econtext->ecxt_aggnulls;
1316  int aggno;
1317  int transno;
1318 
1319  /*
1320  * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1321  * inputs and run the transition functions.
1322  */
1323  for (transno = 0; transno < aggstate->numtrans; transno++)
1324  {
1325  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1326  AggStatePerGroup pergroupstate;
1327 
1328  pergroupstate = &pergroup[transno];
1329 
1330  if (pertrans->numSortCols > 0)
1331  {
/* sorted-input processing is incompatible with hashed grouping */
1332  Assert(aggstate->aggstrategy != AGG_HASHED &&
1333  aggstate->aggstrategy != AGG_MIXED);
1334 
1335  if (pertrans->numInputs == 1)
/* NOTE(review): source line 1336 is missing here — the call whose trailing
 * arguments appear on lines 1337-1338; by the branch condition this is the
 * single-input ordered-aggregate routine defined earlier in the file. */
1337  pertrans,
1338  pergroupstate);
1339  else
/* NOTE(review): source line 1340 is likewise missing — the corresponding
 * multi-input ordered-aggregate call. */
1341  pertrans,
1342  pergroupstate);
1343  }
1344  }
1345 
1346  /*
1347  * Run the final functions.
1348  */
1349  for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1350  {
1351  AggStatePerAgg peragg = &peraggs[aggno];
1352  int transno = peragg->transno;
1353  AggStatePerGroup pergroupstate;
1354 
1355  pergroupstate = &pergroup[transno];
1356 
1357  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1358  finalize_partialaggregate(aggstate, peragg, pergroupstate,
1359  &aggvalues[aggno], &aggnulls[aggno]);
1360  else
1361  finalize_aggregate(aggstate, peragg, pergroupstate,
1362  &aggvalues[aggno], &aggnulls[aggno]);
1363  }
1364 }
1365 
1366 /*
1367  * Project the result of a group (whose aggs have already been calculated by
1368  * finalize_aggregates). Returns the result slot, or NULL if no row is
1369  * projected (suppressed by qual).
1370  */
1371 static TupleTableSlot *
/* NOTE(review): source line 1372 (the function-name line, project_aggregates
 * per the comment block above) is missing from this extract. */
1373 {
1374  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1375 
1376  /*
1377  * Check the qual (HAVING clause); if the group does not match, ignore it.
1378  */
1379  if (ExecQual(aggstate->ss.ps.qual, econtext))
1380  {
1381  /*
1382  * Form and return projection tuple using the aggregate results and
1383  * the representative input tuple.
1384  */
1385  return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1386  }
1387  else
/* count the row as filtered out by the HAVING qual for EXPLAIN ANALYZE */
1388  InstrCountFiltered1(aggstate, 1);
1389 
1390  return NULL;
1391 }
1392 
1393 /*
1394  * Walk tlist and qual to find referenced colnos, dividing them into
1395  * aggregated and unaggregated sets.
1396  */
1397 static void
1398 find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
1399 {
1400  Agg *agg = (Agg *) aggstate->ss.ps.plan;
1401  FindColsContext context;
1402 
1403  context.is_aggref = false;
1404  context.aggregated = NULL;
1405  context.unaggregated = NULL;
1406 
1407  (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
1408  (void) find_cols_walker((Node *) agg->plan.qual, &context);
1409 
1410  *aggregated = context.aggregated;
1411  *unaggregated = context.unaggregated;
1412 }
1413 
1414 static bool
/* NOTE(review): source line 1415 (the function-name/parameter line,
 * find_cols_walker per the calls in find_cols above) is missing from this
 * extract. */
1416 {
1417  if (node == NULL)
1418  return false;
1419  if (IsA(node, Var))
1420  {
1421  Var *var = (Var *) node;
1422 
1423  /* setrefs.c should have set the varno to OUTER_VAR */
1424  Assert(var->varno == OUTER_VAR);
1425  Assert(var->varlevelsup == 0);
1426  if (context->is_aggref)
1427  context->aggregated = bms_add_member(context->aggregated,
1428  var->varattno)
1429  else
1430  context->unaggregated = bms_add_member(context->unaggregated,
1431  var->varattno);
1432  return false;
1433  }
1434  if (IsA(node, Aggref))
1435  {
/* flag Vars below this point as aggregated, then restore on the way out */
1436  Assert(!context->is_aggref);
1437  context->is_aggref = true;
1438  expression_tree_walker(node, find_cols_walker, (void *) context);
1439  context->is_aggref = false;
1440  return false;
1441  }
/* NOTE(review): source line 1442 is missing here — the statement whose
 * trailing argument appears on line 1443 and which supplies this function's
 * return value for the remaining node types (a recursive tree walk, per the
 * walker call inside the Aggref branch above). */
1443  (void *) context);
1444 }
1445 
1446 /*
1447  * (Re-)initialize the hash table(s) to empty.
1448  *
1449  * To implement hashed aggregation, we need a hashtable that stores a
1450  * representative tuple and an array of AggStatePerGroup structs for each
1451  * distinct set of GROUP BY column values. We compute the hash key from the
1452  * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1453  * for each entry.
1454  *
1455  * We have a separate hashtable and associated perhash data structure for each
1456  * grouping set for which we're doing hashing.
1457  *
1458  * The contents of the hash tables always live in the hashcontext's per-tuple
1459  * memory context (there is only one of these for all tables together, since
1460  * they are all reset at the same time).
1461  */
1462 static void
/* NOTE(review): source line 1463 (the function-name line; per the comment
 * block above this (re-)initializes all grouping-set hash tables to empty)
 * is missing from this extract. */
1464 {
1465  int setno;
1466 
1467  for (setno = 0; setno < aggstate->num_hashes; ++setno)
1468  {
1469  AggStatePerHash perhash = &aggstate->perhash[setno];
1470  long nbuckets;
1471  Size memory;
1472 
1473  if (perhash->hashtable != NULL)
1474  {
/* an existing table is reset in place rather than rebuilt */
1475  ResetTupleHashTable(perhash->hashtable);
1476  continue;
1477  }
1478 
1479  Assert(perhash->aggnode->numGroups > 0);
1480 
/* divide the memory budget evenly among the hash tables */
1481  memory = aggstate->hash_mem_limit / aggstate->num_hashes;
1482 
1483  /* choose reasonable number of buckets per hashtable */
1484  nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
1485  perhash->aggnode->numGroups,
1486  memory);
1487 
1488  build_hash_table(aggstate, setno, nbuckets);
1489  }
1490 
1491  aggstate->hash_ngroups_current = 0;
1492 }
1493 
1494 /*
1495  * Build a single hashtable for this grouping set.
1496  */
1497 static void
1498 build_hash_table(AggState *aggstate, int setno, long nbuckets)
1499 {
1500  AggStatePerHash perhash = &aggstate->perhash[setno];
1501  MemoryContext metacxt = aggstate->hash_metacxt;
1502  MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
1503  MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
1504  Size additionalsize;
1505 
1506  Assert(aggstate->aggstrategy == AGG_HASHED ||
1507  aggstate->aggstrategy == AGG_MIXED);
1508 
1509  /*
1510  * Used to make sure initial hash table allocation does not exceed
1511  * hash_mem. Note that the estimate does not include space for
1512  * pass-by-reference transition data values, nor for the representative
1513  * tuple of each group.
1514  */
1515  additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1516 
1517  perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1518  perhash->hashslot->tts_tupleDescriptor,
1519  perhash->numCols,
1520  perhash->hashGrpColIdxHash,
1521  perhash->eqfuncoids,
1522  perhash->hashfunctions,
1523  perhash->aggnode->grpCollations,
1524  nbuckets,
1525  additionalsize,
1526  metacxt,
1527  hashcxt,
1528  tmpcxt,
1529  DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1530 }
1531 
1532 /*
1533  * Compute columns that actually need to be stored in hashtable entries. The
1534  * incoming tuples from the child plan node will contain grouping columns,
1535  * other columns referenced in our targetlist and qual, columns used to
1536  * compute the aggregate functions, and perhaps just junk columns we don't use
1537  * at all. Only columns of the first two types need to be stored in the
1538  * hashtable, and getting rid of the others can make the table entries
1539  * significantly smaller. The hashtable only contains the relevant columns,
1540  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1541  * into the format of the normal input descriptor.
1542  *
1543  * Additional columns, in addition to the columns grouped by, come from two
1544  * sources: Firstly functionally dependent columns that we don't need to group
1545  * by themselves, and secondly ctids for row-marks.
1546  *
1547  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1548  * then build an array of the columns included in the hashtable. We might
1549  * still have duplicates if the passed-in grpColIdx has them, which can happen
1550  * in edge cases from semijoins/distinct; these can't always be removed,
1551  * because it's not certain that the duplicate cols will be using the same
1552  * hash function.
1553  *
1554  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1555  * the per-query context (unlike the hash table itself).
1556  */
1557 static void
/* NOTE(review): source line 1558 (the function-name line; per the comment
 * block above this computes which columns each hashtable must store) is
 * missing from this extract. */
1559 {
1560  Bitmapset *base_colnos;
1561  Bitmapset *aggregated_colnos;
1562  TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1563  List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1564  int numHashes = aggstate->num_hashes;
1565  EState *estate = aggstate->ss.ps.state;
1566  int j;
1567 
1568  /* Find Vars that will be needed in tlist and qual */
1569  find_cols(aggstate, &aggregated_colnos, &base_colnos);
1570  aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
1571  aggstate->max_colno_needed = 0;
1572  aggstate->all_cols_needed = true;
1573 
1574  for (int i = 0; i < scanDesc->natts; i++)
1575  {
1576  int colno = i + 1;
1577  if (bms_is_member(colno, aggstate->colnos_needed))
1578  aggstate->max_colno_needed = colno;
1579  else
1580  aggstate->all_cols_needed = false;
1581  }
1582 
1583  for (j = 0; j < numHashes; ++j)
1584  {
1585  AggStatePerHash perhash = &aggstate->perhash[j];
1586  Bitmapset *colnos = bms_copy(base_colnos);
1587  AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1588  List *hashTlist = NIL;
1589  TupleDesc hashDesc;
1590  int maxCols;
1591  int i;
1592 
1593  perhash->largestGrpColIdx = 0;
1594 
1595  /*
1596  * If we're doing grouping sets, then some Vars might be referenced in
1597  * tlist/qual for the benefit of other grouping sets, but not needed
1598  * when hashing; i.e. prepare_projection_slot will null them out, so
1599  * there'd be no point storing them. Use prepare_projection_slot's
1600  * logic to determine which.
1601  */
1602  if (aggstate->phases[0].grouped_cols)
1603  {
1604  Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1605  ListCell *lc;
1606 
1607  foreach(lc, aggstate->all_grouped_cols)
1608  {
1609  int attnum = lfirst_int(lc);
1610 
1611  if (!bms_is_member(attnum, grouped_cols))
1612  colnos = bms_del_member(colnos, attnum);
1613  }
1614  }
1615 
1616  /*
1617  * Compute maximum number of input columns accounting for possible
1618  * duplications in the grpColIdx array, which can happen in some edge
1619  * cases where HashAggregate was generated as part of a semijoin or a
1620  * DISTINCT.
1621  */
1622  maxCols = bms_num_members(colnos) + perhash->numCols;
1623 
1624  perhash->hashGrpColIdxInput =
1625  palloc(maxCols * sizeof(AttrNumber));
1626  perhash->hashGrpColIdxHash =
1627  palloc(perhash->numCols * sizeof(AttrNumber));
1628 
1629  /* Add all the grouping columns to colnos */
1630  for (i = 0; i < perhash->numCols; i++)
1631  colnos = bms_add_member(colnos, grpColIdx[i]);
1632 
1633  /*
1634  * First build mapping for columns directly hashed. These are the
1635  * first, because they'll be accessed when computing hash values and
1636  * comparing tuples for exact matches. We also build simple mapping
1637  * for execGrouping, so it knows where to find the to-be-hashed /
1638  * compared columns in the input.
1639  */
1640  for (i = 0; i < perhash->numCols; i++)
1641  {
1642  perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1643  perhash->hashGrpColIdxHash[i] = i + 1;
1644  perhash->numhashGrpCols++;
1645  /* delete already mapped columns */
1646  bms_del_member(colnos, grpColIdx[i]);
1647  }
1648 
1649  /* and add the remaining columns */
1650  while ((i = bms_first_member(colnos)) >= 0)
1651  {
1652  perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1653  perhash->numhashGrpCols++;
1654  }
1655 
1656  /* and build a tuple descriptor for the hashtable */
1657  for (i = 0; i < perhash->numhashGrpCols; i++)
1658  {
1659  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1660 
1661  hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1662  perhash->largestGrpColIdx =
1663  Max(varNumber + 1, perhash->largestGrpColIdx);
1664  }
1665 
1666  hashDesc = ExecTypeFromTL(hashTlist);
1667 
1668  execTuplesHashPrepare(perhash->numCols,
1669  perhash->aggnode->grpOperators,
1670  &perhash->eqfuncoids,
1671  &perhash->hashfunctions);
1672  perhash->hashslot =
1673  ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
/* NOTE(review): source line 1674 is missing here — the final argument of
 * the ExecAllocTableSlot() call, whose closing parentheses appear on line
 * 1675 (presumably the slot-ops argument; verify against upstream). */
1675 
1676  list_free(hashTlist);
1677  bms_free(colnos);
1678  }
1679 
1680  bms_free(base_colnos);
1681 }
1682 
1683 /*
1684  * Estimate per-hash-table-entry overhead.
1685  */
1686 Size
1687 hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
1688 {
1689  Size tupleChunkSize;
1690  Size pergroupChunkSize;
1691  Size transitionChunkSize;
1692  Size tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
1693  tupleWidth);
1694  Size pergroupSize = numTrans * sizeof(AggStatePerGroupData);
1695 
1696  tupleChunkSize = CHUNKHDRSZ + tupleSize;
1697 
1698  if (pergroupSize > 0)
1699  pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
1700  else
1701  pergroupChunkSize = 0;
1702 
1703  if (transitionSpace > 0)
1704  transitionChunkSize = CHUNKHDRSZ + transitionSpace;
1705  else
1706  transitionChunkSize = 0;
1707 
1708  return
1709  sizeof(TupleHashEntryData) +
1710  tupleChunkSize +
1711  pergroupChunkSize +
1712  transitionChunkSize;
1713 }
1714 
1715 /*
1716  * hashagg_recompile_expressions()
1717  *
1718  * Identifies the right phase, compiles the right expression given the
1719  * arguments, and then sets phase->evalfunc to that expression.
1720  *
1721  * Different versions of the compiled expression are needed depending on
1722  * whether hash aggregation has spilled or not, and whether it's reading from
1723  * the outer plan or a tape. Before spilling to disk, the expression reads
1724  * from the outer plan and does not need to perform a NULL check. After
1725  * HashAgg begins to spill, new groups will not be created in the hash table,
1726  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
1727  * pointer check to the expression. Then, when reading spilled data from a
1728  * tape, we change the outer slot type to be a fixed minimal tuple slot.
1729  *
1730  * It would be wasteful to recompile every time, so cache the compiled
1731  * expressions in the AggStatePerPhase, and reuse when appropriate.
1732  */
1733 static void
1734 hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1735 {
1736  AggStatePerPhase phase;
1737  int i = minslot ? 1 : 0;
1738  int j = nullcheck ? 1 : 0;
1739 
1740  Assert(aggstate->aggstrategy == AGG_HASHED ||
1741  aggstate->aggstrategy == AGG_MIXED);
1742 
1743  if (aggstate->aggstrategy == AGG_HASHED)
1744  phase = &aggstate->phases[0];
1745  else /* AGG_MIXED */
1746  phase = &aggstate->phases[1];
1747 
1748  if (phase->evaltrans_cache[i][j] == NULL)
1749  {
1750  const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
1751  bool outerfixed = aggstate->ss.ps.outeropsfixed;
1752  bool dohash = true;
1753  bool dosort;
1754 
1755  dosort = aggstate->aggstrategy == AGG_MIXED ? true : false;
1756 
1757  /* temporarily change the outerops while compiling the expression */
1758  if (minslot)
1759  {
1760  aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
1761  aggstate->ss.ps.outeropsfixed = true;
1762  }
1763 
1764  phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
1765  dosort, dohash,
1766  nullcheck);
1767 
1768  /* change back */
1769  aggstate->ss.ps.outerops = outerops;
1770  aggstate->ss.ps.outeropsfixed = outerfixed;
1771  }
1772 
1773  phase->evaltrans = phase->evaltrans_cache[i][j];
1774 }
1775 
1776 /*
1777  * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
1778  * number of partitions we expect to create (if we do spill).
1779  *
1780  * There are two limits: a memory limit, and also an ngroups limit. The
1781  * ngroups limit becomes important when we expect transition values to grow
1782  * substantially larger than the initial value.
1783  */
1784 void
1785 hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
1786  Size *mem_limit, uint64 *ngroups_limit,
1787  int *num_partitions)
1788 {
1789  int npartitions;
1790  Size partition_mem;
1791  int hash_mem = get_hash_mem();
1792 
1793  /* if not expected to spill, use all of hash_mem */
1794  if (input_groups * hashentrysize < hash_mem * 1024L)
1795  {
1796  if (num_partitions != NULL)
1797  *num_partitions = 0;
1798  *mem_limit = hash_mem * 1024L;
1799  *ngroups_limit = *mem_limit / hashentrysize;
1800  return;
1801  }
1802 
1803  /*
1804  * Calculate expected memory requirements for spilling, which is the size
1805  * of the buffers needed for all the tapes that need to be open at once.
1806  * Then, subtract that from the memory available for holding hash tables.
1807  */
1808  npartitions = hash_choose_num_partitions(input_groups,
1809  hashentrysize,
1810  used_bits,
1811  NULL);
1812  if (num_partitions != NULL)
1813  *num_partitions = npartitions;
1814 
1815  partition_mem =
1817  HASHAGG_WRITE_BUFFER_SIZE * npartitions;
1818 
1819  /*
1820  * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
1821  * minimum number of partitions, so we aren't going to dramatically exceed
1822  * work mem anyway.
1823  */
1824  if (hash_mem * 1024L > 4 * partition_mem)
1825  *mem_limit = hash_mem * 1024L - partition_mem;
1826  else
1827  *mem_limit = hash_mem * 1024L * 0.75;
1828 
1829  if (*mem_limit > hashentrysize)
1830  *ngroups_limit = *mem_limit / hashentrysize;
1831  else
1832  *ngroups_limit = 1;
1833 }
1834 
1835 /*
1836  * hash_agg_check_limits
1837  *
1838  * After adding a new group to the hash table, check whether we need to enter
1839  * spill mode. Allocations may happen without adding new groups (for instance,
1840  * if the transition state size grows), so this check is imperfect.
1841  */
1842 static void
1844 {
1845  uint64 ngroups = aggstate->hash_ngroups_current;
1846  Size meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
1847  true);
1849  true);
1850 
1851  /*
1852  * Don't spill unless there's at least one group in the hash table so we
1853  * can be sure to make progress even in edge cases.
1854  */
1855  if (aggstate->hash_ngroups_current > 0 &&
1856  (meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
1857  ngroups > aggstate->hash_ngroups_limit))
1858  {
1859  hash_agg_enter_spill_mode(aggstate);
1860  }
1861 }
1862 
1863 /*
1864  * Enter "spill mode", meaning that no new groups are added to any of the hash
1865  * tables. Tuples that would create a new group are instead spilled, and
1866  * processed later.
1867  */
1868 static void
1870 {
1871  aggstate->hash_spill_mode = true;
1872  hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
1873 
1874  if (!aggstate->hash_ever_spilled)
1875  {
1876  Assert(aggstate->hash_tapeinfo == NULL);
1877  Assert(aggstate->hash_spills == NULL);
1878 
1879  aggstate->hash_ever_spilled = true;
1880 
1881  hashagg_tapeinfo_init(aggstate);
1882 
1883  aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
1884 
1885  for (int setno = 0; setno < aggstate->num_hashes; setno++)
1886  {
1887  AggStatePerHash perhash = &aggstate->perhash[setno];
1888  HashAggSpill *spill = &aggstate->hash_spills[setno];
1889 
1890  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
1891  perhash->aggnode->numGroups,
1892  aggstate->hashentrysize);
1893  }
1894  }
1895 }
1896 
1897 /*
1898  * Update metrics after filling the hash table.
1899  *
1900  * If reading from the outer plan, from_tape should be false; if reading from
1901  * another tape, from_tape should be true.
1902  */
1903 static void
1904 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
1905 {
1906  Size meta_mem;
1907  Size hashkey_mem;
1908  Size buffer_mem;
1909  Size total_mem;
1910 
1911  if (aggstate->aggstrategy != AGG_MIXED &&
1912  aggstate->aggstrategy != AGG_HASHED)
1913  return;
1914 
1915  /* memory for the hash table itself */
1916  meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
1917 
1918  /* memory for the group keys and transition states */
1919  hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
1920 
1921  /* memory for read/write tape buffers, if spilled */
1922  buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
1923  if (from_tape)
1924  buffer_mem += HASHAGG_READ_BUFFER_SIZE;
1925 
1926  /* update peak mem */
1927  total_mem = meta_mem + hashkey_mem + buffer_mem;
1928  if (total_mem > aggstate->hash_mem_peak)
1929  aggstate->hash_mem_peak = total_mem;
1930 
1931  /* update disk usage */
1932  if (aggstate->hash_tapeinfo != NULL)
1933  {
1934  uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
1935 
1936  if (aggstate->hash_disk_used < disk_used)
1937  aggstate->hash_disk_used = disk_used;
1938  }
1939 
1940  /* update hashentrysize estimate based on contents */
1941  if (aggstate->hash_ngroups_current > 0)
1942  {
1943  aggstate->hashentrysize =
1944  sizeof(TupleHashEntryData) +
1945  (hashkey_mem / (double) aggstate->hash_ngroups_current);
1946  }
1947 }
1948 
1949 /*
1950  * Choose a reasonable number of buckets for the initial hash table size.
1951  */
1952 static long
1953 hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
1954 {
1955  long max_nbuckets;
1956  long nbuckets = ngroups;
1957 
1958  max_nbuckets = memory / hashentrysize;
1959 
1960  /*
1961  * Underestimating is better than overestimating. Too many buckets crowd
1962  * out space for group keys and transition state values.
1963  */
1964  max_nbuckets >>= 1;
1965 
1966  if (nbuckets > max_nbuckets)
1967  nbuckets = max_nbuckets;
1968 
1969  return Max(nbuckets, 1);
1970 }
1971 
1972 /*
1973  * Determine the number of partitions to create when spilling, which will
1974  * always be a power of two. If log2_npartitions is non-NULL, set
1975  * *log2_npartitions to the log2() of the number of partitions.
1976  */
1977 static int
1978 hash_choose_num_partitions(double input_groups, double hashentrysize,
1979  int used_bits, int *log2_npartitions)
1980 {
1981  Size mem_wanted;
1982  int partition_limit;
1983  int npartitions;
1984  int partition_bits;
1985  int hash_mem = get_hash_mem();
1986 
1987  /*
1988  * Avoid creating so many partitions that the memory requirements of the
1989  * open partition files are greater than 1/4 of hash_mem.
1990  */
1991  partition_limit =
1992  (hash_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
1994 
1995  mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
1996 
1997  /* make enough partitions so that each one is likely to fit in memory */
1998  npartitions = 1 + (mem_wanted / (hash_mem * 1024L));
1999 
2000  if (npartitions > partition_limit)
2001  npartitions = partition_limit;
2002 
2003  if (npartitions < HASHAGG_MIN_PARTITIONS)
2004  npartitions = HASHAGG_MIN_PARTITIONS;
2005  if (npartitions > HASHAGG_MAX_PARTITIONS)
2006  npartitions = HASHAGG_MAX_PARTITIONS;
2007 
2008  /* ceil(log2(npartitions)) */
2009  partition_bits = my_log2(npartitions);
2010 
2011  /* make sure that we don't exhaust the hash bits */
2012  if (partition_bits + used_bits >= 32)
2013  partition_bits = 32 - used_bits;
2014 
2015  if (log2_npartitions != NULL)
2016  *log2_npartitions = partition_bits;
2017 
2018  /* number of partitions will be a power of two */
2019  npartitions = 1L << partition_bits;
2020 
2021  return npartitions;
2022 }
2023 
2024 /*
2025  * Initialize a freshly-created TupleHashEntry.
2026  */
2027 static void
2029  TupleHashEntry entry)
2030 {
2031  AggStatePerGroup pergroup;
2032  int transno;
2033 
2034  aggstate->hash_ngroups_current++;
2035  hash_agg_check_limits(aggstate);
2036 
2037  /* no need to allocate or initialize per-group state */
2038  if (aggstate->numtrans == 0)
2039  return;
2040 
2041  pergroup = (AggStatePerGroup)
2042  MemoryContextAlloc(hashtable->tablecxt,
2043  sizeof(AggStatePerGroupData) * aggstate->numtrans);
2044 
2045  entry->additional = pergroup;
2046 
2047  /*
2048  * Initialize aggregates for new tuple group, lookup_hash_entries()
2049  * already has selected the relevant grouping set.
2050  */
2051  for (transno = 0; transno < aggstate->numtrans; transno++)
2052  {
2053  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
2054  AggStatePerGroup pergroupstate = &pergroup[transno];
2055 
2056  initialize_aggregate(aggstate, pertrans, pergroupstate);
2057  }
2058 }
2059 
2060 /*
2061  * Look up hash entries for the current tuple in all hashed grouping sets,
2062  * returning an array of pergroup pointers suitable for advance_aggregates.
2063  *
2064  * Be aware that lookup_hash_entry can reset the tmpcontext.
2065  *
2066  * Some entries may be left NULL if we are in "spill mode". The same tuple
2067  * will belong to different groups for each grouping set, so may match a group
2068  * already in memory for one set and match a group not in memory for another
2069  * set. When in "spill mode", the tuple will be spilled for each grouping set
2070  * where it doesn't match a group in memory.
2071  *
2072  * NB: It's possible to spill the same tuple for several different grouping
2073  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
2074  * the tuple multiple times for multiple grouping sets, it can be partitioned
2075  * for each grouping set, making the refilling of the hash table very
2076  * efficient.
2077  */
2078 static void
2080 {
2081  AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2082  TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
2083  int setno;
2084 
2085  for (setno = 0; setno < aggstate->num_hashes; setno++)
2086  {
2087  AggStatePerHash perhash = &aggstate->perhash[setno];
2088  TupleHashTable hashtable = perhash->hashtable;
2089  TupleTableSlot *hashslot = perhash->hashslot;
2090  TupleHashEntry entry;
2091  uint32 hash;
2092  bool isnew = false;
2093  bool *p_isnew;
2094 
2095  /* if hash table already spilled, don't create new entries */
2096  p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2097 
2098  select_current_set(aggstate, setno, true);
2099  prepare_hash_slot(perhash,
2100  outerslot,
2101  hashslot);
2102 
2103  entry = LookupTupleHashEntry(hashtable, hashslot,
2104  p_isnew, &hash);
2105 
2106  if (entry != NULL)
2107  {
2108  if (isnew)
2109  initialize_hash_entry(aggstate, hashtable, entry);
2110  pergroup[setno] = entry->additional;
2111  }
2112  else
2113  {
2114  HashAggSpill *spill = &aggstate->hash_spills[setno];
2115  TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
2116 
2117  if (spill->partitions == NULL)
2118  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
2119  perhash->aggnode->numGroups,
2120  aggstate->hashentrysize);
2121 
2122  hashagg_spill_tuple(aggstate, spill, slot, hash);
2123  pergroup[setno] = NULL;
2124  }
2125  }
2126 }
2127 
2128 /*
2129  * ExecAgg -
2130  *
2131  * ExecAgg receives tuples from its outer subplan and aggregates over
2132  * the appropriate attribute for each aggregate function use (Aggref
2133  * node) appearing in the targetlist or qual of the node. The number
2134  * of tuples to aggregate over depends on whether grouped or plain
2135  * aggregation is selected. In grouped aggregation, we produce a result
2136  * row for each group; in plain aggregation there's a single result row
2137  * for the whole query. In either case, the value of each aggregate is
2138  * stored in the expression context to be used when ExecProject evaluates
2139  * the result tuple.
2140  */
2141 static TupleTableSlot *
2143 {
2144  AggState *node = castNode(AggState, pstate);
2145  TupleTableSlot *result = NULL;
2146 
2148 
2149  if (!node->agg_done)
2150  {
2151  /* Dispatch based on strategy */
2152  switch (node->phase->aggstrategy)
2153  {
2154  case AGG_HASHED:
2155  if (!node->table_filled)
2156  agg_fill_hash_table(node);
2157  /* FALLTHROUGH */
2158  case AGG_MIXED:
2159  result = agg_retrieve_hash_table(node);
2160  break;
2161  case AGG_PLAIN:
2162  case AGG_SORTED:
2163  result = agg_retrieve_direct(node);
2164  break;
2165  }
2166 
2167  if (!TupIsNull(result))
2168  return result;
2169  }
2170 
2171  return NULL;
2172 }
2173 
2174 /*
2175  * ExecAgg for non-hashed case
2176  */
2177 static TupleTableSlot *
2179 {
2180  Agg *node = aggstate->phase->aggnode;
2181  ExprContext *econtext;
2182  ExprContext *tmpcontext;
2183  AggStatePerAgg peragg;
2184  AggStatePerGroup *pergroups;
2185  TupleTableSlot *outerslot;
2186  TupleTableSlot *firstSlot;
2187  TupleTableSlot *result;
2188  bool hasGroupingSets = aggstate->phase->numsets > 0;
2189  int numGroupingSets = Max(aggstate->phase->numsets, 1);
2190  int currentSet;
2191  int nextSetSize;
2192  int numReset;
2193  int i;
2194 
2195  /*
2196  * get state info from node
2197  *
2198  * econtext is the per-output-tuple expression context
2199  *
2200  * tmpcontext is the per-input-tuple expression context
2201  */
2202  econtext = aggstate->ss.ps.ps_ExprContext;
2203  tmpcontext = aggstate->tmpcontext;
2204 
2205  peragg = aggstate->peragg;
2206  pergroups = aggstate->pergroups;
2207  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2208 
2209  /*
2210  * We loop retrieving groups until we find one matching
2211  * aggstate->ss.ps.qual
2212  *
2213  * For grouping sets, we have the invariant that aggstate->projected_set
2214  * is either -1 (initial call) or the index (starting from 0) in
2215  * gset_lengths for the group we just completed (either by projecting a
2216  * row or by discarding it in the qual).
2217  */
2218  while (!aggstate->agg_done)
2219  {
2220  /*
2221  * Clear the per-output-tuple context for each group, as well as
2222  * aggcontext (which contains any pass-by-ref transvalues of the old
2223  * group). Some aggregate functions store working state in child
2224  * contexts; those now get reset automatically without us needing to
2225  * do anything special.
2226  *
2227  * We use ReScanExprContext not just ResetExprContext because we want
2228  * any registered shutdown callbacks to be called. That allows
2229  * aggregate functions to ensure they've cleaned up any non-memory
2230  * resources.
2231  */
2232  ReScanExprContext(econtext);
2233 
2234  /*
2235  * Determine how many grouping sets need to be reset at this boundary.
2236  */
2237  if (aggstate->projected_set >= 0 &&
2238  aggstate->projected_set < numGroupingSets)
2239  numReset = aggstate->projected_set + 1;
2240  else
2241  numReset = numGroupingSets;
2242 
2243  /*
2244  * numReset can change on a phase boundary, but that's OK; we want to
2245  * reset the contexts used in _this_ phase, and later, after possibly
2246  * changing phase, initialize the right number of aggregates for the
2247  * _new_ phase.
2248  */
2249 
2250  for (i = 0; i < numReset; i++)
2251  {
2252  ReScanExprContext(aggstate->aggcontexts[i]);
2253  }
2254 
2255  /*
2256  * Check if input is complete and there are no more groups to project
2257  * in this phase; move to next phase or mark as done.
2258  */
2259  if (aggstate->input_done == true &&
2260  aggstate->projected_set >= (numGroupingSets - 1))
2261  {
2262  if (aggstate->current_phase < aggstate->numphases - 1)
2263  {
2264  initialize_phase(aggstate, aggstate->current_phase + 1);
2265  aggstate->input_done = false;
2266  aggstate->projected_set = -1;
2267  numGroupingSets = Max(aggstate->phase->numsets, 1);
2268  node = aggstate->phase->aggnode;
2269  numReset = numGroupingSets;
2270  }
2271  else if (aggstate->aggstrategy == AGG_MIXED)
2272  {
2273  /*
2274  * Mixed mode; we've output all the grouped stuff and have
2275  * full hashtables, so switch to outputting those.
2276  */
2277  initialize_phase(aggstate, 0);
2278  aggstate->table_filled = true;
2280  &aggstate->perhash[0].hashiter);
2281  select_current_set(aggstate, 0, true);
2282  return agg_retrieve_hash_table(aggstate);
2283  }
2284  else
2285  {
2286  aggstate->agg_done = true;
2287  break;
2288  }
2289  }
2290 
2291  /*
2292  * Get the number of columns in the next grouping set after the last
2293  * projected one (if any). This is the number of columns to compare to
2294  * see if we reached the boundary of that set too.
2295  */
2296  if (aggstate->projected_set >= 0 &&
2297  aggstate->projected_set < (numGroupingSets - 1))
2298  nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2299  else
2300  nextSetSize = 0;
2301 
2302  /*----------
2303  * If a subgroup for the current grouping set is present, project it.
2304  *
2305  * We have a new group if:
2306  * - we're out of input but haven't projected all grouping sets
2307  * (checked above)
2308  * OR
2309  * - we already projected a row that wasn't from the last grouping
2310  * set
2311  * AND
2312  * - the next grouping set has at least one grouping column (since
2313  * empty grouping sets project only once input is exhausted)
2314  * AND
2315  * - the previous and pending rows differ on the grouping columns
2316  * of the next grouping set
2317  *----------
2318  */
2319  tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
2320  if (aggstate->input_done ||
2321  (node->aggstrategy != AGG_PLAIN &&
2322  aggstate->projected_set != -1 &&
2323  aggstate->projected_set < (numGroupingSets - 1) &&
2324  nextSetSize > 0 &&
2325  !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
2326  tmpcontext)))
2327  {
2328  aggstate->projected_set += 1;
2329 
2330  Assert(aggstate->projected_set < numGroupingSets);
2331  Assert(nextSetSize > 0 || aggstate->input_done);
2332  }
2333  else
2334  {
2335  /*
2336  * We no longer care what group we just projected, the next
2337  * projection will always be the first (or only) grouping set
2338  * (unless the input proves to be empty).
2339  */
2340  aggstate->projected_set = 0;
2341 
2342  /*
2343  * If we don't already have the first tuple of the new group,
2344  * fetch it from the outer plan.
2345  */
2346  if (aggstate->grp_firstTuple == NULL)
2347  {
2348  outerslot = fetch_input_tuple(aggstate);
2349  if (!TupIsNull(outerslot))
2350  {
2351  /*
2352  * Make a copy of the first input tuple; we will use this
2353  * for comparisons (in group mode) and for projection.
2354  */
2355  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2356  }
2357  else
2358  {
2359  /* outer plan produced no tuples at all */
2360  if (hasGroupingSets)
2361  {
2362  /*
2363  * If there was no input at all, we need to project
2364  * rows only if there are grouping sets of size 0.
2365  * Note that this implies that there can't be any
2366  * references to ungrouped Vars, which would otherwise
2367  * cause issues with the empty output slot.
2368  *
2369  * XXX: This is no longer true, we currently deal with
2370  * this in finalize_aggregates().
2371  */
2372  aggstate->input_done = true;
2373 
2374  while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2375  {
2376  aggstate->projected_set += 1;
2377  if (aggstate->projected_set >= numGroupingSets)
2378  {
2379  /*
2380  * We can't set agg_done here because we might
2381  * have more phases to do, even though the
2382  * input is empty. So we need to restart the
2383  * whole outer loop.
2384  */
2385  break;
2386  }
2387  }
2388 
2389  if (aggstate->projected_set >= numGroupingSets)
2390  continue;
2391  }
2392  else
2393  {
2394  aggstate->agg_done = true;
2395  /* If we are grouping, we should produce no tuples too */
2396  if (node->aggstrategy != AGG_PLAIN)
2397  return NULL;
2398  }
2399  }
2400  }
2401 
2402  /*
2403  * Initialize working state for a new input tuple group.
2404  */
2405  initialize_aggregates(aggstate, pergroups, numReset);
2406 
2407  if (aggstate->grp_firstTuple != NULL)
2408  {
2409  /*
2410  * Store the copied first input tuple in the tuple table slot
2411  * reserved for it. The tuple will be deleted when it is
2412  * cleared from the slot.
2413  */
2415  firstSlot, true);
2416  aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
2417 
2418  /* set up for first advance_aggregates call */
2419  tmpcontext->ecxt_outertuple = firstSlot;
2420 
2421  /*
2422  * Process each outer-plan tuple, and then fetch the next one,
2423  * until we exhaust the outer plan or cross a group boundary.
2424  */
2425  for (;;)
2426  {
2427  /*
2428  * During phase 1 only of a mixed agg, we need to update
2429  * hashtables as well in advance_aggregates.
2430  */
2431  if (aggstate->aggstrategy == AGG_MIXED &&
2432  aggstate->current_phase == 1)
2433  {
2434  lookup_hash_entries(aggstate);
2435  }
2436 
2437  /* Advance the aggregates (or combine functions) */
2438  advance_aggregates(aggstate);
2439 
2440  /* Reset per-input-tuple context after each tuple */
2441  ResetExprContext(tmpcontext);
2442 
2443  outerslot = fetch_input_tuple(aggstate);
2444  if (TupIsNull(outerslot))
2445  {
2446  /* no more outer-plan tuples available */
2447 
2448  /* if we built hash tables, finalize any spills */
2449  if (aggstate->aggstrategy == AGG_MIXED &&
2450  aggstate->current_phase == 1)
2452 
2453  if (hasGroupingSets)
2454  {
2455  aggstate->input_done = true;
2456  break;
2457  }
2458  else
2459  {
2460  aggstate->agg_done = true;
2461  break;
2462  }
2463  }
2464  /* set up for next advance_aggregates call */
2465  tmpcontext->ecxt_outertuple = outerslot;
2466 
2467  /*
2468  * If we are grouping, check whether we've crossed a group
2469  * boundary.
2470  */
2471  if (node->aggstrategy != AGG_PLAIN)
2472  {
2473  tmpcontext->ecxt_innertuple = firstSlot;
2474  if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2475  tmpcontext))
2476  {
2477  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2478  break;
2479  }
2480  }
2481  }
2482  }
2483 
2484  /*
2485  * Use the representative input tuple for any references to
2486  * non-aggregated input columns in aggregate direct args, the node
2487  * qual, and the tlist. (If we are not grouping, and there are no
2488  * input rows at all, we will come here with an empty firstSlot
2489  * ... but if not grouping, there can't be any references to
2490  * non-aggregated input columns, so no problem.)
2491  */
2492  econtext->ecxt_outertuple = firstSlot;
2493  }
2494 
2495  Assert(aggstate->projected_set >= 0);
2496 
2497  currentSet = aggstate->projected_set;
2498 
2499  prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2500 
2501  select_current_set(aggstate, currentSet, false);
2502 
2503  finalize_aggregates(aggstate,
2504  peragg,
2505  pergroups[currentSet]);
2506 
2507  /*
2508  * If there's no row to project right now, we must continue rather
2509  * than returning a null since there might be more groups.
2510  */
2511  result = project_aggregates(aggstate);
2512  if (result)
2513  return result;
2514  }
2515 
2516  /* No more groups */
2517  return NULL;
2518 }
2519 
2520 /*
2521  * ExecAgg for hashed case: read input and build hash table
2522  */
2523 static void
2525 {
2526  TupleTableSlot *outerslot;
2527  ExprContext *tmpcontext = aggstate->tmpcontext;
2528 
2529  /*
2530  * Process each outer-plan tuple, and then fetch the next one, until we
2531  * exhaust the outer plan.
2532  */
2533  for (;;)
2534  {
2535  outerslot = fetch_input_tuple(aggstate);
2536  if (TupIsNull(outerslot))
2537  break;
2538 
2539  /* set up for lookup_hash_entries and advance_aggregates */
2540  tmpcontext->ecxt_outertuple = outerslot;
2541 
2542  /* Find or build hashtable entries */
2543  lookup_hash_entries(aggstate);
2544 
2545  /* Advance the aggregates (or combine functions) */
2546  advance_aggregates(aggstate);
2547 
2548  /*
2549  * Reset per-input-tuple context after each tuple, but note that the
2550  * hash lookups do this too
2551  */
2552  ResetExprContext(aggstate->tmpcontext);
2553  }
2554 
2555  /* finalize spills, if any */
2557 
2558  aggstate->table_filled = true;
2559  /* Initialize to walk the first hash table */
2560  select_current_set(aggstate, 0, true);
2562  &aggstate->perhash[0].hashiter);
2563 }
2564 
2565 /*
2566  * If any data was spilled during hash aggregation, reset the hash table and
2567  * reprocess one batch of spilled data. After reprocessing a batch, the hash
2568  * table will again contain data, ready to be consumed by
2569  * agg_retrieve_hash_table_in_memory().
2570  *
2571  * Should only be called after all in memory hash table entries have been
2572  * finalized and emitted.
2573  *
2574  * Return false when input is exhausted and there's no more work to be done;
2575  * otherwise return true.
2576  */
2577 static bool
2579 {
2580  HashAggBatch *batch;
2581  AggStatePerHash perhash;
2582  HashAggSpill spill;
2583  HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
2584  bool spill_initialized = false;
2585 
2586  if (aggstate->hash_batches == NIL)
2587  return false;
2588 
2589  batch = linitial(aggstate->hash_batches);
2590  aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
2591 
2592  hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
2593  batch->used_bits, &aggstate->hash_mem_limit,
2594  &aggstate->hash_ngroups_limit, NULL);
2595 
2596  /* there could be residual pergroup pointers; clear them */
2597  for (int setoff = 0;
2598  setoff < aggstate->maxsets + aggstate->num_hashes;
2599  setoff++)
2600  aggstate->all_pergroups[setoff] = NULL;
2601 
2602  /* free memory and reset hash tables */
2603  ReScanExprContext(aggstate->hashcontext);
2604  for (int setno = 0; setno < aggstate->num_hashes; setno++)
2605  ResetTupleHashTable(aggstate->perhash[setno].hashtable);
2606 
2607  aggstate->hash_ngroups_current = 0;
2608 
2609  /*
2610  * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
2611  * happens in phase 0. So, we switch to phase 1 when processing a batch,
2612  * and back to phase 0 after the batch is done.
2613  */
2614  Assert(aggstate->current_phase == 0);
2615  if (aggstate->phase->aggstrategy == AGG_MIXED)
2616  {
2617  aggstate->current_phase = 1;
2618  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2619  }
2620 
2621  select_current_set(aggstate, batch->setno, true);
2622 
2623  perhash = &aggstate->perhash[aggstate->current_set];
2624 
2625  /*
2626  * Spilled tuples are always read back as MinimalTuples, which may be
2627  * different from the outer plan, so recompile the aggregate expressions.
2628  *
2629  * We still need the NULL check, because we are only processing one
2630  * grouping set at a time and the rest will be NULL.
2631  */
2632  hashagg_recompile_expressions(aggstate, true, true);
2633 
2634  for (;;)
2635  {
2636  TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
2637  TupleTableSlot *hashslot = perhash->hashslot;
2638  TupleHashEntry entry;
2639  MinimalTuple tuple;
2640  uint32 hash;
2641  bool isnew = false;
2642  bool *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2643 
2645 
2646  tuple = hashagg_batch_read(batch, &hash);
2647  if (tuple == NULL)
2648  break;
2649 
2650  ExecStoreMinimalTuple(tuple, spillslot, true);
2651  aggstate->tmpcontext->ecxt_outertuple = spillslot;
2652 
2653  prepare_hash_slot(perhash,
2654  aggstate->tmpcontext->ecxt_outertuple,
2655  hashslot);
2656  entry = LookupTupleHashEntryHash(
2657  perhash->hashtable, hashslot, p_isnew, hash);
2658 
2659  if (entry != NULL)
2660  {
2661  if (isnew)
2662  initialize_hash_entry(aggstate, perhash->hashtable, entry);
2663  aggstate->hash_pergroup[batch->setno] = entry->additional;
2664  advance_aggregates(aggstate);
2665  }
2666  else
2667  {
2668  if (!spill_initialized)
2669  {
2670  /*
2671  * Avoid initializing the spill until we actually need it so
2672  * that we don't assign tapes that will never be used.
2673  */
2674  spill_initialized = true;
2675  hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
2676  batch->input_card, aggstate->hashentrysize);
2677  }
2678  /* no memory for a new group, spill */
2679  hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
2680 
2681  aggstate->hash_pergroup[batch->setno] = NULL;
2682  }
2683 
2684  /*
2685  * Reset per-input-tuple context after each tuple, but note that the
2686  * hash lookups do this too
2687  */
2688  ResetExprContext(aggstate->tmpcontext);
2689  }
2690 
2691  hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
2692 
2693  /* change back to phase 0 */
2694  aggstate->current_phase = 0;
2695  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2696 
2697  if (spill_initialized)
2698  {
2699  hashagg_spill_finish(aggstate, &spill, batch->setno);
2700  hash_agg_update_metrics(aggstate, true, spill.npartitions);
2701  }
2702  else
2703  hash_agg_update_metrics(aggstate, true, 0);
2704 
2705  aggstate->hash_spill_mode = false;
2706 
2707  /* prepare to walk the first hash table */
2708  select_current_set(aggstate, batch->setno, true);
2709  ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
2710  &aggstate->perhash[batch->setno].hashiter);
2711 
2712  pfree(batch);
2713 
2714  return true;
2715 }
2716 
2717 /*
2718  * ExecAgg for hashed case: retrieving groups from hash table
2719  *
2720  * After exhausting in-memory tuples, also try refilling the hash table using
2721  * previously-spilled tuples. Only returns NULL after all in-memory and
2722  * spilled tuples are exhausted.
2723  */
2724 static TupleTableSlot *
2726 {
2727  TupleTableSlot *result = NULL;
2728 
2729  while (result == NULL)
2730  {
2731  result = agg_retrieve_hash_table_in_memory(aggstate);
2732  if (result == NULL)
2733  {
2734  if (!agg_refill_hash_table(aggstate))
2735  {
2736  aggstate->agg_done = true;
2737  break;
2738  }
2739  }
2740  }
2741 
2742  return result;
2743 }
2744 
2745 /*
2746  * Retrieve the groups from the in-memory hash tables without considering any
2747  * spilled tuples.
2748  */
2749 static TupleTableSlot *
2751 {
2752  ExprContext *econtext;
2753  AggStatePerAgg peragg;
2754  AggStatePerGroup pergroup;
2755  TupleHashEntryData *entry;
2756  TupleTableSlot *firstSlot;
2757  TupleTableSlot *result;
2758  AggStatePerHash perhash;
2759 
2760  /*
2761  * get state info from node.
2762  *
2763  * econtext is the per-output-tuple expression context.
2764  */
2765  econtext = aggstate->ss.ps.ps_ExprContext;
2766  peragg = aggstate->peragg;
2767  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2768 
2769  /*
2770  * Note that perhash (and therefore anything accessed through it) can
2771  * change inside the loop, as we change between grouping sets.
2772  */
2773  perhash = &aggstate->perhash[aggstate->current_set];
2774 
2775  /*
2776  * We loop retrieving groups until we find one satisfying
2777  * aggstate->ss.ps.qual
2778  */
2779  for (;;)
2780  {
2781  TupleTableSlot *hashslot = perhash->hashslot;
2782  int i;
2783 
2785 
2786  /*
2787  * Find the next entry in the hash table
2788  */
2789  entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2790  if (entry == NULL)
2791  {
2792  int nextset = aggstate->current_set + 1;
2793 
2794  if (nextset < aggstate->num_hashes)
2795  {
2796  /*
2797  * Switch to next grouping set, reinitialize, and restart the
2798  * loop.
2799  */
2800  select_current_set(aggstate, nextset, true);
2801 
2802  perhash = &aggstate->perhash[aggstate->current_set];
2803 
2804  ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2805 
2806  continue;
2807  }
2808  else
2809  {
2810  return NULL;
2811  }
2812  }
2813 
2814  /*
2815  * Clear the per-output-tuple context for each group
2816  *
2817  * We intentionally don't use ReScanExprContext here; if any aggs have
2818  * registered shutdown callbacks, they mustn't be called yet, since we
2819  * might not be done with that agg.
2820  */
2821  ResetExprContext(econtext);
2822 
2823  /*
2824  * Transform representative tuple back into one with the right
2825  * columns.
2826  */
2827  ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2828  slot_getallattrs(hashslot);
2829 
2830  ExecClearTuple(firstSlot);
2831  memset(firstSlot->tts_isnull, true,
2832  firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2833 
2834  for (i = 0; i < perhash->numhashGrpCols; i++)
2835  {
2836  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2837 
2838  firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2839  firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2840  }
2841  ExecStoreVirtualTuple(firstSlot);
2842 
2843  pergroup = (AggStatePerGroup) entry->additional;
2844 
2845  /*
2846  * Use the representative input tuple for any references to
2847  * non-aggregated input columns in the qual and tlist.
2848  */
2849  econtext->ecxt_outertuple = firstSlot;
2850 
2851  prepare_projection_slot(aggstate,
2852  econtext->ecxt_outertuple,
2853  aggstate->current_set);
2854 
2855  finalize_aggregates(aggstate, peragg, pergroup);
2856 
2857  result = project_aggregates(aggstate);
2858  if (result)
2859  return result;
2860  }
2861 
2862  /* No more groups */
2863  return NULL;
2864 }
2865 
2866 /*
2867  * Initialize HashTapeInfo
2868  */
2869 static void
2871 {
2872  HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
2873  int init_tapes = 16; /* expanded dynamically */
2874 
2875  tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
2876  tapeinfo->ntapes = init_tapes;
2877  tapeinfo->nfreetapes = init_tapes;
2878  tapeinfo->freetapes_alloc = init_tapes;
2879  tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
2880  for (int i = 0; i < init_tapes; i++)
2881  tapeinfo->freetapes[i] = i;
2882 
2883  aggstate->hash_tapeinfo = tapeinfo;
2884 }
2885 
2886 /*
2887  * Assign unused tapes to spill partitions, extending the tape set if
2888  * necessary.
2889  */
2890 static void
2892  int npartitions)
2893 {
2894  int partidx = 0;
2895 
2896  /* use free tapes if available */
2897  while (partidx < npartitions && tapeinfo->nfreetapes > 0)
2898  partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
2899 
2900  if (partidx < npartitions)
2901  {
2902  LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
2903 
2904  while (partidx < npartitions)
2905  partitions[partidx++] = tapeinfo->ntapes++;
2906  }
2907 }
2908 
2909 /*
2910  * After a tape has already been written to and then read, this function
2911  * rewinds it for writing and adds it to the free list.
2912  */
2913 static void
2915 {
2916  /* rewinding frees the buffer while not in use */
2917  LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
2918  if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
2919  {
2920  tapeinfo->freetapes_alloc <<= 1;
2921  tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
2922  tapeinfo->freetapes_alloc * sizeof(int));
2923  }
2924  tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
2925 }
2926 
2927 /*
2928  * hashagg_spill_init
2929  *
2930  * Called after we determined that spilling is necessary. Chooses the number
2931  * of partitions to create, and initializes them.
2932  */
2933 static void
2934 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
2935  double input_groups, double hashentrysize)
2936 {
2937  int npartitions;
2938  int partition_bits;
2939 
2940  npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
2941  used_bits, &partition_bits);
2942 
2943  spill->partitions = palloc0(sizeof(int) * npartitions);
2944  spill->ntuples = palloc0(sizeof(int64) * npartitions);
2945  spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
2946 
2947  hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
2948 
2949  spill->tapeset = tapeinfo->tapeset;
2950  spill->shift = 32 - used_bits - partition_bits;
2951  spill->mask = (npartitions - 1) << spill->shift;
2952  spill->npartitions = npartitions;
2953 
2954  for (int i = 0; i < npartitions; i++)
2956 }
2957 
2958 /*
2959  * hashagg_spill_tuple
2960  *
2961  * No room for new groups in the hash table. Save for later in the appropriate
2962  * partition.
2963  */
2964 static Size
2966  TupleTableSlot *inputslot, uint32 hash)
2967 {
2968  LogicalTapeSet *tapeset = spill->tapeset;
2969  TupleTableSlot *spillslot;
2970  int partition;
2971  MinimalTuple tuple;
2972  int tapenum;
2973  int total_written = 0;
2974  bool shouldFree;
2975 
2976  Assert(spill->partitions != NULL);
2977 
2978  /* spill only attributes that we actually need */
2979  if (!aggstate->all_cols_needed)
2980  {
2981  spillslot = aggstate->hash_spill_wslot;
2982  slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
2983  ExecClearTuple(spillslot);
2984  for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
2985  {
2986  if (bms_is_member(i + 1, aggstate->colnos_needed))
2987  {
2988  spillslot->tts_values[i] = inputslot->tts_values[i];
2989  spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
2990  }
2991  else
2992  spillslot->tts_isnull[i] = true;
2993  }
2994  ExecStoreVirtualTuple(spillslot);
2995  }
2996  else
2997  spillslot = inputslot;
2998 
2999  tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
3000 
3001  partition = (hash & spill->mask) >> spill->shift;
3002  spill->ntuples[partition]++;
3003 
3004  /*
3005  * All hash values destined for a given partition have some bits in
3006  * common, which causes bad HLL cardinality estimates. Hash the hash to
3007  * get a more uniform distribution.
3008  */
3009  addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
3010 
3011  tapenum = spill->partitions[partition];
3012 
3013  LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
3014  total_written += sizeof(uint32);
3015 
3016  LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
3017  total_written += tuple->t_len;
3018 
3019  if (shouldFree)
3020  pfree(tuple);
3021 
3022  return total_written;
3023 }
3024 
3025 /*
3026  * hashagg_batch_new
3027  *
3028  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
3029  * be done.
3030  */
3031 static HashAggBatch *
3032 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
3033  int64 input_tuples, double input_card, int used_bits)
3034 {
3035  HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
3036 
3037  batch->setno = setno;
3038  batch->used_bits = used_bits;
3039  batch->tapeset = tapeset;
3040  batch->input_tapenum = tapenum;
3041  batch->input_tuples = input_tuples;
3042  batch->input_card = input_card;
3043 
3044  return batch;
3045 }
3046 
3047 /*
3048  * read_spilled_tuple
3049  * read the next tuple from a batch's tape. Return NULL if no more.
3050  */
3051 static MinimalTuple
3053 {
3054  LogicalTapeSet *tapeset = batch->tapeset;
3055  int tapenum = batch->input_tapenum;
3056  MinimalTuple tuple;
3057  uint32 t_len;
3058  size_t nread;
3059  uint32 hash;
3060 
3061  nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
3062  if (nread == 0)
3063  return NULL;
3064  if (nread != sizeof(uint32))
3065  ereport(ERROR,
3067  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3068  tapenum, sizeof(uint32), nread)));
3069  if (hashp != NULL)
3070  *hashp = hash;
3071 
3072  nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
3073  if (nread != sizeof(uint32))
3074  ereport(ERROR,
3076  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3077  tapenum, sizeof(uint32), nread)));
3078 
3079  tuple = (MinimalTuple) palloc(t_len);
3080  tuple->t_len = t_len;
3081 
3082  nread = LogicalTapeRead(tapeset, tapenum,
3083  (void *) ((char *) tuple + sizeof(uint32)),
3084  t_len - sizeof(uint32));
3085  if (nread != t_len - sizeof(uint32))
3086  ereport(ERROR,
3088  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3089  tapenum, t_len - sizeof(uint32), nread)));
3090 
3091  return tuple;
3092 }
3093 
3094 /*
3095  * hashagg_finish_initial_spills
3096  *
3097  * After a HashAggBatch has been processed, it may have spilled tuples to
3098  * disk. If so, turn the spilled partitions into new batches that must later
3099  * be executed.
3100  */
3101 static void
3103 {
3104  int setno;
3105  int total_npartitions = 0;
3106 
3107  if (aggstate->hash_spills != NULL)
3108  {
3109  for (setno = 0; setno < aggstate->num_hashes; setno++)
3110  {
3111  HashAggSpill *spill = &aggstate->hash_spills[setno];
3112 
3113  total_npartitions += spill->npartitions;
3114  hashagg_spill_finish(aggstate, spill, setno);
3115  }
3116 
3117  /*
3118  * We're not processing tuples from outer plan any more; only
3119  * processing batches of spilled tuples. The initial spill structures
3120  * are no longer needed.
3121  */
3122  pfree(aggstate->hash_spills);
3123  aggstate->hash_spills = NULL;
3124  }
3125 
3126  hash_agg_update_metrics(aggstate, false, total_npartitions);
3127  aggstate->hash_spill_mode = false;
3128 }
3129 
3130 /*
3131  * hashagg_spill_finish
3132  *
3133  * Transform spill partitions into new batches.
3134  */
3135 static void
3136 hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3137 {
3138  int i;
3139  int used_bits = 32 - spill->shift;
3140 
3141  if (spill->npartitions == 0)
3142  return; /* didn't spill */
3143 
3144  for (i = 0; i < spill->npartitions; i++)
3145  {
3147  int tapenum = spill->partitions[i];
3148  HashAggBatch *new_batch;
3149  double cardinality;
3150 
3151  /* if the partition is empty, don't create a new batch of work */
3152  if (spill->ntuples[i] == 0)
3153  continue;
3154 
3155  cardinality = estimateHyperLogLog(&spill->hll_card[i]);
3156  freeHyperLogLog(&spill->hll_card[i]);
3157 
3158  /* rewinding frees the buffer while not in use */
3159  LogicalTapeRewindForRead(tapeset, tapenum,
3161 
3162  new_batch = hashagg_batch_new(tapeset, tapenum, setno,
3163  spill->ntuples[i], cardinality,
3164  used_bits);
3165  aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
3166  aggstate->hash_batches_used++;
3167  }
3168 
3169  pfree(spill->ntuples);
3170  pfree(spill->hll_card);
3171  pfree(spill->partitions);
3172 }
3173 
3174 /*
3175  * Free resources related to a spilled HashAgg.
3176  */
3177 static void
3179 {
3180  ListCell *lc;
3181 
3182  /* free spills from initial pass */
3183  if (aggstate->hash_spills != NULL)
3184  {
3185  int setno;
3186 
3187  for (setno = 0; setno < aggstate->num_hashes; setno++)
3188  {
3189  HashAggSpill *spill = &aggstate->hash_spills[setno];
3190 
3191  pfree(spill->ntuples);
3192  pfree(spill->partitions);
3193  }
3194  pfree(aggstate->hash_spills);
3195  aggstate->hash_spills = NULL;
3196  }
3197 
3198  /* free batches */
3199  foreach(lc, aggstate->hash_batches)
3200  {
3201  HashAggBatch *batch = (HashAggBatch *) lfirst(lc);
3202 
3203  pfree(batch);
3204  }
3205  list_free(aggstate->hash_batches);
3206  aggstate->hash_batches = NIL;
3207 
3208  /* close tape set */
3209  if (aggstate->hash_tapeinfo != NULL)
3210  {
3211  HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
3212 
3213  LogicalTapeSetClose(tapeinfo->tapeset);
3214  pfree(tapeinfo->freetapes);
3215  pfree(tapeinfo);
3216  aggstate->hash_tapeinfo = NULL;
3217  }
3218 }
3219 
3220 
3221 /* -----------------
3222  * ExecInitAgg
3223  *
3224  * Creates the run-time information for the agg node produced by the
3225  * planner and initializes its outer subtree.
3226  *
3227  * -----------------
3228  */
3229 AggState *
3230 ExecInitAgg(Agg *node, EState *estate, int eflags)
3231 {
3232  AggState *aggstate;
3233  AggStatePerAgg peraggs;
3234  AggStatePerTrans pertransstates;
3235  AggStatePerGroup *pergroups;
3236  Plan *outerPlan;
3237  ExprContext *econtext;
3238  TupleDesc scanDesc;
3239  int max_aggno;
3240  int max_transno;
3241  int numaggrefs;
3242  int numaggs;
3243  int numtrans;
3244  int phase;
3245  int phaseidx;
3246  ListCell *l;
3247  Bitmapset *all_grouped_cols = NULL;
3248  int numGroupingSets = 1;
3249  int numPhases;
3250  int numHashes;
3251  int i = 0;
3252  int j = 0;
3253  bool use_hashing = (node->aggstrategy == AGG_HASHED ||
3254  node->aggstrategy == AGG_MIXED);
3255 
3256  /* check for unsupported flags */
3257  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
3258 
3259  /*
3260  * create state structure
3261  */
3262  aggstate = makeNode(AggState);
3263  aggstate->ss.ps.plan = (Plan *) node;
3264  aggstate->ss.ps.state = estate;
3265  aggstate->ss.ps.ExecProcNode = ExecAgg;
3266 
3267  aggstate->aggs = NIL;
3268  aggstate->numaggs = 0;
3269  aggstate->numtrans = 0;
3270  aggstate->aggstrategy = node->aggstrategy;
3271  aggstate->aggsplit = node->aggsplit;
3272  aggstate->maxsets = 0;
3273  aggstate->projected_set = -1;
3274  aggstate->current_set = 0;
3275  aggstate->peragg = NULL;
3276  aggstate->pertrans = NULL;
3277  aggstate->curperagg = NULL;
3278  aggstate->curpertrans = NULL;
3279  aggstate->input_done = false;
3280  aggstate->agg_done = false;
3281  aggstate->pergroups = NULL;
3282  aggstate->grp_firstTuple = NULL;
3283  aggstate->sort_in = NULL;
3284  aggstate->sort_out = NULL;
3285 
3286  /*
3287  * phases[0] always exists, but is dummy in sorted/plain mode
3288  */
3289  numPhases = (use_hashing ? 1 : 2);
3290  numHashes = (use_hashing ? 1 : 0);
3291 
3292  /*
3293  * Calculate the maximum number of grouping sets in any phase; this
3294  * determines the size of some allocations. Also calculate the number of
3295  * phases, since all hashed/mixed nodes contribute to only a single phase.
3296  */
3297  if (node->groupingSets)
3298  {
3299  numGroupingSets = list_length(node->groupingSets);
3300 
3301  foreach(l, node->chain)
3302  {
3303  Agg *agg = lfirst(l);
3304 
3305  numGroupingSets = Max(numGroupingSets,
3306  list_length(agg->groupingSets));
3307 
3308  /*
3309  * additional AGG_HASHED aggs become part of phase 0, but all
3310  * others add an extra phase.
3311  */
3312  if (agg->aggstrategy != AGG_HASHED)
3313  ++numPhases;
3314  else
3315  ++numHashes;
3316  }
3317  }
3318 
3319  aggstate->maxsets = numGroupingSets;
3320  aggstate->numphases = numPhases;
3321 
3322  aggstate->aggcontexts = (ExprContext **)
3323  palloc0(sizeof(ExprContext *) * numGroupingSets);
3324 
3325  /*
3326  * Create expression contexts. We need three or more, one for
3327  * per-input-tuple processing, one for per-output-tuple processing, one
3328  * for all the hashtables, and one for each grouping set. The per-tuple
3329  * memory context of the per-grouping-set ExprContexts (aggcontexts)
3330  * replaces the standalone memory context formerly used to hold transition
3331  * values. We cheat a little by using ExecAssignExprContext() to build
3332  * all of them.
3333  *
3334  * NOTE: the details of what is stored in aggcontexts and what is stored
3335  * in the regular per-query memory context are driven by a simple
3336  * decision: we want to reset the aggcontext at group boundaries (if not
3337  * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
3338  */
3339  ExecAssignExprContext(estate, &aggstate->ss.ps);
3340  aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
3341 
3342  for (i = 0; i < numGroupingSets; ++i)
3343  {
3344  ExecAssignExprContext(estate, &aggstate->ss.ps);
3345  aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
3346  }
3347 
3348  if (use_hashing)
3349  aggstate->hashcontext = CreateWorkExprContext(estate);
3350 
3351  ExecAssignExprContext(estate, &aggstate->ss.ps);
3352 
3353  /*
3354  * Initialize child nodes.
3355  *
3356  * If we are doing a hashed aggregation then the child plan does not need
3357  * to handle REWIND efficiently; see ExecReScanAgg.
3358  */
3359  if (node->aggstrategy == AGG_HASHED)
3360  eflags &= ~EXEC_FLAG_REWIND;
3361  outerPlan = outerPlan(node);
3362  outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
3363 
3364  /*
3365  * initialize source tuple type.
3366  */
3367  aggstate->ss.ps.outerops =
3369  &aggstate->ss.ps.outeropsfixed);
3370  aggstate->ss.ps.outeropsset = true;
3371 
3372  ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
3373  aggstate->ss.ps.outerops);
3374  scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
3375 
3376  /*
3377  * If there are more than two phases (including a potential dummy phase
3378  * 0), input will be resorted using tuplesort. Need a slot for that.
3379  */
3380  if (numPhases > 2)
3381  {
3382  aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
3384 
3385  /*
3386  * The output of the tuplesort, and the output from the outer child
3387  * might not use the same type of slot. In most cases the child will
3388  * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
3389  * input can also be presorted due an index, in which case it could be
3390  * a different type of slot.
3391  *
3392  * XXX: For efficiency it would be good to instead/additionally
3393  * generate expressions with corresponding settings of outerops* for
3394  * the individual phases - deforming is often a bottleneck for
3395  * aggregations with lots of rows per group. If there's multiple
3396  * sorts, we know that all but the first use TTSOpsMinimalTuple (via
3397  * the nodeAgg.c internal tuplesort).
3398  */
3399  if (aggstate->ss.ps.outeropsfixed &&
3400  aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
3401  aggstate->ss.ps.outeropsfixed = false;
3402  }
3403 
3404  /*
3405  * Initialize result type, slot and projection.
3406  */
3408  ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
3409 
3410  /*
3411  * initialize child expressions
3412  *
3413  * We expect the parser to have checked that no aggs contain other agg
3414  * calls in their arguments (and just to be sure, we verify it again while
3415  * initializing the plan node). This would make no sense under SQL
3416  * semantics, and it's forbidden by the spec. Because it is true, we
3417  * don't need to worry about evaluating the aggs in any particular order.
3418  *
3419  * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
3420  * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
3421  * during ExecAssignProjectionInfo, above.
3422  */
3423  aggstate->ss.ps.qual =
3424  ExecInitQual(node->plan.qual, (PlanState *) aggstate);
3425 
3426  /*
3427  * We should now have found all Aggrefs in the targetlist and quals.
3428  */
3429  numaggrefs = list_length(aggstate->aggs);
3430  max_aggno = -1;
3431  max_transno = -1;
3432  foreach(l, aggstate->aggs)
3433  {
3434  Aggref *aggref = (Aggref *) lfirst(l);
3435 
3436  max_aggno = Max(max_aggno, aggref->aggno);
3437  max_transno = Max(max_transno, aggref->aggtransno);
3438  }
3439  numaggs = max_aggno + 1;
3440  numtrans = max_transno + 1;
3441 
3442  /*
3443  * For each phase, prepare grouping set data and fmgr lookup data for
3444  * compare functions. Accumulate all_grouped_cols in passing.
3445  */
3446  aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
3447 
3448  aggstate->num_hashes = numHashes;
3449  if (numHashes)
3450  {
3451  aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
3452  aggstate->phases[0].numsets = 0;
3453  aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
3454  aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
3455  }
3456 
3457  phase = 0;
3458  for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3459  {
3460  Agg *aggnode;
3461  Sort *sortnode;
3462 
3463  if (phaseidx > 0)
3464  {
3465  aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
3466  sortnode = castNode(Sort, aggnode->plan.lefttree);
3467  }
3468  else
3469  {
3470  aggnode = node;
3471  sortnode = NULL;
3472  }
3473 
3474  Assert(phase <= 1 || sortnode);
3475 
3476  if (aggnode->aggstrategy == AGG_HASHED
3477  || aggnode->aggstrategy == AGG_MIXED)
3478  {
3479  AggStatePerPhase phasedata = &aggstate->phases[0];
3480  AggStatePerHash perhash;
3481  Bitmapset *cols = NULL;
3482 
3483  Assert(phase == 0);
3484  i = phasedata->numsets++;
3485  perhash = &aggstate->perhash[i];
3486 
3487  /* phase 0 always points to the "real" Agg in the hash case */
3488  phasedata->aggnode = node;
3489  phasedata->aggstrategy = node->aggstrategy;
3490 
3491  /* but the actual Agg node representing this hash is saved here */
3492  perhash->aggnode = aggnode;
3493 
3494  phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
3495 
3496  for (j = 0; j < aggnode->numCols; ++j)
3497  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3498 
3499  phasedata->grouped_cols[i] = cols;
3500 
3501  all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3502  continue;
3503  }
3504  else
3505  {
3506  AggStatePerPhase phasedata = &aggstate->phases[++phase];
3507  int num_sets;
3508 
3509  phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
3510 
3511  if (num_sets)
3512  {
3513  phasedata->gset_lengths = palloc(num_sets * sizeof(int));
3514  phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
3515 
3516  i = 0;
3517  foreach(l, aggnode->groupingSets)
3518  {
3519  int current_length = list_length(lfirst(l));
3520  Bitmapset *cols = NULL;
3521 
3522  /* planner forces this to be correct */
3523  for (j = 0; j < current_length; ++j)
3524  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3525 
3526  phasedata->grouped_cols[i] = cols;
3527  phasedata->gset_lengths[i] = current_length;
3528 
3529  ++i;
3530  }
3531 
3532  all_grouped_cols = bms_add_members(all_grouped_cols,
3533  phasedata->grouped_cols[0]);
3534  }
3535  else
3536  {
3537  Assert(phaseidx == 0);
3538 
3539  phasedata->gset_lengths = NULL;
3540  phasedata->grouped_cols = NULL;
3541  }
3542 
3543  /*
3544  * If we are grouping, precompute fmgr lookup data for inner loop.
3545  */
3546  if (aggnode->aggstrategy == AGG_SORTED)
3547  {
3548  int i = 0;
3549 
3550  Assert(aggnode->numCols > 0);
3551 
3552  /*
3553  * Build a separate function for each subset of columns that
3554  * need to be compared.
3555  */
3556  phasedata->eqfunctions =
3557  (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
3558 
3559  /* for each grouping set */
3560  for (i = 0; i < phasedata->numsets; i++)
3561  {
3562  int length = phasedata->gset_lengths[i];
3563 
3564  if (phasedata->eqfunctions[length - 1] != NULL)
3565  continue;
3566 
3567  phasedata->eqfunctions[length - 1] =
3568  execTuplesMatchPrepare(scanDesc,
3569  length,
3570  aggnode->grpColIdx,
3571  aggnode->grpOperators,
3572  aggnode->grpCollations,
3573  (PlanState *) aggstate);
3574  }
3575 
3576  /* and for all grouped columns, unless already computed */
3577  if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
3578  {
3579  phasedata->eqfunctions[aggnode->numCols - 1] =
3580  execTuplesMatchPrepare(scanDesc,
3581  aggnode->numCols,
3582  aggnode->grpColIdx,
3583  aggnode->grpOperators,
3584  aggnode->grpCollations,
3585  (PlanState *) aggstate);
3586  }
3587  }
3588 
3589  phasedata->aggnode = aggnode;
3590  phasedata->aggstrategy = aggnode->aggstrategy;
3591  phasedata->sortnode = sortnode;
3592  }
3593  }
3594 
3595  /*
3596  * Convert all_grouped_cols to a descending-order list.
3597  */
3598  i = -1;
3599  while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3600  aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3601 
3602  /*
3603  * Set up aggregate-result storage in the output expr context, and also
3604  * allocate my private per-agg working storage
3605  */
3606  econtext = aggstate->ss.ps.ps_ExprContext;
3607  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3608  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3609 
3610  peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3611  pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans);
3612 
3613  aggstate->peragg = peraggs;
3614  aggstate->pertrans = pertransstates;
3615 
3616 
3617  aggstate->all_pergroups =
3619  * (numGroupingSets + numHashes));
3620  pergroups = aggstate->all_pergroups;
3621 
3622  if (node->aggstrategy != AGG_HASHED)
3623  {
3624  for (i = 0; i < numGroupingSets; i++)
3625  {
3626  pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3627  * numaggs);
3628  }
3629 
3630  aggstate->pergroups = pergroups;
3631  pergroups += numGroupingSets;
3632  }
3633 
3634  /*
3635  * Hashing can only appear in the initial phase.
3636  */
3637  if (use_hashing)
3638  {
3639  Plan *outerplan = outerPlan(node);
3640  uint64 totalGroups = 0;
3641  int i;
3642 
3643  aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
3644  "HashAgg meta context",
3646  aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
3648  aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
3649  &TTSOpsVirtual);
3650 
3651  /* this is an array of pointers, not structures */
3652  aggstate->hash_pergroup = pergroups;
3653 
3654  aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
3655  outerplan->plan_width,
3656  node->transitionSpace);
3657 
3658  /*
3659  * Consider all of the grouping sets together when setting the limits
3660  * and estimating the number of partitions. This can be inaccurate
3661  * when there is more than one grouping set, but should still be
3662  * reasonable.
3663  */
3664  for (i = 0; i < aggstate->num_hashes; i++)
3665  totalGroups += aggstate->perhash[i].aggnode->numGroups;
3666 
3667  hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
3668  &aggstate->hash_mem_limit,
3669  &aggstate->hash_ngroups_limit,
3670  &aggstate->hash_planned_partitions);
3671  find_hash_columns(aggstate);
3672 
3673  /* Skip massive memory allocation if we are just doing EXPLAIN */
3674  if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
3675  build_hash_tables(aggstate);
3676 
3677  aggstate->table_filled = false;
3678 
3679  /* Initialize this to 1, meaning nothing spilled, yet */
3680  aggstate->hash_batches_used = 1;
3681  }
3682 
3683  /*
3684  * Initialize current phase-dependent values to initial phase. The initial
3685  * phase is 1 (first sort pass) for all strategies that use sorting (if
3686  * hashing is being done too, then phase 0 is processed last); but if only
3687  * hashing is being done, then phase 0 is all there is.
3688  */
3689  if (node->aggstrategy == AGG_HASHED)
3690  {
3691  aggstate->current_phase = 0;
3692  initialize_phase(aggstate, 0);
3693  select_current_set(aggstate, 0, true);
3694  }
3695  else
3696  {
3697  aggstate->current_phase = 1;
3698  initialize_phase(aggstate, 1);
3699  select_current_set(aggstate, 0, false);
3700  }
3701 
3702  /*
3703  * Perform lookups of aggregate function info, and initialize the
3704  * unchanging fields of the per-agg and per-trans data.
3705  */
3706  foreach(l, aggstate->aggs)
3707  {
3708  Aggref *aggref = lfirst(l);
3709  AggStatePerAgg peragg;
3710  AggStatePerTrans pertrans;
3711  Oid inputTypes[FUNC_MAX_ARGS];
3712  int numArguments;
3713  int numDirectArgs;
3714  HeapTuple aggTuple;
3715  Form_pg_aggregate aggform;
3716  AclResult aclresult;
3717  Oid finalfn_oid;
3718  Oid serialfn_oid,
3719  deserialfn_oid;
3720  Oid aggOwner;
3721  Expr *finalfnexpr;
3722  Oid aggtranstype;
3723 
3724  /* Planner should have assigned aggregate to correct level */
3725  Assert(aggref->agglevelsup == 0);
3726  /* ... and the split mode should match */
3727  Assert(aggref->aggsplit == aggstate->aggsplit);
3728 
3729  peragg = &peraggs[aggref->aggno];
3730 
3731  /* Check if we initialized the state for this aggregate already. */
3732  if (peragg->aggref != NULL)
3733  continue;
3734 
3735  peragg->aggref = aggref;
3736  peragg->transno = aggref->aggtransno;
3737 
3738  /* Fetch the pg_aggregate row */
3739  aggTuple = SearchSysCache1(AGGFNOID,
3740  ObjectIdGetDatum(aggref->aggfnoid));
3741  if (!HeapTupleIsValid(aggTuple))
3742  elog(ERROR, "cache lookup failed for aggregate %u",
3743  aggref->aggfnoid);
3744  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3745 
3746  /* Check permission to call aggregate function */
3747  aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3748  ACL_EXECUTE);
3749  if (aclresult != ACLCHECK_OK)
3750  aclcheck_error(aclresult, OBJECT_AGGREGATE,
3751  get_func_name(aggref->aggfnoid));
3753 
3754  /* planner recorded transition state type in the Aggref itself */
3755  aggtranstype = aggref->aggtranstype;
3756  Assert(OidIsValid(aggtranstype));
3757 
3758  /* Final function only required if we're finalizing the aggregates */
3759  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3760  peragg->finalfn_oid = finalfn_oid = InvalidOid;
3761  else
3762  peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3763 
3764  serialfn_oid = InvalidOid;
3765  deserialfn_oid = InvalidOid;
3766 
3767  /*
3768  * Check if serialization/deserialization is required. We only do it
3769  * for aggregates that have transtype INTERNAL.
3770  */
3771  if (aggtranstype == INTERNALOID)
3772  {
3773  /*
3774  * The planner should only have generated a serialize agg node if
3775  * every aggregate with an INTERNAL state has a serialization
3776  * function. Verify that.
3777  */
3778  if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3779  {
3780  /* serialization only valid when not running finalfn */
3781  Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3782 
3783  if (!OidIsValid(aggform->aggserialfn))
3784  elog(ERROR, "serialfunc not provided for serialization aggregation");
3785  serialfn_oid = aggform->aggserialfn;
3786  }
3787 
3788  /* Likewise for deserialization functions */
3789  if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3790  {
3791  /* deserialization only valid when combining states */
3792  Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3793 
3794  if (!OidIsValid(aggform->aggdeserialfn))
3795  elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3796  deserialfn_oid = aggform->aggdeserialfn;
3797  }
3798  }
3799 
3800  /* Check that aggregate owner has permission to call component fns */
3801  {
3802  HeapTuple procTuple;
3803 
3804  procTuple = SearchSysCache1(PROCOID,
3805  ObjectIdGetDatum(aggref->aggfnoid));
3806  if (!HeapTupleIsValid(procTuple))
3807  elog(ERROR, "cache lookup failed for function %u",
3808  aggref->aggfnoid);
3809  aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3810  ReleaseSysCache(procTuple);
3811 
3812  if (OidIsValid(finalfn_oid))
3813  {
3814  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3815  ACL_EXECUTE);
3816  if (aclresult != ACLCHECK_OK)
3817  aclcheck_error(aclresult, OBJECT_FUNCTION,
3818  get_func_name(finalfn_oid));
3819  InvokeFunctionExecuteHook(finalfn_oid);
3820  }
3821  if (OidIsValid(serialfn_oid))
3822  {
3823  aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3824  ACL_EXECUTE);
3825  if (aclresult != ACLCHECK_OK)
3826  aclcheck_error(aclresult, OBJECT_FUNCTION,
3827  get_func_name(serialfn_oid));
3828  InvokeFunctionExecuteHook(serialfn_oid);
3829  }
3830  if (OidIsValid(deserialfn_oid))
3831  {
3832  aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3833  ACL_EXECUTE);
3834  if (aclresult != ACLCHECK_OK)
3835  aclcheck_error(aclresult, OBJECT_FUNCTION,
3836  get_func_name(deserialfn_oid));
3837  InvokeFunctionExecuteHook(deserialfn_oid);
3838  }
3839  }
3840 
3841  /*
3842  * Get actual datatypes of the (nominal) aggregate inputs. These
3843  * could be different from the agg's declared input types, when the
3844  * agg accepts ANY or a polymorphic type.
3845  */
3846  numArguments = get_aggregate_argtypes(aggref, inputTypes);
3847 
3848  /* Count the "direct" arguments, if any */
3849  numDirectArgs = list_length(aggref->aggdirectargs);
3850 
3851  /* Detect how many arguments to pass to the finalfn */
3852  if (aggform->aggfinalextra)
3853  peragg->numFinalArgs = numArguments + 1;
3854  else
3855  peragg->numFinalArgs = numDirectArgs + 1;
3856 
3857  /* Initialize any direct-argument expressions */
3858  peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3859  (PlanState *) aggstate);
3860 
3861  /*
3862  * build expression trees using actual argument & result types for the
3863  * finalfn, if it exists and is required.
3864  */
3865  if (OidIsValid(finalfn_oid))
3866  {
3867  build_aggregate_finalfn_expr(inputTypes,
3868  peragg->numFinalArgs,
3869  aggtranstype,
3870  aggref->aggtype,
3871  aggref->inputcollid,
3872  finalfn_oid,
3873  &finalfnexpr);
3874  fmgr_info(finalfn_oid, &peragg->finalfn);
3875  fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3876  }
3877 
3878  /* get info about the output value's datatype */
3879  get_typlenbyval(aggref->aggtype,
3880  &peragg->resulttypeLen,
3881  &peragg->resulttypeByVal);
3882 
3883  /*
3884  * Build working state for invoking the transition function, if we
3885  * haven't done it already.
3886  */
3887  pertrans = &pertransstates[aggref->aggtransno];
3888  if (pertrans->aggref == NULL)
3889  {
3890  Datum textInitVal;
3891  Datum initValue;
3892  bool initValueIsNull;
3893  Oid transfn_oid;
3894 
3895  /*
3896  * If this aggregation is performing state combines, then instead
3897  * of using the transition function, we'll use the combine
3898  * function
3899  */
3900  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3901  {
3902  transfn_oid = aggform->aggcombinefn;
3903 
3904  /* If not set then the planner messed up */
3905  if (!OidIsValid(transfn_oid))
3906  elog(ERROR, "combinefn not set for aggregate function");
3907  }
3908  else
3909  transfn_oid = aggform->aggtransfn;
3910 
3911  aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3912  ACL_EXECUTE);
3913  if (aclresult != ACLCHECK_OK)
3914  aclcheck_error(aclresult, OBJECT_FUNCTION,
3915  get_func_name(transfn_oid));
3916  InvokeFunctionExecuteHook(transfn_oid);
3917 
3918  /*
3919  * initval is potentially null, so don't try to access it as a
3920  * struct field. Must do it the hard way with SysCacheGetAttr.
3921  */
3922  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3923  Anum_pg_aggregate_agginitval,
3924  &initValueIsNull);
3925  if (initValueIsNull)
3926  initValue = (Datum) 0;
3927  else
3928  initValue = GetAggInitVal(textInitVal, aggtranstype);
3929 
3930  build_pertrans_for_aggref(pertrans, aggstate, estate,
3931  aggref, transfn_oid, aggtranstype,
3932  serialfn_oid, deserialfn_oid,
3933  initValue, initValueIsNull,
3934  inputTypes, numArguments);
3935  }
3936  else
3937  pertrans->aggshared = true;
3938  ReleaseSysCache(aggTuple);
3939  }
3940 
3941  /*
3942  * Update aggstate->numaggs to be the number of unique aggregates found.
3943  * Also set numstates to the number of unique transition states found.
3944  */
3945  aggstate->numaggs = numaggs;
3946  aggstate->numtrans = numtrans;
3947 
3948  /*
3949  * Last, check whether any more aggregates got added onto the node while
3950  * we processed the expressions for the aggregate arguments (including not
3951  * only the regular arguments and FILTER expressions handled immediately
3952  * above, but any direct arguments we might've handled earlier). If so,
3953  * we have nested aggregate functions, which is semantically nonsensical,
3954  * so complain. (This should have been caught by the parser, so we don't
3955  * need to work hard on a helpful error message; but we defend against it
3956  * here anyway, just to be sure.)
3957  */
3958  if (numaggrefs != list_length(aggstate->aggs))
3959  ereport(ERROR,
3960  (errcode(ERRCODE_GROUPING_ERROR),
3961  errmsg("aggregate function calls cannot be nested")));
3962 
3963  /*
3964  * Build expressions doing all the transition work at once. We build a
3965  * different one for each phase, as the number of transition function
3966  * invocation can differ between phases. Note this'll work both for
3967  * transition and combination functions (although there'll only be one
3968  * phase in the latter case).
3969  */
3970  for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
3971  {
3972  AggStatePerPhase phase = &aggstate->phases[phaseidx];
3973  bool dohash = false;
3974  bool dosort = false;
3975 
3976  /* phase 0 doesn't necessarily exist */
3977  if (!phase->aggnode)
3978  continue;
3979 
3980  if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
3981  {
3982  /*
3983  * Phase one, and only phase one, in a mixed agg performs both
3984  * sorting and aggregation.
3985  */
3986  dohash = true;
3987  dosort = true;
3988  }
3989  else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
3990  {
3991  /*
3992  * No need to compute a transition function for an AGG_MIXED phase
3993  * 0 - the contents of the hashtables will have been computed
3994  * during phase 1.
3995  */
3996  continue;
3997  }
3998  else if (phase->aggstrategy == AGG_PLAIN ||
3999  phase->aggstrategy == AGG_SORTED)
4000  {
4001  dohash = false;
4002  dosort = true;
4003  }
4004  else if (phase->aggstrategy == AGG_HASHED)
4005  {
4006  dohash = true;
4007  dosort = false;
4008  }
4009  else
4010  Assert(false);
4011 
4012  phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
4013  false);
4014 
4015  /* cache compiled expression for outer slot without NULL check */
4016  phase->evaltrans_cache[0][0] = phase->evaltrans;
4017  }
4018 
4019  return aggstate;
4020 }
4021 
4022 /*
4023  * Build the state needed to calculate a state value for an aggregate.
4024  *
4025  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
4026  * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
4027  * of the arguments could be calculated from 'aggref', but the caller has
4028  * calculated them already, so might as well pass them.
4029  */
4030 static void
/*
 * NOTE(review): doxygen extraction dropped the hyperlinked signature line
 * here (upstream line 4031).  Per the preceding header comment this is
 * build_pertrans_for_aggref(AggStatePerTrans pertrans, ...) -- confirm
 * against upstream nodeAgg.c before relying on this listing.
 */
4032  AggState *aggstate, EState *estate,
4033  Aggref *aggref,
4034  Oid aggtransfn, Oid aggtranstype,
4035  Oid aggserialfn, Oid aggdeserialfn,
4036  Datum initValue, bool initValueIsNull,
4037  Oid *inputTypes, int numArguments)
4038 {
4039  int numGroupingSets = Max(aggstate->maxsets, 1);
4040  Expr *serialfnexpr = NULL;
4041  Expr *deserialfnexpr = NULL;
4042  ListCell *lc;
4043  int numInputs;
4044  int numDirectArgs;
4045  List *sortlist;
4046  int numSortCols;
4047  int numDistinctCols;
4048  int i;
4049 
4050  /* Begin filling in the pertrans data */
4051  pertrans->aggref = aggref;
4052  pertrans->aggshared = false;
4053  pertrans->aggCollation = aggref->inputcollid;
4054  pertrans->transfn_oid = aggtransfn;
4055  pertrans->serialfn_oid = aggserialfn;
4056  pertrans->deserialfn_oid = aggdeserialfn;
4057  pertrans->initValue = initValue;
4058  pertrans->initValueIsNull = initValueIsNull;
4059 
4060  /* Count the "direct" arguments, if any */
4061  numDirectArgs = list_length(aggref->aggdirectargs);
4062 
4063  /* Count the number of aggregated input columns */
4064  pertrans->numInputs = numInputs = list_length(aggref->args);
4065 
4066  pertrans->aggtranstype = aggtranstype;
4067 
4068  /*
4069  * When combining states, we have no use at all for the aggregate
4070  * function's transfn. Instead we use the combinefn. In this case, the
4071  * transfn and transfn_oid fields of pertrans refer to the combine
4072  * function rather than the transition function.
4073  */
4074  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
4075  {
4076  Expr *combinefnexpr;
4077  size_t numTransArgs;
4078 
4079  /*
4080  * When combining there's only one input, the to-be-combined added
4081  * transition value from below (this node's transition value is
4082  * counted separately).
4083  */
4084  pertrans->numTransInputs = 1;
4085 
4086  /* account for the current transition state */
4087  numTransArgs = pertrans->numTransInputs + 1;
4088 
4089  build_aggregate_combinefn_expr(aggtranstype,
4090  aggref->inputcollid,
4091  aggtransfn,
4092  &combinefnexpr);
4093  fmgr_info(aggtransfn, &pertrans->transfn);
4094  fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
4095 
4096  pertrans->transfn_fcinfo =
/*
 * NOTE(review): extraction dropped hyperlinked lines here (upstream
 * 4097-4098) -- presumably a palloc of the FunctionCallInfo followed by
 * InitFunctionCallInfoData(*..., with the arguments listed below.
 * Confirm against upstream.
 */
4099  &pertrans->transfn,
4100  numTransArgs,
4101  pertrans->aggCollation,
4102  (void *) aggstate, NULL);
4103 
4104  /*
4105  * Ensure that a combine function to combine INTERNAL states is not
4106  * strict. This should have been checked during CREATE AGGREGATE, but
4107  * the strict property could have been changed since then.
4108  */
4109  if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
4110  ereport(ERROR,
4111  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4112  errmsg("combine function with transition type %s must not be declared STRICT",
4113  format_type_be(aggtranstype))));
4114  }
4115  else
4116  {
4117  Expr *transfnexpr;
4118  size_t numTransArgs;
4119 
4120  /* Detect how many arguments to pass to the transfn */
4121  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4122  pertrans->numTransInputs = numInputs;
4123  else
4124  pertrans->numTransInputs = numArguments;
4125 
4126  /* account for the current transition state */
4127  numTransArgs = pertrans->numTransInputs + 1;
4128 
4129  /*
4130  * Set up infrastructure for calling the transfn. Note that
4131  * invtransfn is not needed here.
4132  */
4133  build_aggregate_transfn_expr(inputTypes,
4134  numArguments,
4135  numDirectArgs,
4136  aggref->aggvariadic,
4137  aggtranstype,
4138  aggref->inputcollid,
4139  aggtransfn,
4140  InvalidOid,
4141  &transfnexpr,
4142  NULL);
4143  fmgr_info(aggtransfn, &pertrans->transfn);
4144  fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
4145 
4146  pertrans->transfn_fcinfo =
/*
 * NOTE(review): extraction dropped hyperlinked lines here (upstream
 * 4147-4148) -- same fcinfo allocation/initialization pattern as above.
 */
4149  &pertrans->transfn,
4150  numTransArgs,
4151  pertrans->aggCollation,
4152  (void *) aggstate, NULL);
4153 
4154  /*
4155  * If the transfn is strict and the initval is NULL, make sure input
4156  * type and transtype are the same (or at least binary-compatible), so
4157  * that it's OK to use the first aggregated input value as the initial
4158  * transValue. This should have been checked at agg definition time,
4159  * but we must check again in case the transfn's strictness property
4160  * has been changed.
4161  */
4162  if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
4163  {
4164  if (numArguments <= numDirectArgs ||
4165  !IsBinaryCoercible(inputTypes[numDirectArgs],
4166  aggtranstype))
4167  ereport(ERROR,
4168  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4169  errmsg("aggregate %u needs to have compatible input type and transition type",
4170  aggref->aggfnoid)));
4171  }
4172  }
4173 
4174  /* get info about the state value's datatype */
4175  get_typlenbyval(aggtranstype,
4176  &pertrans->transtypeLen,
4177  &pertrans->transtypeByVal);
4178 
4179  if (OidIsValid(aggserialfn))
4180  {
4181  build_aggregate_serialfn_expr(aggserialfn,
4182  &serialfnexpr);
4183  fmgr_info(aggserialfn, &pertrans->serialfn);
4184  fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
4185 
4186  pertrans->serialfn_fcinfo =
/*
 * NOTE(review): extraction dropped hyperlinked lines here (upstream
 * 4187-4188) -- fcinfo allocation/initialization; the serialfn takes 1
 * argument (see below).
 */
4189  &pertrans->serialfn,
4190  1,
4191  InvalidOid,
4192  (void *) aggstate, NULL);
4193  }
4194 
4195  if (OidIsValid(aggdeserialfn))
4196  {
4197  build_aggregate_deserialfn_expr(aggdeserialfn,
4198  &deserialfnexpr);
4199  fmgr_info(aggdeserialfn, &pertrans->deserialfn);
4200  fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
4201 
4202  pertrans->deserialfn_fcinfo =
/*
 * NOTE(review): extraction dropped hyperlinked lines here (upstream
 * 4203-4204) -- fcinfo allocation/initialization; the deserialfn takes 2
 * arguments (see below).
 */
4205  &pertrans->deserialfn,
4206  2,
4207  InvalidOid,
4208  (void *) aggstate, NULL);
4209 
4210  }
4211 
4212  /*
4213  * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
4214  * have a list of SortGroupClause nodes; fish out the data in them and
4215  * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
4216  * however; the agg's transfn and finalfn are responsible for that.
4217  *
4218  * Note that by construction, if there is a DISTINCT clause then the ORDER
4219  * BY clause is a prefix of it (see transformDistinctClause).
4220  */
4221  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4222  {
4223  sortlist = NIL;
4224  numSortCols = numDistinctCols = 0;
4225  }
4226  else if (aggref->aggdistinct)
4227  {
4228  sortlist = aggref->aggdistinct;
4229  numSortCols = numDistinctCols = list_length(sortlist);
4230  Assert(numSortCols >= list_length(aggref->aggorder));
4231  }
4232  else
4233  {
4234  sortlist = aggref->aggorder;
4235  numSortCols = list_length(sortlist);
4236  numDistinctCols = 0;
4237  }
4238 
4239  pertrans->numSortCols = numSortCols;
4240  pertrans->numDistinctCols = numDistinctCols;
4241 
4242  /*
4243  * If we have either sorting or filtering to do, create a tupledesc and
4244  * slot corresponding to the aggregated inputs (including sort
4245  * expressions) of the agg.
4246  */
4247  if (numSortCols > 0 || aggref->aggfilter)
4248  {
4249  pertrans->sortdesc = ExecTypeFromTL(aggref->args);
4250  pertrans->sortslot =
4251  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
/* NOTE(review): extraction dropped the final argument line (upstream 4252) of this call. */
4253  }
4254 
4255  if (numSortCols > 0)
4256  {
4257  /*
4258  * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
4259  * (yet)
4260  */
4261  Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
4262 
4263  /* If we have only one input, we need its len/byval info. */
4264  if (numInputs == 1)
4265  {
4266  get_typlenbyval(inputTypes[numDirectArgs],
4267  &pertrans->inputtypeLen,
4268  &pertrans->inputtypeByVal);
4269  }
4270  else if (numDistinctCols > 0)
4271  {
4272  /* we will need an extra slot to store prior values */
4273  pertrans->uniqslot =
4274  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
/* NOTE(review): extraction dropped the final argument line (upstream 4275) of this call. */
4276  }
4277 
4278  /* Extract the sort information for use later */
4279  pertrans->sortColIdx =
4280  (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
4281  pertrans->sortOperators =
4282  (Oid *) palloc(numSortCols * sizeof(Oid));
4283  pertrans->sortCollations =
4284  (Oid *) palloc(numSortCols * sizeof(Oid));
4285  pertrans->sortNullsFirst =
4286  (bool *) palloc(numSortCols * sizeof(bool));
4287 
4288  i = 0;
4289  foreach(lc, sortlist)
4290  {
4291  SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
4292  TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
4293 
4294  /* the parser should have made sure of this */
4295  Assert(OidIsValid(sortcl->sortop));
4296 
4297  pertrans->sortColIdx[i] = tle->resno;
4298  pertrans->sortOperators[i] = sortcl->sortop;
4299  pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
4300  pertrans->sortNullsFirst[i] = sortcl->nulls_first;
4301  i++;
4302  }
4303  Assert(i == numSortCols);
4304  }
4305 
4306  if (aggref->aggdistinct)
4307  {
4308  Oid *ops;
4309 
4310  Assert(numArguments > 0);
4311  Assert(list_length(aggref->aggdistinct) == numDistinctCols);
4312 
4313  ops = palloc(numDistinctCols * sizeof(Oid));
4314 
4315  i = 0;
4316  foreach(lc, aggref->aggdistinct)
4317  ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
4318 
4319  /* lookup / build the necessary comparators */
4320  if (numDistinctCols == 1)
4321  fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
4322  else
4323  pertrans->equalfnMulti =
4324  execTuplesMatchPrepare(pertrans->sortdesc,
4325  numDistinctCols,
4326  pertrans->sortColIdx,
4327  ops,
4328  pertrans->sortCollations,
4329  &aggstate->ss.ps);
4330  pfree(ops);
4331  }
4332 
4333  pertrans->sortstates = (Tuplesortstate **)
4334  palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
4335 }
4336 
4337 
4338 static Datum
4339 GetAggInitVal(Datum textInitVal, Oid transtype)
4340 {
4341  Oid typinput,
4342  typioparam;
4343  char *strInitVal;
4344  Datum initVal;
4345 
4346  getTypeInputInfo(transtype, &typinput, &typioparam);
4347  strInitVal = TextDatumGetCString(textInitVal);
4348  initVal = OidInputFunctionCall(typinput, strInitVal,
4349  typioparam, -1);
4350  pfree(strInitVal);
4351  return initVal;
4352 }
4353 
4354 void
/*
 * NOTE(review): extraction dropped the hyperlinked signature line (upstream
 * 4355) and the local declaration of outerPlan (upstream 4357).  Body usage
 * (node->..., outerPlanState(node)) indicates this is
 * ExecEndAgg(AggState *node) -- confirm against upstream nodeAgg.c.
 *
 * Shuts down an Agg node: flushes parallel-worker instrumentation, closes
 * tuplesorts, releases hash-table memory, fires agg shutdown callbacks,
 * and recursively ends the outer plan.
 */
4356 {
4358  int transno;
4359  int numGroupingSets = Max(node->maxsets, 1);
4360  int setno;
4361 
4362  /*
4363  * When ending a parallel worker, copy the statistics gathered by the
4364  * worker back into shared memory so that it can be picked up by the main
4365  * process to report in EXPLAIN ANALYZE.
4366  */
4367  if (node->shared_info && IsParallelWorker())
4368  {
/* NOTE(review): lines dropped here (upstream 4369, 4372-4373) -- declaration/assignment of si into shared_info, plus at least one copied counter; confirm upstream. */
4370 
4371  Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
4374  si->hash_disk_used = node->hash_disk_used;
4375  si->hash_mem_peak = node->hash_mem_peak;
4376  }
4377 
4378  /* Make sure we have closed any open tuplesorts */
4379 
4380  if (node->sort_in)
4381  tuplesort_end(node->sort_in);
4382  if (node->sort_out)
4383  tuplesort_end(node->sort_out);
4384 
/* NOTE(review): a hyperlinked call was dropped here (upstream 4385) -- presumably hashagg_reset_spill_state(node), per the cross-reference residue at the end of this listing; confirm upstream. */
4386 
4387  if (node->hash_metacxt != NULL)
4388  {
/* NOTE(review): dropped line (upstream 4389) -- presumably deletes hash_metacxt before NULLing it; confirm upstream. */
4390  node->hash_metacxt = NULL;
4391  }
4392 
4393  for (transno = 0; transno < node->numtrans; transno++)
4394  {
4395  AggStatePerTrans pertrans = &node->pertrans[transno];
4396 
4397  for (setno = 0; setno < numGroupingSets; setno++)
4398  {
4399  if (pertrans->sortstates[setno])
4400  tuplesort_end(pertrans->sortstates[setno]);
4401  }
4402  }
4403 
4404  /* And ensure any agg shutdown callbacks have been called */
4405  for (setno = 0; setno < numGroupingSets; setno++)
4406  ReScanExprContext(node->aggcontexts[setno]);
4407  if (node->hashcontext)
/* NOTE(review): dropped line (upstream 4408) -- presumably ReScanExprContext(node->hashcontext); confirm upstream. */
4409 
4410  /*
4411  * We don't actually free any ExprContexts here (see comment in
4412  * ExecFreeExprContext), just unlinking the output one from the plan node
4413  * suffices.
4414  */
4415  ExecFreeExprContext(&node->ss.ps);
4416 
4417  /* clean up tuple table */
/* NOTE(review): dropped line (upstream 4418) -- presumably clears the scan/result tuple slot; confirm upstream. */
4419 
4420  outerPlan = outerPlanState(node);
4421  ExecEndNode(outerPlan);
4422 }
4423 
4424 void
/*
 * NOTE(review): extraction dropped the hyperlinked signature line (upstream
 * 4425) and the declaration of outerPlan (upstream 4428, presumably
 * PlanState *outerPlan = outerPlanState(node)).  Body usage indicates this
 * is ExecReScanAgg(AggState *node) -- confirm against upstream nodeAgg.c.
 *
 * Re-initializes the Agg node for a rescan.  For AGG_HASHED, reuses the
 * existing hash table when nothing relevant changed; otherwise resets
 * per-group state, sort states, and hash tables, and rescans the subplan.
 */
4426 {
4427  ExprContext *econtext = node->ss.ps.ps_ExprContext;
4429  Agg *aggnode = (Agg *) node->ss.ps.plan;
4430  int transno;
4431  int numGroupingSets = Max(node->maxsets, 1);
4432  int setno;
4433 
4434  node->agg_done = false;
4435 
4436  if (node->aggstrategy == AGG_HASHED)
4437  {
4438  /*
4439  * In the hashed case, if we haven't yet built the hash table then we
4440  * can just return; nothing done yet, so nothing to undo. If subnode's
4441  * chgParam is not NULL then it will be re-scanned by ExecProcNode,
4442  * else no reason to re-scan it at all.
4443  */
4444  if (!node->table_filled)
4445  return;
4446 
4447  /*
4448  * If we do have the hash table, and it never spilled, and the subplan
4449  * does not have any parameter changes, and none of our own parameter
4450  * changes affect input expressions of the aggregated functions, then
4451  * we can just rescan the existing hash table; no need to build it
4452  * again.
4453  */
4454  if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
4455  !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
4456  {
/* NOTE(review): dropped line (upstream 4457) -- presumably ResetTupleHashIterator(node->perhash[0].hashtable, ...) taking the argument below; confirm upstream. */
4458  &node->perhash[0].hashiter);
4459  select_current_set(node, 0, true);
4460  return;
4461  }
4462  }
4463 
4464  /* Make sure we have closed any open tuplesorts */
4465  for (transno = 0; transno < node->numtrans; transno++)
4466  {
4467  for (setno = 0; setno < numGroupingSets; setno++)
4468  {
4469  AggStatePerTrans pertrans = &node->pertrans[transno];
4470 
4471  if (pertrans->sortstates[setno])
4472  {
4473  tuplesort_end(pertrans->sortstates[setno]);
4474  pertrans->sortstates[setno] = NULL;
4475  }
4476  }
4477  }
4478 
4479  /*
4480  * We don't need to ReScanExprContext the output tuple context here;
4481  * ExecReScan already did it. But we do need to reset our per-grouping-set
4482  * contexts, which may have transvalues stored in them. (We use rescan
4483  * rather than just reset because transfns may have registered callbacks
4484  * that need to be run now.) For the AGG_HASHED case, see below.
4485  */
4486 
4487  for (setno = 0; setno < numGroupingSets; setno++)
4488  {
4489  ReScanExprContext(node->aggcontexts[setno]);
4490  }
4491 
4492  /* Release first tuple of group, if we have made a copy */
4493  if (node->grp_firstTuple != NULL)
4494  {
/* NOTE(review): dropped line (upstream 4495) -- presumably frees grp_firstTuple before NULLing it; confirm upstream. */
4496  node->grp_firstTuple = NULL;
4497  }
4499 
4500  /* Forget current agg values */
4501  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
4502  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
4503 
4504  /*
4505  * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
4506  * the hashcontext. This used to be an issue, but now, resetting a context
4507  * automatically deletes sub-contexts too.
4508  */
4509  if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
4510  {
/* NOTE(review): dropped line (upstream 4511) -- presumably ReScanExprContext(node->hashcontext); confirm upstream. */
4512 
4513  node->hash_ever_spilled = false;
4514  node->hash_spill_mode = false;
4515  node->hash_ngroups_current = 0;
4516 
/* NOTE(review): dropped line (upstream 4517) -- presumably hashagg_reset_spill_state(node); confirm upstream. */
4518  /* Rebuild an empty hash table */
4519  build_hash_tables(node);
4520  node->table_filled = false;
4521  /* iterator will be reset when the table is filled */
4522 
4523  hashagg_recompile_expressions(node, false, false);
4524  }
4525 
4526  if (node->aggstrategy != AGG_HASHED)
4527  {
4528  /*
4529  * Reset the per-group state (in particular, mark transvalues null)
4530  */
4531  for (setno = 0; setno < numGroupingSets; setno++)
4532  {
4533  MemSet(node->pergroups[setno], 0,
4534  sizeof(AggStatePerGroupData) * node->numaggs);
4535  }
4536 
4537  /* reset to phase 1 */
4538  initialize_phase(node, 1);
4539 
4540  node->input_done = false;
4541  node->projected_set = -1;
4542  }
4543 
4544  if (outerPlan->chgParam == NULL)
4545  ExecReScan(outerPlan);
4546 }
4547 
4548 
4549 /***********************************************************************
4550  * API exposed to aggregate functions
4551  ***********************************************************************/
4552 
4553 
4554 /*
4555  * AggCheckCallContext - test if a SQL function is being called as an aggregate
4556  *
4557  * The transition and/or final functions of an aggregate may want to verify
4558  * that they are being called as aggregates, rather than as plain SQL
4559  * functions. They should use this function to do so. The return value
4560  * is nonzero if being called as an aggregate, or zero if not. (Specific
4561  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4562  * values could conceivably appear in future.)
4563  *
4564  * If aggcontext isn't NULL, the function also stores at *aggcontext the
4565  * identity of the memory context that aggregate transition values are being
4566  * stored in. Note that the same aggregate call site (flinfo) may be called
4567  * interleaved on different transition values in different contexts, so it's
4568  * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4569  * cache it in the transvalue itself (for internal-type transvalues).
4570  */
4571 int
/*
 * NOTE(review): extraction dropped the hyperlinked signature line (upstream
 * 4572).  Body usage shows the parameters are (FunctionCallInfo fcinfo,
 * MemoryContext *aggcontext); the header comment above names the function
 * AggCheckCallContext -- confirm against upstream nodeAgg.c.
 *
 * Returns AGG_CONTEXT_AGGREGATE / AGG_CONTEXT_WINDOW / 0 depending on how
 * the SQL function is being called; optionally reports the memory context
 * holding transition values via *aggcontext.
 */
4573 {
4574  if (fcinfo->context && IsA(fcinfo->context, AggState))
4575  {
4576  if (aggcontext)
4577  {
4578  AggState *aggstate = ((AggState *) fcinfo->context);
4579  ExprContext *cxt = aggstate->curaggcontext;
4580 
4581  *aggcontext = cxt->ecxt_per_tuple_memory;
4582  }
4583  return AGG_CONTEXT_AGGREGATE;
4584  }
4585  if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4586  {
4587  if (aggcontext)
4588  *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4589  return AGG_CONTEXT_WINDOW;
4590  }
4591 
4592  /* this is just to prevent "uninitialized variable" warnings */
4593  if (aggcontext)
4594  *aggcontext = NULL;
4595  return 0;
4596 }
4597 
4598 /*
4599  * AggGetAggref - allow an aggregate support function to get its Aggref
4600  *
4601  * If the function is being called as an aggregate support function,
4602  * return the Aggref node for the aggregate call. Otherwise, return NULL.
4603  *
4604  * Aggregates sharing the same inputs and transition functions can get
4605  * merged into a single transition calculation. If the transition function
4606  * calls AggGetAggref, it will get some one of the Aggrefs for which it is
4607  * executing. It must therefore not pay attention to the Aggref fields that
4608  * relate to the final function, as those are indeterminate. But if a final
4609  * function calls AggGetAggref, it will get a precise result.
4610  *
4611  * Note that if an aggregate is being used as a window function, this will
4612  * return NULL. We could provide a similar function to return the relevant
4613  * WindowFunc node in such cases, but it's not needed yet.
4614  */
4615 Aggref *
/*
 * NOTE(review): extraction dropped the hyperlinked signature line (upstream
 * 4616) -- per the header comment above, this is
 * AggGetAggref(FunctionCallInfo fcinfo); confirm against upstream.
 *
 * Returns the Aggref for the current aggregate call: curperagg when inside
 * a final function, curpertrans when inside a transition function, else
 * NULL (e.g. when called as a window function).
 */
4617 {
4618  if (fcinfo->context && IsA(fcinfo->context, AggState))
4619  {
4620  AggState *aggstate = (AggState *) fcinfo->context;
4621  AggStatePerAgg curperagg;
4622  AggStatePerTrans curpertrans;
4623 
4624  /* check curperagg (valid when in a final function) */
4625  curperagg = aggstate->curperagg;
4626 
4627  if (curperagg)
4628  return curperagg->aggref;
4629 
4630  /* check curpertrans (valid when in a transition function) */
4631  curpertrans = aggstate->curpertrans;
4632 
4633  if (curpertrans)
4634  return curpertrans->aggref;
4635  }
4636  return NULL;
4637 }
4638 
4639 /*
4640  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4641  *
4642  * This is useful in agg final functions; the context returned is one that
4643  * the final function can safely reset as desired. This isn't useful for
4644  * transition functions, since the context returned MAY (we don't promise)
4645  * be the same as the context those are called in.
4646  *
4647  * As above, this is currently not useful for aggs called as window functions.
4648  */
/*
 * NOTE(review): extraction dropped the return-type and signature lines
 * (upstream 4649-4650) -- per the header comment above, this is
 * MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo);
 * confirm against upstream.
 *
 * Returns the per-tuple short-term memory context when called from within
 * an aggregate, else NULL.
 */
4651 {
4652  if (fcinfo->context && IsA(fcinfo->context, AggState))
4653  {
4654  AggState *aggstate = (AggState *) fcinfo->context;
4655 
4656  return aggstate->tmpcontext->ecxt_per_tuple_memory;
4657  }
4658  return NULL;
4659 }
4660 
4661 /*
4662  * AggStateIsShared - find out whether transition state is shared
4663  *
4664  * If the function is being called as an aggregate support function,
4665  * return true if the aggregate's transition state is shared across
4666  * multiple aggregates, false if it is not.
4667  *
4668  * Returns true if not called as an aggregate support function.
4669  * This is intended as a conservative answer, ie "no you'd better not
4670  * scribble on your input". In particular, will return true if the
4671  * aggregate is being used as a window function, which is a scenario
4672  * in which changing the transition state is a bad idea. We might
4673  * want to refine the behavior for the window case in future.
4674  */
4675 bool
/*
 * NOTE(review): extraction dropped the hyperlinked signature line (upstream
 * 4676) -- per the header comment above, this is
 * AggStateIsShared(FunctionCallInfo fcinfo); confirm against upstream.
 *
 * Reports whether the current aggregate's transition state is shared with
 * other aggregates; conservatively returns true when not called as an
 * aggregate support function.
 */
4677 {
4678  if (fcinfo->context && IsA(fcinfo->context, AggState))
4679  {
4680  AggState *aggstate = (AggState *) fcinfo->context;
4681  AggStatePerAgg curperagg;
4682  AggStatePerTrans curpertrans;
4683 
4684  /* check curperagg (valid when in a final function) */
4685  curperagg = aggstate->curperagg;
4686 
4687  if (curperagg)
4688  return aggstate->pertrans[curperagg->transno].aggshared;
4689 
4690  /* check curpertrans (valid when in a transition function) */
4691  curpertrans = aggstate->curpertrans;
4692 
4693  if (curpertrans)
4694  return curpertrans->aggshared;
4695  }
4696  return true;
4697 }
4698 
4699 /*
4700  * AggRegisterCallback - register a cleanup callback for an aggregate
4701  *
4702  * This is useful for aggs to register shutdown callbacks, which will ensure
4703  * that non-memory resources are freed. The callback will occur just before
4704  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4705  * either between groups or as a result of rescanning the query. The callback
4706  * will NOT be called on error paths. The typical use-case is for freeing of
4707  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4708  * created by the agg functions. (The callback will not be called until after
4709  * the result of the finalfn is no longer needed, so it's safe for the finalfn
4710  * to return data that will be freed by the callback.)
4711  *
4712  * As above, this is currently not useful for aggs called as window functions.
4713  */
4714 void
/*
 * NOTE(review): extraction dropped the first two signature lines (upstream
 * 4715-4716) -- per the header comment above, this is
 * AggRegisterCallback(FunctionCallInfo fcinfo,
 *                     ExprContextCallbackFunction func, ...);
 * confirm against upstream.
 *
 * Registers a shutdown callback on the current aggregate's aggcontext;
 * errors out if not called from within an aggregate.
 */
4717  Datum arg)
4718 {
4719  if (fcinfo->context && IsA(fcinfo->context, AggState))
4720  {
4721  AggState *aggstate = (AggState *) fcinfo->context;
4722  ExprContext *cxt = aggstate->curaggcontext;
4723 
4724  RegisterExprContextCallback(cxt, func, arg);
4725 
4726  return;
4727  }
4728  elog(ERROR, "aggregate function cannot register a callback in this context");
4729 }
4730 
4731 
4732 /* ----------------------------------------------------------------
4733  * Parallel Query Support
4734  * ----------------------------------------------------------------
4735  */
4736 
4737  /* ----------------------------------------------------------------
4738  * ExecAggEstimate
4739  *
4740  * Estimate space required to propagate aggregate statistics.
4741  * ----------------------------------------------------------------
4742  */
4743 void
/*
 * NOTE(review): extraction dropped the hyperlinked signature line (upstream
 * 4744) -- per the header comment above, this is
 * ExecAggEstimate(AggState *node, ParallelContext *pcxt);
 * confirm against upstream.
 *
 * Reserves shm_toc space for one AggregateInstrumentation per worker plus
 * the SharedAggInfo header; a no-op when not instrumenting or no workers.
 */
4745 {
4746  Size size;
4747 
4748  /* don't need this if not instrumenting or no workers */
4749  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4750  return;
4751 
4752  size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
4753  size = add_size(size, offsetof(SharedAggInfo, sinstrument));
4754  shm_toc_estimate_chunk(&pcxt->estimator, size);
4755  shm_toc_estimate_keys(&pcxt->estimator, 1);
4756 }
4757 
4758 /* ----------------------------------------------------------------
4759  * ExecAggInitializeDSM
4760  *
4761  * Initialize DSM space for aggregate statistics.
4762  * ----------------------------------------------------------------
4763  */
4764 void
/*
 * NOTE(review): extraction dropped the hyperlinked signature line (upstream
 * 4765) -- per the header comment above, this is
 * ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt);
 * confirm against upstream.
 *
 * Allocates and zeroes the SharedAggInfo in dynamic shared memory and
 * registers it in the shm_toc keyed by plan_node_id.
 */
4766 {
4767  Size size;
4768 
4769  /* don't need this if not instrumenting or no workers */
4770  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4771  return;
4772 
4773  size = offsetof(SharedAggInfo, sinstrument)
4774  + pcxt->nworkers * sizeof(AggregateInstrumentation);
4775  node->shared_info = shm_toc_allocate(pcxt->toc, size);
4776  /* ensure any unfilled slots will contain zeroes */
4777  memset(node->shared_info, 0, size);
4778  node->shared_info->num_workers = pcxt->nworkers;
4779  shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
4780  node->shared_info);
4781 }
4782 
4783 /* ----------------------------------------------------------------
4784  * ExecAggInitializeWorker
4785  *
4786  * Attach worker to DSM space for aggregate statistics.
4787  * ----------------------------------------------------------------
4788  */
4789 void
/*
 * NOTE(review): extraction dropped the hyperlinked signature line (upstream
 * 4790) -- per the header comment above, this is
 * ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt);
 * confirm against upstream.
 *
 * Looks up this node's SharedAggInfo in the worker's shm_toc (NULL is
 * tolerated via the 'true' missing_ok argument).
 */
4791 {
4792  node->shared_info =
4793  shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
4794 }
4795 
4796 /* ----------------------------------------------------------------
4797  * ExecAggRetrieveInstrumentation
4798  *
4799  * Transfer aggregate statistics from DSM to private memory.
4800  * ----------------------------------------------------------------
4801  */
4802 void
/*
 * NOTE(review): extraction dropped the hyperlinked signature line (upstream
 * 4803) -- per the header comment above, this is
 * ExecAggRetrieveInstrumentation(AggState *node);
 * confirm against upstream.
 *
 * Copies the DSM-resident SharedAggInfo into backend-private memory so it
 * survives DSM detach; no-op when there is no shared info.
 */
4804 {
4805  Size size;
4806  SharedAggInfo *si;
4807 
4808  if (node->shared_info == NULL)
4809  return;
4810 
4811  size = offsetof(SharedAggInfo, sinstrument)
/* NOTE(review): dropped line (upstream 4812) -- presumably adds num_workers * sizeof(AggregateInstrumentation) to size, mirroring ExecAggInitializeDSM; confirm upstream. */
4813  si = palloc(size);
4814  memcpy(si, node->shared_info, size);
4815  node->shared_info = si;
4816 }
static void hashagg_reset_spill_state(AggState *aggstate)
Definition: nodeAgg.c:3178
List * aggdistinct
Definition: primnodes.h:327
struct AggStatePerTransData * AggStatePerTrans
Definition: execnodes.h:2125
ExprState ** eqfunctions
Definition: nodeAgg.h:278
struct HashAggSpill * hash_spills
Definition: execnodes.h:2176
AggStatePerGroup * hash_pergroup
Definition: execnodes.h:2196
LogicalTapeSet * LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared, SharedFileSet *fileset, int worker)
Definition: logtape.c:685
#define NIL
Definition: pg_list.h:65
size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:977
static TupleTableSlot * fetch_input_tuple(AggState *aggstate)
Definition: nodeAgg.c:567
struct AggStatePerGroupData * AggStatePerGroup
Definition: execnodes.h:2126
#define ScanTupleHashTable(htable, iter)
Definition: execnodes.h:733
static void select_current_set(AggState *aggstate, int setno, bool is_hash)
Definition: nodeAgg.c:475
int numCols
Definition: plannodes.h:821
static int partitions
Definition: pgbench.c:196
static int hash_choose_num_partitions(double input_groups, double hashentrysize, int used_bits, int *log2_npartittions)
Definition: nodeAgg.c:1978
List * qual
Definition: plannodes.h:137
bool tuplesort_getdatum(Tuplesortstate *state, bool forward, Datum *val, bool *isNull, Datum *abbrev)
Definition: tuplesort.c:2475
TupleHashTable BuildTupleHashTableExt(PlanState *parent, TupleDesc inputDesc, int numCols, AttrNumber *keyColIdx, const Oid *eqfuncoids, FmgrInfo *hashfunctions, Oid *collations, long nbuckets, Size additionalsize, MemoryContext metacxt, MemoryContext tablecxt, MemoryContext tempcxt, bool use_variable_hash_iv)
Definition: execGrouping.c:154
bool aggvariadic
Definition: primnodes.h:330
int bms_first_member(Bitmapset *a)
Definition: bitmapset.c:996
AggStatePerPhase phases
Definition: execnodes.h:2163
double hashentrysize
Definition: execnodes.h:2188
#define IsA(nodeptr, _type_)
Definition: nodes.h:578
void tuplesort_performsort(Tuplesortstate *state)
Definition: tuplesort.c:2021
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
Definition: nodeAgg.c:2914
#define AllocSetContextCreate
Definition: memutils.h:170
AttrNumber * hashGrpColIdxInput
Definition: nodeAgg.h:311
Datum * ecxt_aggvalues
Definition: execnodes.h:244
static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
Definition: nodeAgg.c:1904
TupleHashEntry LookupTupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew, uint32 hash)
Definition: execGrouping.c:359
uint64 hash_ngroups_limit
Definition: execnodes.h:2185
#define HASHAGG_MAX_PARTITIONS
Definition: nodeAgg.c:289
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1416
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:307
Index varlevelsup
Definition: primnodes.h:191
TargetEntry * get_sortgroupclause_tle(SortGroupClause *sgClause, List *targetList)
Definition: tlist.c:389
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Bitmapset * bms_copy(const Bitmapset *a)
Definition: bitmapset.c:74
Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:1228
static void hash_agg_check_limits(AggState *aggstate)
Definition: nodeAgg.c:1843
static long hash_choose_num_buckets(double hashentrysize, long estimated_nbuckets, Size memory)
Definition: nodeAgg.c:1953
AttrNumber * grpColIdx
Definition: plannodes.h:822
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:968
uint64 transitionSpace
Definition: plannodes.h:826
Instrumentation * instrument
Definition: execnodes.h:938
static void agg_fill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2524
int aggtransno
Definition: primnodes.h:336
Bitmapset * colnos_needed
Definition: execnodes.h:2158
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
Definition: execUtils.c:498
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
List * lcons_int(int datum, List *list)
Definition: list.c:471
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:1546
int numaggs
Definition: execnodes.h:2134
int nfreetapes
Definition: nodeAgg.c:331
Oid GetUserId(void)
Definition: miscinit.c:476
bool agg_done
Definition: execnodes.h:2152
#define castNode(_type_, nodeptr)
Definition: nodes.h:596
Oid * grpCollations
Definition: plannodes.h:824
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:543
#define TTS_EMPTY(slot)
Definition: tuptable.h:97
TupleTableSlot * sort_slot
Definition: execnodes.h:2166
List * all_grouped_cols
Definition: execnodes.h:2157
Tuplesortstate * sort_out
Definition: execnodes.h:2165
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1662
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void finalize_partialaggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1159
ScanState ss
Definition: execnodes.h:2132
FmgrInfo equalfnOne
Definition: nodeAgg.h:110
ExprContext * ps_ExprContext
Definition: execnodes.h:967
MinimalTuple firstTuple
Definition: execnodes.h:686
shm_toc_estimator estimator
Definition: parallel.h:42
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:233
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
Definition: nodeAgg.c:3136
ExprState * evaltrans
Definition: nodeAgg.h:283
#define SizeForFunctionCallInfo(nargs)
Definition: fmgr.h:102
int64 input_tuples
Definition: nodeAgg.c:370
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1043
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
int plan_node_id
Definition: plannodes.h:135
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct HashTapeInfo HashTapeInfo
Oid inputcollid
Definition: primnodes.h:321
int current_phase
Definition: execnodes.h:2140
static void hashagg_finish_initial_spills(AggState *aggstate)
Definition: nodeAgg.c:3102
static void slot_getsomeattrs(TupleTableSlot *slot, int attnum)
Definition: tuptable.h:341
Definition: nodes.h:527
AggSplit aggsplit
Definition: execnodes.h:2137
static TupleTableSlot * ExecAgg(PlanState *pstate)
Definition: nodeAgg.c:2142
bool * nullsFirst
Definition: plannodes.h:774
int errcode(int sqlerrcode)
Definition: elog.c:691
List * args
Definition: primnodes.h:325
#define MemSet(start, val, len)
Definition: c.h:1004
AttrNumber varattno
Definition: primnodes.h:186
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
fmNodePtr context
Definition: fmgr.h:88
Datum * tts_values
Definition: tuptable.h:126
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1320
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
void build_aggregate_deserialfn_expr(Oid deserialfn_oid, Expr **deserialfnexpr)
Definition: parse_agg.c:2013
static void finalize_aggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1056
bool all_cols_needed
Definition: execnodes.h:2160
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition: parse_agg.c:2037
AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2108
TupleTableSlot * hash_spill_rslot
Definition: execnodes.h:2178
AggStatePerTrans pertrans
Definition: execnodes.h:2142
EState * state
Definition: execnodes.h:930
int projected_set
Definition: execnodes.h:2153
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1152
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
uint32 hash_bytes_uint32(uint32 k)
Definition: hashfn.c:610
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:372
HeapTuple grp_firstTuple
Definition: execnodes.h:2170
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
Definition: primnodes.h:181
Aggref * aggref
Definition: nodeAgg.h:187
static TupleTableSlot * project_aggregates(AggState *aggstate)
Definition: nodeAgg.c:1372
static void advance_aggregates(AggState *aggstate)
Definition: nodeAgg.c:838
int current_set
Definition: execnodes.h:2155
uint32 mask
Definition: nodeAgg.c:350
#define OidIsValid(objectId)
Definition: c.h:706
#define DO_AGGSPLIT_COMBINE(as)
Definition: nodes.h:789
FunctionCallInfo transfn_fcinfo
Definition: nodeAgg.h:162
TupleHashEntry LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew, uint32 *hash)
Definition: execGrouping.c:304
void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4765
struct HashAggBatch HashAggBatch
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:649
Datum ExecAggTransReparent(AggState *aggstate, AggStatePerTrans pertrans, Datum newValue, bool newValueIsNull, Datum oldValue, bool oldValueIsNull)
int numtrans
Definition: execnodes.h:2135
void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1439
static void hash_agg_enter_spill_mode(AggState *aggstate)
Definition: nodeAgg.c:1869
TupleDesc sortdesc
Definition: nodeAgg.h:138
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, double input_groups, double hashentrysize)
Definition: nodeAgg.c:2934
Oid * sortOperators
Definition: plannodes.h:772
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:951
void execTuplesHashPrepare(int numCols, const Oid *eqOperators, Oid **eqFuncOids, FmgrInfo **hashFunctions)
Definition: execGrouping.c:96
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:208
void ResetTupleHashTable(TupleHashTable hashtable)
Definition: execGrouping.c:283
ExprContext * tmpcontext
Definition: execnodes.h:2145
FmgrInfo transfn
Definition: nodeAgg.h:81
#define HASHAGG_PARTITION_FACTOR
Definition: nodeAgg.c:287
static void build_hash_table(AggState *aggstate, int setno, long nbuckets)
Definition: nodeAgg.c:1498
int max_colno_needed
Definition: execnodes.h:2159
static void prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
Definition: nodeAgg.c:1264
bool hash_spill_mode
Definition: execnodes.h:2182
#define FUNC_MAX_ARGS
static void hashagg_tapeinfo_init(AggState *aggstate)
Definition: nodeAgg.c:2870
List * hash_batches
Definition: execnodes.h:2180
Aggref * aggref
Definition: nodeAgg.h:44
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:878
#define linitial_int(l)
Definition: pg_list.h:175
Bitmapset ** grouped_cols
Definition: nodeAgg.h:277
PlanState ps
Definition: execnodes.h:1317
LogicalTapeSet * tapeset
Definition: nodeAgg.c:368
int maxsets
Definition: execnodes.h:2162
#define true
Definition: c.h:383
static bool agg_refill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2578
static bool find_cols_walker(Node *node, FindColsContext *context)
Definition: nodeAgg.c:1415
Size hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
Definition: nodeAgg.c:1687
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
void initHyperLogLog(hyperLogLogState *cState, uint8 bwidth)
Definition: hyperloglog.c:66
#define DO_AGGSPLIT_SERIALIZE(as)
Definition: nodes.h:791
#define HASHAGG_MIN_PARTITIONS
Definition: nodeAgg.c:288
void pfree(void *pointer)
Definition: mcxt.c:1057
MemoryContext es_query_cxt
Definition: execnodes.h:559
AggStrategy aggstrategy
Definition: plannodes.h:819
AggState * ExecInitAgg(Agg *node, EState *estate, int eflags)
Definition: nodeAgg.c:3230
#define linitial(l)
Definition: pg_list.h:174
bool table_filled
Definition: execnodes.h:2172
AggStrategy aggstrategy
Definition: execnodes.h:2136
#define HASHAGG_HLL_BIT_WIDTH
Definition: nodeAgg.c:306
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:775
static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2725
static void find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
Definition: nodeAgg.c:1398
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
bool fn_strict
Definition: fmgr.h:61
#define lfirst_int(lc)
Definition: pg_list.h:170
static void * list_nth(const List *list, int n)
Definition: pg_list.h:266
char * get_func_name(Oid funcid)
Definition: lsyscache.c:1525
MemoryContext hash_metacxt
Definition: execnodes.h:2174
NullableDatum args[FLEXIBLE_ARRAY_MEMBER]
Definition: fmgr.h:95
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
static TupleTableSlot * agg_retrieve_direct(AggState *aggstate)
Definition: nodeAgg.c:2178
#define AGG_CONTEXT_AGGREGATE
Definition: fmgr.h:738
struct TupleHashEntryData TupleHashEntryData
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
static void find_hash_columns(AggState *aggstate)
Definition: nodeAgg.c:1558
ExprState * equalfnMulti
Definition: nodeAgg.h:111
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
Tuplesortstate * sort_in
Definition: execnodes.h:2164
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define outerPlanState(node)
Definition: execnodes.h:1022
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Definition: tuplesort.c:2389
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:646
static void finalize_aggregates(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup)
Definition: nodeAgg.c:1309
bool AggStateIsShared(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4676
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, int ndest)
Definition: nodeAgg.c:2891
#define list_nth_node(type, list, n)
Definition: pg_list.h:294
Tuplesortstate ** sortstates
Definition: nodeAgg.h:154
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:172
Bitmapset * aggParams
Definition: plannodes.h:827
static int initValue(long lng_val)
Definition: informix.c:677
MemoryContext tablecxt
Definition: execnodes.h:708
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:534
bool * tts_isnull
Definition: tuptable.h:128
int npartitions
Definition: nodeAgg.c:347
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:292
MinimalTupleData * MinimalTuple
Definition: htup.h:27
static void process_ordered_aggregate_multi(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:962
List * aggorder
Definition: primnodes.h:326
void ExecAggEstimate(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4744
int errcode_for_file_access(void)
Definition: elog.c:714
#define fmgr_info_set_expr(expr, finfo)
Definition: fmgr.h:135
AttrNumber resno
Definition: primnodes.h:1423
#define DatumGetBool(X)
Definition: postgres.h:393
int ParallelWorkerNumber
Definition: parallel.c:112
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
Definition: nodeAgg.c:2965
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:227
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:317
#define MakeExpandedObjectReadOnly(d, isnull, typlen)
Index agglevelsup
Definition: primnodes.h:333
int used_bits
Definition: nodeAgg.c:367
struct AggregateInstrumentation AggregateInstrumentation
Bitmapset * unaggregated
Definition: nodeAgg.c:379
#define TupIsNull(slot)
Definition: tuptable.h:292
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
unsigned int uint32
Definition: c.h:429
List * aggdirectargs
Definition: primnodes.h:324
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
Definition: nodeAgg.c:4339
AggStatePerAgg curperagg
Definition: execnodes.h:2148
AttrNumber * sortColIdx
Definition: nodeAgg.h:100
struct AggStatePerGroupData AggStatePerGroupData
struct HashTapeInfo * hash_tapeinfo
Definition: execnodes.h:2175
AggStatePerHash perhash
Definition: execnodes.h:2195
bool outeropsset
Definition: execnodes.h:1009
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void initialize_aggregates(AggState *aggstate, AggStatePerGroup *pergroups, int numReset)
Definition: nodeAgg.c:685
AggStrategy aggstrategy
Definition: nodeAgg.h:274
ExprState * evaltrans_cache[2][2]
Definition: nodeAgg.h:291
#define InstrCountFiltered1(node, delta)
Definition: execnodes.h:1030
#define EXEC_FLAG_REWIND
Definition: executor.h:57
hyperLogLogState * hll_card
Definition: nodeAgg.c:352
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2756
void build_aggregate_combinefn_expr(Oid agg_state_type, Oid agg_input_collation, Oid combinefn_oid, Expr **combinefnexpr)
Definition: parse_agg.c:1961
Datum value
Definition: postgres.h:378
Bitmapset * grouped_cols
Definition: execnodes.h:2156
#define IsParallelWorker()
Definition: parallel.h:61
Datum datumCopy(Datum value, bool typByVal, int typLen)
Definition: datum.c:131
TupleTableSlot * ExecAllocTableSlot(List **tupleTable, TupleDesc desc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1141
void ExecAggRetrieveInstrumentation(AggState *node)
Definition: nodeAgg.c:4803
int hash_batches_used
Definition: execnodes.h:2193
MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4650
Bitmapset * chgParam
Definition: execnodes.h:960
#define InvokeFunctionExecuteHook(objectId)
Definition: objectaccess.h:191
int ntapes
Definition: nodeAgg.c:329
bool IsBinaryCoercible(Oid srctype, Oid targettype)
int my_log2(long num)
Definition: dynahash.c:1730
double input_card
Definition: nodeAgg.c:371
#define outerPlan(node)
Definition: plannodes.h:166
List * lappend(List *list, void *datum)
Definition: list.c:321
Bitmapset * aggregated
Definition: nodeAgg.c:378
TupleHashIterator hashiter
Definition: nodeAgg.h:304
int numCols
Definition: plannodes.h:770
Index varno
Definition: primnodes.h:184
static void initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:598
int num_hashes
Definition: execnodes.h:2173
Plan plan
Definition: plannodes.h:818
AttrNumber * hashGrpColIdxHash
Definition: nodeAgg.h:312
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1115
bool input_done
Definition: execnodes.h:2151
#define SizeofMinimalTupleHeader
Definition: htup_details.h:649
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
ExprContext * curaggcontext
Definition: execnodes.h:2147
ExprContext * hashcontext
Definition: execnodes.h:2143
bool * ecxt_aggnulls
Definition: execnodes.h:246
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:399
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define TextDatumGetCString(d)
Definition: builtins.h:87
List * es_tupleTable
Definition: execnodes.h:561
#define HASHAGG_READ_BUFFER_SIZE
Definition: nodeAgg.c:297
AggStatePerPhase phase
Definition: execnodes.h:2138
void * palloc0(Size size)
Definition: mcxt.c:981
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:934
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1163
struct FunctionCallInfoBaseData * FunctionCallInfo
Definition: fmgr.h:38
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:242
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1376
FmgrInfo deserialfn
Definition: nodeAgg.h:87
int work_mem
Definition: globals.c:121
List * groupingSets
Definition: plannodes.h:829
int16 resulttypeLen
Definition: nodeAgg.h:216
static void initialize_phase(AggState *aggstate, int newphase)
Definition: nodeAgg.c:497
double estimateHyperLogLog(hyperLogLogState *cState)
Definition: hyperloglog.c:186
LogicalTapeSet * tapeset
Definition: nodeAgg.c:328
struct FindColsContext FindColsContext
FormData_pg_proc * Form_pg_proc
Definition: pg_proc.h:133
Plan * plan
Definition: execnodes.h:928
#define InvalidOid
Definition: postgres_ext.h:36
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1202
Oid aggfnoid
Definition: primnodes.h:318
int16 attnum
Definition: pg_attribute.h:79
#define ResetTupleHashIterator(htable, iter)
Definition: execnodes.h:731
#define ereport(elevel,...)
Definition: elog.h:155
static HeapTuple ExecCopySlotHeapTuple(TupleTableSlot *slot)
Definition: tuptable.h:452
static void advance_transition_function(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:726
#define LOCAL_FCINFO(name, nargs)
Definition: fmgr.h:110
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
Definition: nodeAgg.c:1734
List * lcons(void *datum, List *list)
Definition: list.c:453
static void prepare_hash_slot(AggStatePerHash perhash, TupleTableSlot *inputslot, TupleTableSlot *hashslot)
Definition: nodeAgg.c:1219
int aggno
Definition: primnodes.h:335
uint64 hash_disk_used
Definition: execnodes.h:2192
Size MemoryContextMemAllocated(MemoryContext context, bool recurse)
Definition: mcxt.c:471
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
#define Max(x, y)
Definition: c.h:976
ExprContext ** aggcontexts
Definition: execnodes.h:2144
#define makeNode(_type_)
Definition: nodes.h:575
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:229
int plan_width
Definition: plannodes.h:124
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FmgrInfo * hashfunctions
Definition: nodeAgg.h:306
#define Assert(condition)
Definition: c.h:800
#define lfirst(lc)
Definition: pg_list.h:169
void RegisterExprContextCallback(ExprContext *econtext, ExprContextCallbackFunction function, Datum arg)
Definition: execUtils.c:924
FmgrInfo serialfn
Definition: nodeAgg.h:84
ExprState * execTuplesMatchPrepare(TupleDesc desc, int numCols, const AttrNumber *keyColIdx, const Oid *eqOperators, const Oid *collations, PlanState *parent)
Definition: execGrouping.c:59
int input_tapenum
Definition: nodeAgg.c:369
FunctionCallInfo deserialfn_fcinfo
Definition: nodeAgg.h:167
#define EXEC_FLAG_MARK
Definition: executor.h:59
AggSplit aggsplit
Definition: plannodes.h:820
struct AggStatePerAggData * AggStatePerAgg
Definition: execnodes.h:2124
void ExecReScanAgg(AggState *node)
Definition: nodeAgg.c:4425
void build_aggregate_serialfn_expr(Oid serialfn_oid, Expr **serialfnexpr)
Definition: parse_agg.c:1990
FormData_pg_aggregate * Form_pg_aggregate
Definition: pg_aggregate.h:109
Expr * expr
Definition: primnodes.h:1422
AggSplit aggsplit
Definition: primnodes.h:334
bool MemoryContextContains(MemoryContext context, void *pointer)
Definition: mcxt.c:692
void(* ExprContextCallbackFunction)(Datum arg)
Definition: execnodes.h:187
void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, Oid transfn_oid, Oid invtransfn_oid, Expr **transfnexpr, Expr **invtransfnexpr)
Definition: parse_agg.c:1900
bool hash_ever_spilled
Definition: execnodes.h:2181
AggStatePerGroup * pergroups
Definition: execnodes.h:2168
void freeHyperLogLog(hyperLogLogState *cState)
Definition: hyperloglog.c:151
size_t Size
Definition: c.h:528
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:225
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:479
#define AGG_CONTEXT_WINDOW
Definition: fmgr.h:739
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
Definition: fmgr.h:150
FunctionCallInfo serialfn_fcinfo
Definition: nodeAgg.h:165
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
bool expression_tree_walker(Node *node, bool(*walker)(), void *context)
Definition: nodeFuncs.c:1888
static int list_length(const List *l)
Definition: pg_list.h:149
long numGroups
Definition: plannodes.h:825
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:768
#define DO_AGGSPLIT_SKIPFINAL(as)
Definition: nodes.h:790
void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)
Definition: lsyscache.c:2144
void addHyperLogLog(hyperLogLogState *cState, uint32 hash)
Definition: hyperloglog.c:167
Expr * aggfilter
Definition: primnodes.h:328
int AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
Definition: nodeAgg.c:4572
TupleDesc ExecTypeFromTL(List *targetList)
Definition: execTuples.c:1908
#define MAXALIGN(LEN)
Definition: c.h:753
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1769
void ReScanExprContext(ExprContext *econtext)
Definition: execUtils.c:437
static TupleTableSlot * agg_retrieve_hash_table_in_memory(AggState *aggstate)
Definition: nodeAgg.c:2750
LogicalTapeSet * tapeset
Definition: nodeAgg.c:346
bool outeropsfixed
Definition: execnodes.h:1005
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define DO_AGGSPLIT_DESERIALIZE(as)
Definition: nodes.h:792
Size hash_mem_limit
Definition: execnodes.h:2184
struct Plan * lefttree
Definition: plannodes.h:138
TupleTableSlot * uniqslot
Definition: nodeAgg.h:137
int numphases
Definition: execnodes.h:2139
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:489
void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
Definition: logtape.c:863
List * targetlist
Definition: plannodes.h:136
ExprState * qual
Definition: execnodes.h:949
void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
Definition: nodeAgg.c:4790
#define DatumGetPointer(X)
Definition: postgres.h:549
AttrNumber * sortColIdx
Definition: plannodes.h:771
#define CHUNKHDRSZ
Definition: nodeAgg.c:312
#define HASHAGG_WRITE_BUFFER_SIZE
Definition: nodeAgg.c:298
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:494
void AggRegisterCallback(FunctionCallInfo fcinfo, ExprContextCallbackFunction func, Datum arg)
Definition: nodeAgg.c:4715
void hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, Size *mem_limit, uint64 *ngroups_limit, int *num_partitions)
Definition: nodeAgg.c:1785
Size hash_mem_peak
Definition: execnodes.h:2189
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171