PostgreSQL Source Code  git master
nodeAgg.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeAgg.c
4  * Routines to handle aggregate nodes.
5  *
6  * ExecAgg normally evaluates each aggregate in the following steps:
7  *
8  * transvalue = initcond
9  * foreach input_tuple do
10  * transvalue = transfunc(transvalue, input_value(s))
11  * result = finalfunc(transvalue, direct_argument(s))
12  *
13  * If a finalfunc is not supplied then the result is just the ending
14  * value of transvalue.
15  *
16  * Other behaviors can be selected by the "aggsplit" mode, which exists
17  * to support partial aggregation. It is possible to:
18  * * Skip running the finalfunc, so that the output is always the
19  * final transvalue state.
20  * * Substitute the combinefunc for the transfunc, so that transvalue
21  * states (propagated up from a child partial-aggregation step) are merged
22  * rather than processing raw input rows. (The statements below about
23  * the transfunc apply equally to the combinefunc, when it's selected.)
24  * * Apply the serializefunc to the output values (this only makes sense
25  * when skipping the finalfunc, since the serializefunc works on the
26  * transvalue data type).
27  * * Apply the deserializefunc to the input values (this only makes sense
28  * when using the combinefunc, for similar reasons).
29  * It is the planner's responsibility to connect up Agg nodes using these
30  * alternate behaviors in a way that makes sense, with partial aggregation
31  * results being fed to nodes that expect them.
32  *
33  * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34  * input tuples and eliminate duplicates (if required) before performing
35  * the above-depicted process. (However, we don't do that for ordered-set
36  * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37  * so far as this module is concerned.) Note that partial aggregation
38  * is not supported in these cases, since we couldn't ensure global
39  * ordering or distinctness of the inputs.
40  *
41  * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42  * then the first non-NULL input_value is assigned directly to transvalue,
43  * and transfunc isn't applied until the second non-NULL input_value.
44  * The agg's first input type and transtype must be the same in this case!
45  *
46  * If transfunc is marked "strict" then NULL input_values are skipped,
47  * keeping the previous transvalue. If transfunc is not strict then it
48  * is called for every input tuple and must deal with NULL initcond
49  * or NULL input_values for itself.
50  *
51  * If finalfunc is marked "strict" then it is not called when the
52  * ending transvalue is NULL, instead a NULL result is created
53  * automatically (this is just the usual handling of strict functions,
54  * of course). A non-strict finalfunc can make its own choice of
55  * what to return for a NULL ending transvalue.
56  *
57  * Ordered-set aggregates are treated specially in one other way: we
58  * evaluate any "direct" arguments and pass them to the finalfunc along
59  * with the transition value.
60  *
61  * A finalfunc can have additional arguments beyond the transvalue and
62  * any "direct" arguments, corresponding to the input arguments of the
63  * aggregate. These are always just passed as NULL. Such arguments may be
64  * needed to allow resolution of a polymorphic aggregate's result type.
65  *
66  * We compute aggregate input expressions and run the transition functions
67  * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68  * once per input tuple, so when the transvalue datatype is
69  * pass-by-reference, we have to be careful to copy it into a longer-lived
70  * memory context, and free the prior value to avoid memory leakage. We
71  * store transvalues in another set of econtexts, aggstate->aggcontexts
72  * (one per grouping set, see below), which are also used for the hashtable
73  * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74  * reset, at group boundaries so that aggregate transition functions can
75  * register shutdown callbacks via AggRegisterCallback.
76  *
77  * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78  * run finalize functions and compute the output tuple; this context can be
79  * reset once per output tuple.
80  *
81  * The executor's AggState node is passed as the fmgr "context" value in
82  * all transfunc and finalfunc calls. It is not recommended that the
83  * transition functions look at the AggState node directly, but they can
84  * use AggCheckCallContext() to verify that they are being called by
85  * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86  * transition function might want to know this is so that it can avoid
87  * palloc'ing a fixed-size pass-by-ref transition value on every call:
88  * it can instead just scribble on and return its left input. Ordinarily
89  * it is completely forbidden for functions to modify pass-by-ref inputs,
90  * but in the aggregate case we know the left input is either the initial
91  * transition value or a previous function result, and in either case its
92  * value need not be preserved. See int8inc() for an example. Notice that
93  * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94  * the previous transition value pointer is returned. It is also possible
95  * to avoid repeated data copying when the transition value is an expanded
96  * object: to do that, the transition function must take care to return
97  * an expanded object that is in a child context of the memory context
98  * returned by AggCheckCallContext(). Also, some transition functions want
99  * to store working state in addition to the nominal transition value; they
100  * can use the memory context returned by AggCheckCallContext() to do that.
101  *
102  * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103  * AggState is available as context in earlier releases (back to 8.1),
104  * but direct examination of the node is needed to use it before 9.0.
105  *
106  * As of 9.4, aggregate transition functions can also use AggGetAggref()
107  * to get hold of the Aggref expression node for their aggregate call.
108  * This is mainly intended for ordered-set aggregates, which are not
109  * supported as window functions. (A regular aggregate function would
110  * need some fallback logic to use this, since there's no Aggref node
111  * for a window function.)
112  *
113  * Grouping sets:
114  *
115  * A list of grouping sets which is structurally equivalent to a ROLLUP
116  * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117  * ordered data. We do this by keeping a separate set of transition values
118  * for each grouping set being concurrently processed; for each input tuple
119  * we update them all, and on group boundaries we reset those states
120  * (starting at the front of the list) whose grouping values have changed
121  * (the list of grouping sets is ordered from most specific to least
122  * specific).
123  *
124  * Where more complex grouping sets are used, we break them down into
125  * "phases", where each phase has a different sort order (except phase 0
126  * which is reserved for hashing). During each phase but the last, the
127  * input tuples are additionally stored in a tuplesort which is keyed to the
128  * next phase's sort order; during each phase but the first, the input
129  * tuples are drawn from the previously sorted data. (The sorting of the
130  * data for the first phase is handled by the planner, as it might be
131  * satisfied by underlying nodes.)
132  *
133  * Hashing can be mixed with sorted grouping. To do this, we have an
134  * AGG_MIXED strategy that populates the hashtables during the first sorted
135  * phase, and switches to reading them out after completing all sort phases.
136  * We can also support AGG_HASHED with multiple hash tables and no sorting
137  * at all.
138  *
139  * From the perspective of aggregate transition and final functions, the
140  * only issue regarding grouping sets is this: a single call site (flinfo)
141  * of an aggregate function may be used for updating several different
142  * transition values in turn. So the function must not cache in the flinfo
143  * anything which logically belongs as part of the transition value (most
144  * importantly, the memory context in which the transition value exists).
145  * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146  * sensitive to the grouping set for which the aggregate function is
147  * currently being called.
148  *
149  * Plan structure:
150  *
151  * What we get from the planner is actually one "real" Agg node which is
152  * part of the plan tree proper, but which optionally has an additional list
153  * of Agg nodes hung off the side via the "chain" field. This is because an
154  * Agg node happens to be a convenient representation of all the data we
155  * need for grouping sets.
156  *
157  * For many purposes, we treat the "real" node as if it were just the first
158  * node in the chain. The chain must be ordered such that hashed entries
159  * come before sorted/plain entries; the real node is marked AGG_MIXED if
160  * there are both types present (in which case the real node describes one
161  * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162  * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163  * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164  * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165  * chained nodes.
166  *
167  * We collect all hashed nodes into a single "phase", numbered 0, and create
168  * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169  * Phase 0 is allocated even if there are no hashes, but remains unused in
170  * that case.
171  *
172  * AGG_HASHED nodes actually refer to only a single grouping set each,
173  * because for each hashed grouping we need a separate grpColIdx and
174  * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175  * grouping sets that share a sort order. Each AGG_SORTED node other than
176  * the first one has an associated Sort node which describes the sort order
177  * to be used; the first sorted node takes its input from the outer subtree,
178  * which the planner has already arranged to provide ordered data.
179  *
180  * Memory and ExprContext usage:
181  *
182  * Because we're accumulating aggregate values across input rows, we need to
183  * use more memory contexts than just simple input/output tuple contexts.
184  * In fact, for a rollup, we need a separate context for each grouping set
185  * so that we can reset the inner (finer-grained) aggregates on their group
186  * boundaries while continuing to accumulate values for outer
187  * (coarser-grained) groupings. On top of this, we might be simultaneously
188  * populating hashtables; however, we only need one context for all the
189  * hashtables.
190  *
191  * So we create an array, aggcontexts, with an ExprContext for each grouping
192  * set in the largest rollup that we're going to process, and use the
193  * per-tuple memory context of those ExprContexts to store the aggregate
194  * transition values. hashcontext is the single context created to support
195  * all hash tables.
196  *
197  * Spilling To Disk
198  *
199  * When performing hash aggregation, if the hash table memory exceeds the
200  * limit (see hash_agg_check_limits()), we enter "spill mode". In spill
201  * mode, we advance the transition states only for groups already in the
202  * hash table. For tuples that would need to create a new hash table
203  * entries (and initialize new transition states), we instead spill them to
204  * disk to be processed later. The tuples are spilled in a partitioned
205  * manner, so that subsequent batches are smaller and less likely to exceed
206  * hash_mem (if a batch does exceed hash_mem, it must be spilled
207  * recursively).
208  *
209  * Spilled data is written to logical tapes. These provide better control
210  * over memory usage, disk space, and the number of files than if we were
211  * to use a BufFile for each spill.
212  *
213  * Note that it's possible for transition states to start small but then
214  * grow very large; for instance in the case of ARRAY_AGG. In such cases,
215  * it's still possible to significantly exceed hash_mem. We try to avoid
216  * this situation by estimating what will fit in the available memory, and
217  * imposing a limit on the number of groups separately from the amount of
218  * memory consumed.
219  *
220  * Transition / Combine function invocation:
221  *
222  * For performance reasons transition functions, including combine
223  * functions, aren't invoked one-by-one from nodeAgg.c after computing
224  * arguments using the expression evaluation engine. Instead
225  * ExecBuildAggTrans() builds one large expression that does both argument
226  * evaluation and transition function invocation. That avoids performance
227  * issues due to repeated uses of expression evaluation, complications due
228  * to filter expressions having to be evaluated early, and allows to JIT
229  * the entire expression into one native function.
230  *
231  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
232  * Portions Copyright (c) 1994, Regents of the University of California
233  *
234  * IDENTIFICATION
235  * src/backend/executor/nodeAgg.c
236  *
237  *-------------------------------------------------------------------------
238  */
239 
240 #include "postgres.h"
241 
242 #include "access/htup_details.h"
243 #include "access/parallel.h"
244 #include "catalog/objectaccess.h"
245 #include "catalog/pg_aggregate.h"
246 #include "catalog/pg_proc.h"
247 #include "catalog/pg_type.h"
248 #include "common/hashfn.h"
249 #include "executor/execExpr.h"
250 #include "executor/executor.h"
251 #include "executor/nodeAgg.h"
252 #include "lib/hyperloglog.h"
253 #include "miscadmin.h"
254 #include "nodes/makefuncs.h"
255 #include "nodes/nodeFuncs.h"
256 #include "optimizer/optimizer.h"
257 #include "parser/parse_agg.h"
258 #include "parser/parse_coerce.h"
259 #include "utils/acl.h"
260 #include "utils/builtins.h"
261 #include "utils/datum.h"
262 #include "utils/dynahash.h"
263 #include "utils/expandeddatum.h"
264 #include "utils/logtape.h"
265 #include "utils/lsyscache.h"
266 #include "utils/memutils.h"
267 #include "utils/syscache.h"
268 #include "utils/tuplesort.h"
269 
270 /*
271  * Control how many partitions are created when spilling HashAgg to
272  * disk.
273  *
274  * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
275  * partitions needed such that each partition will fit in memory. The factor
276  * is set higher than one because there's not a high cost to having a few too
277  * many partitions, and it makes it less likely that a partition will need to
278  * be spilled recursively. Another benefit of having more, smaller partitions
279  * is that small hash tables may perform better than large ones due to memory
280  * caching effects.
281  *
282  * We also specify a min and max number of partitions per spill. Too few might
283  * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
284  * many will result in lots of memory wasted buffering the spill files (which
285  * could instead be spent on a larger hash table).
286  */
287 #define HASHAGG_PARTITION_FACTOR 1.50
288 #define HASHAGG_MIN_PARTITIONS 4
289 #define HASHAGG_MAX_PARTITIONS 1024
290 
291 /*
292  * For reading from tapes, the buffer size must be a multiple of
293  * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
294  * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
295  * tape always uses a buffer of size BLCKSZ.
296  */
297 #define HASHAGG_READ_BUFFER_SIZE BLCKSZ
298 #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
299 
300 /*
301  * HyperLogLog is used for estimating the cardinality of the spilled tuples in
302  * a given partition. 5 bits corresponds to a size of about 32 bytes and a
303  * worst-case error of around 18%. That's effective enough to choose a
304  * reasonable number of partitions when recursing.
305  */
306 #define HASHAGG_HLL_BIT_WIDTH 5
307 
308 /*
309  * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
310  * improved?
311  */
312 #define CHUNKHDRSZ 16
313 
314 /*
315  * Track all tapes needed for a HashAgg that spills. We don't know the maximum
316  * number of tapes needed at the start of the algorithm (because it can
317  * recurse), so one tape set is allocated and extended as needed for new
318  * tapes. When a particular tape is already read, rewind it for write mode and
319  * put it in the free list.
320  *
321  * Tapes' buffers can take up substantial memory when many tapes are open at
322  * once. We only need one tape open at a time in read mode (using a buffer
323  * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
324  * requiring a buffer of size BLCKSZ) for each partition.
325  */
326 typedef struct HashTapeInfo
327 {
329  int ntapes;
330  int *freetapes;
333 } HashTapeInfo;
334 
335 /*
336  * Represents partitioned spill data for a single hashtable. Contains the
337  * necessary information to route tuples to the correct partition, and to
338  * transform the spilled data into new batches.
339  *
340  * The high bits are used for partition selection (when recursing, we ignore
341  * the bits that have already been used for partition selection at an earlier
342  * level).
343  */
typedef struct HashAggSpill
{
	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
	int			npartitions;	/* number of partitions */
	int		   *partitions;		/* spill partition tape numbers */
	int64	   *ntuples;		/* number of tuples in each partition */
	/*
	 * Partition selection uses the high bits of the hash: apply 'mask' and
	 * then right-shift by 'shift' to get a partition index (see the comment
	 * above about recursion reusing progressively lower bits).
	 */
	uint32		mask;			/* mask to find partition from hash value */
	int			shift;			/* after masking, shift by this amount */
	hyperLogLogState *hll_card; /* cardinality estimate for contents */
} HashAggSpill;
354 
355 /*
356  * Represents work to be done for one pass of hash aggregation (with only one
357  * grouping set).
358  *
359  * Also tracks the bits of the hash already used for partition selection by
360  * earlier iterations, so that this batch can use new bits. If all bits have
361  * already been used, no partitioning will be done (any spilled data will go
362  * to a single output tape).
363  */
typedef struct HashAggBatch
{
	int			setno;			/* grouping set this batch belongs to */
	int			used_bits;		/* number of bits of hash already used */
	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
	int			input_tapenum;	/* input partition tape */
	int64		input_tuples;	/* number of tuples in this batch */
	double		input_card;		/* estimated group cardinality */
} HashAggBatch;
373 
374 /* used to find referenced colnos */
375 typedef struct FindColsContext
376 {
377  bool is_aggref; /* is under an aggref */
378  Bitmapset *aggregated; /* column references under an aggref */
379  Bitmapset *unaggregated; /* other column references */
381 
382 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
383 static void initialize_phase(AggState *aggstate, int newphase);
384 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
385 static void initialize_aggregates(AggState *aggstate,
386  AggStatePerGroup *pergroups,
387  int numReset);
388 static void advance_transition_function(AggState *aggstate,
389  AggStatePerTrans pertrans,
390  AggStatePerGroup pergroupstate);
391 static void advance_aggregates(AggState *aggstate);
392 static void process_ordered_aggregate_single(AggState *aggstate,
393  AggStatePerTrans pertrans,
394  AggStatePerGroup pergroupstate);
395 static void process_ordered_aggregate_multi(AggState *aggstate,
396  AggStatePerTrans pertrans,
397  AggStatePerGroup pergroupstate);
398 static void finalize_aggregate(AggState *aggstate,
399  AggStatePerAgg peragg,
400  AggStatePerGroup pergroupstate,
401  Datum *resultVal, bool *resultIsNull);
402 static void finalize_partialaggregate(AggState *aggstate,
403  AggStatePerAgg peragg,
404  AggStatePerGroup pergroupstate,
405  Datum *resultVal, bool *resultIsNull);
406 static inline void prepare_hash_slot(AggStatePerHash perhash,
407  TupleTableSlot *inputslot,
408  TupleTableSlot *hashslot);
409 static void prepare_projection_slot(AggState *aggstate,
410  TupleTableSlot *slot,
411  int currentSet);
412 static void finalize_aggregates(AggState *aggstate,
413  AggStatePerAgg peragg,
414  AggStatePerGroup pergroup);
415 static TupleTableSlot *project_aggregates(AggState *aggstate);
416 static void find_cols(AggState *aggstate, Bitmapset **aggregated,
417  Bitmapset **unaggregated);
418 static bool find_cols_walker(Node *node, FindColsContext *context);
419 static void build_hash_tables(AggState *aggstate);
420 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
421 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
422  bool nullcheck);
423 static long hash_choose_num_buckets(double hashentrysize,
424  long estimated_nbuckets,
425  Size memory);
426 static int hash_choose_num_partitions(double input_groups,
427  double hashentrysize,
428  int used_bits,
429  int *log2_npartittions);
430 static void initialize_hash_entry(AggState *aggstate,
431  TupleHashTable hashtable,
432  TupleHashEntry entry);
433 static void lookup_hash_entries(AggState *aggstate);
434 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
435 static void agg_fill_hash_table(AggState *aggstate);
436 static bool agg_refill_hash_table(AggState *aggstate);
439 static void hash_agg_check_limits(AggState *aggstate);
440 static void hash_agg_enter_spill_mode(AggState *aggstate);
441 static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
442  int npartitions);
443 static void hashagg_finish_initial_spills(AggState *aggstate);
444 static void hashagg_reset_spill_state(AggState *aggstate);
446  int input_tapenum, int setno,
447  int64 input_tuples, double input_card,
448  int used_bits);
449 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
450 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
451  int used_bits, double input_groups,
452  double hashentrysize);
453 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
454  TupleTableSlot *slot, uint32 hash);
455 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
456  int setno);
457 static void hashagg_tapeinfo_init(AggState *aggstate);
458 static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
459  int ndest);
460 static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
461 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
462 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
463  AggState *aggstate, EState *estate,
464  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
465  Oid aggserialfn, Oid aggdeserialfn,
466  Datum initValue, bool initValueIsNull,
467  Oid *inputTypes, int numArguments);
468 
469 
470 /*
471  * Select the current grouping set; affects current_set and
472  * curaggcontext.
473  */
474 static void
475 select_current_set(AggState *aggstate, int setno, bool is_hash)
476 {
477  /*
478  * When changing this, also adapt ExecAggPlainTransByVal() and
479  * ExecAggPlainTransByRef().
480  */
481  if (is_hash)
482  aggstate->curaggcontext = aggstate->hashcontext;
483  else
484  aggstate->curaggcontext = aggstate->aggcontexts[setno];
485 
486  aggstate->current_set = setno;
487 }
488 
489 /*
490  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
491  * current_phase + 1. Juggle the tuplesorts accordingly.
492  *
493  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
494  * case, so when entering phase 0, all we need to do is drop open sorts.
495  */
496 static void
497 initialize_phase(AggState *aggstate, int newphase)
498 {
499  Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
500 
501  /*
502  * Whatever the previous state, we're now done with whatever input
503  * tuplesort was in use.
504  */
505  if (aggstate->sort_in)
506  {
507  tuplesort_end(aggstate->sort_in);
508  aggstate->sort_in = NULL;
509  }
510 
511  if (newphase <= 1)
512  {
513  /*
514  * Discard any existing output tuplesort.
515  */
516  if (aggstate->sort_out)
517  {
518  tuplesort_end(aggstate->sort_out);
519  aggstate->sort_out = NULL;
520  }
521  }
522  else
523  {
524  /*
525  * The old output tuplesort becomes the new input one, and this is the
526  * right time to actually sort it.
527  */
528  aggstate->sort_in = aggstate->sort_out;
529  aggstate->sort_out = NULL;
530  Assert(aggstate->sort_in);
531  tuplesort_performsort(aggstate->sort_in);
532  }
533 
534  /*
535  * If this isn't the last phase, we need to sort appropriately for the
536  * next phase in sequence.
537  */
538  if (newphase > 0 && newphase < aggstate->numphases - 1)
539  {
540  Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
541  PlanState *outerNode = outerPlanState(aggstate);
542  TupleDesc tupDesc = ExecGetResultType(outerNode);
543 
544  aggstate->sort_out = tuplesort_begin_heap(tupDesc,
545  sortnode->numCols,
546  sortnode->sortColIdx,
547  sortnode->sortOperators,
548  sortnode->collations,
549  sortnode->nullsFirst,
550  work_mem,
551  NULL, false);
552  }
553 
554  aggstate->current_phase = newphase;
555  aggstate->phase = &aggstate->phases[newphase];
556 }
557 
558 /*
559  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
560  * populated by the previous phase. Copy it to the sorter for the next phase
561  * if any.
562  *
563  * Callers cannot rely on memory for tuple in returned slot remaining valid
564  * past any subsequently fetched tuple.
565  */
566 static TupleTableSlot *
568 {
569  TupleTableSlot *slot;
570 
571  if (aggstate->sort_in)
572  {
573  /* make sure we check for interrupts in either path through here */
575  if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
576  aggstate->sort_slot, NULL))
577  return NULL;
578  slot = aggstate->sort_slot;
579  }
580  else
581  slot = ExecProcNode(outerPlanState(aggstate));
582 
583  if (!TupIsNull(slot) && aggstate->sort_out)
584  tuplesort_puttupleslot(aggstate->sort_out, slot);
585 
586  return slot;
587 }
588 
589 /*
590  * (Re)Initialize an individual aggregate.
591  *
592  * This function handles only one grouping set, already set in
593  * aggstate->current_set.
594  *
595  * When called, CurrentMemoryContext should be the per-query context.
596  */
597 static void
599  AggStatePerGroup pergroupstate)
600 {
601  /*
602  * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
603  */
604  if (pertrans->numSortCols > 0)
605  {
606  /*
607  * In case of rescan, maybe there could be an uncompleted sort
608  * operation? Clean it up if so.
609  */
610  if (pertrans->sortstates[aggstate->current_set])
611  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
612 
613 
614  /*
615  * We use a plain Datum sorter when there's a single input column;
616  * otherwise sort the full tuple. (See comments for
617  * process_ordered_aggregate_single.)
618  */
619  if (pertrans->numInputs == 1)
620  {
621  Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
622 
623  pertrans->sortstates[aggstate->current_set] =
624  tuplesort_begin_datum(attr->atttypid,
625  pertrans->sortOperators[0],
626  pertrans->sortCollations[0],
627  pertrans->sortNullsFirst[0],
628  work_mem, NULL, false);
629  }
630  else
631  pertrans->sortstates[aggstate->current_set] =
632  tuplesort_begin_heap(pertrans->sortdesc,
633  pertrans->numSortCols,
634  pertrans->sortColIdx,
635  pertrans->sortOperators,
636  pertrans->sortCollations,
637  pertrans->sortNullsFirst,
638  work_mem, NULL, false);
639  }
640 
641  /*
642  * (Re)set transValue to the initial value.
643  *
644  * Note that when the initial value is pass-by-ref, we must copy it (into
645  * the aggcontext) since we will pfree the transValue later.
646  */
647  if (pertrans->initValueIsNull)
648  pergroupstate->transValue = pertrans->initValue;
649  else
650  {
651  MemoryContext oldContext;
652 
654  pergroupstate->transValue = datumCopy(pertrans->initValue,
655  pertrans->transtypeByVal,
656  pertrans->transtypeLen);
657  MemoryContextSwitchTo(oldContext);
658  }
659  pergroupstate->transValueIsNull = pertrans->initValueIsNull;
660 
661  /*
662  * If the initial value for the transition state doesn't exist in the
663  * pg_aggregate table then we will let the first non-NULL value returned
664  * from the outer procNode become the initial value. (This is useful for
665  * aggregates like max() and min().) The noTransValue flag signals that we
666  * still need to do this.
667  */
668  pergroupstate->noTransValue = pertrans->initValueIsNull;
669 }
670 
671 /*
672  * Initialize all aggregate transition states for a new group of input values.
673  *
674  * If there are multiple grouping sets, we initialize only the first numReset
675  * of them (the grouping sets are ordered so that the most specific one, which
676  * is reset most often, is first). As a convenience, if numReset is 0, we
677  * reinitialize all sets.
678  *
679  * NB: This cannot be used for hash aggregates, as for those the grouping set
680  * number has to be specified from further up.
681  *
682  * When called, CurrentMemoryContext should be the per-query context.
683  */
684 static void
686  AggStatePerGroup *pergroups,
687  int numReset)
688 {
689  int transno;
690  int numGroupingSets = Max(aggstate->phase->numsets, 1);
691  int setno = 0;
692  int numTrans = aggstate->numtrans;
693  AggStatePerTrans transstates = aggstate->pertrans;
694 
695  if (numReset == 0)
696  numReset = numGroupingSets;
697 
698  for (setno = 0; setno < numReset; setno++)
699  {
700  AggStatePerGroup pergroup = pergroups[setno];
701 
702  select_current_set(aggstate, setno, false);
703 
704  for (transno = 0; transno < numTrans; transno++)
705  {
706  AggStatePerTrans pertrans = &transstates[transno];
707  AggStatePerGroup pergroupstate = &pergroup[transno];
708 
709  initialize_aggregate(aggstate, pertrans, pergroupstate);
710  }
711  }
712 }
713 
714 /*
715  * Given new input value(s), advance the transition function of one aggregate
716  * state within one grouping set only (already set in aggstate->current_set)
717  *
718  * The new values (and null flags) have been preloaded into argument positions
719  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
720  * pass to the transition function. We also expect that the static fields of
721  * the fcinfo are already initialized; that was done by ExecInitAgg().
722  *
723  * It doesn't matter which memory context this is called in.
724  */
725 static void
727  AggStatePerTrans pertrans,
728  AggStatePerGroup pergroupstate)
729 {
730  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
731  MemoryContext oldContext;
732  Datum newVal;
733 
734  if (pertrans->transfn.fn_strict)
735  {
736  /*
737  * For a strict transfn, nothing happens when there's a NULL input; we
738  * just keep the prior transValue.
739  */
740  int numTransInputs = pertrans->numTransInputs;
741  int i;
742 
743  for (i = 1; i <= numTransInputs; i++)
744  {
745  if (fcinfo->args[i].isnull)
746  return;
747  }
748  if (pergroupstate->noTransValue)
749  {
750  /*
751  * transValue has not been initialized. This is the first non-NULL
752  * input value. We use it as the initial value for transValue. (We
753  * already checked that the agg's input type is binary-compatible
754  * with its transtype, so straight copy here is OK.)
755  *
756  * We must copy the datum into aggcontext if it is pass-by-ref. We
757  * do not need to pfree the old transValue, since it's NULL.
758  */
760  pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
761  pertrans->transtypeByVal,
762  pertrans->transtypeLen);
763  pergroupstate->transValueIsNull = false;
764  pergroupstate->noTransValue = false;
765  MemoryContextSwitchTo(oldContext);
766  return;
767  }
768  if (pergroupstate->transValueIsNull)
769  {
770  /*
771  * Don't call a strict function with NULL inputs. Note it is
772  * possible to get here despite the above tests, if the transfn is
773  * strict *and* returned a NULL on a prior cycle. If that happens
774  * we will propagate the NULL all the way to the end.
775  */
776  return;
777  }
778  }
779 
780  /* We run the transition functions in per-input-tuple memory context */
781  oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
782 
783  /* set up aggstate->curpertrans for AggGetAggref() */
784  aggstate->curpertrans = pertrans;
785 
786  /*
787  * OK to call the transition function
788  */
789  fcinfo->args[0].value = pergroupstate->transValue;
790  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
791  fcinfo->isnull = false; /* just in case transfn doesn't set it */
792 
793  newVal = FunctionCallInvoke(fcinfo);
794 
795  aggstate->curpertrans = NULL;
796 
797  /*
798  * If pass-by-ref datatype, must copy the new value into aggcontext and
799  * free the prior transValue. But if transfn returned a pointer to its
800  * first input, we don't need to do anything. Also, if transfn returned a
801  * pointer to a R/W expanded object that is already a child of the
802  * aggcontext, assume we can adopt that value without copying it.
803  *
804  * It's safe to compare newVal with pergroup->transValue without regard
805  * for either being NULL, because ExecAggTransReparent() takes care to set
806  * transValue to 0 when NULL. Otherwise we could end up accidentally not
807  * reparenting, when the transValue has the same numerical value as
808  * newValue, despite being NULL. This is a somewhat hot path, making it
809  * undesirable to instead solve this with another branch for the common
810  * case of the transition function returning its (modified) input
811  * argument.
812  */
813  if (!pertrans->transtypeByVal &&
814  DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
815  newVal = ExecAggTransReparent(aggstate, pertrans,
816  newVal, fcinfo->isnull,
817  pergroupstate->transValue,
818  pergroupstate->transValueIsNull);
819 
820  pergroupstate->transValue = newVal;
821  pergroupstate->transValueIsNull = fcinfo->isnull;
822 
823  MemoryContextSwitchTo(oldContext);
824 }
825 
826 /*
827  * Advance each aggregate transition state for one input tuple. The input
828  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
829  * accessible to ExecEvalExpr.
830  *
831  * We have two sets of transition states to handle: one for sorted aggregation
832  * and one for hashed; we do them both here, to avoid multiple evaluation of
833  * the inputs.
834  *
835  * When called, CurrentMemoryContext should be the per-query context.
836  */
837 static void
839 {
840  bool dummynull;
841 
843  aggstate->tmpcontext,
844  &dummynull);
845 }
846 
847 /*
848  * Run the transition function for a DISTINCT or ORDER BY aggregate
849  * with only one input. This is called after we have completed
850  * entering all the input values into the sort object. We complete the
851  * sort, read out the values in sorted order, and run the transition
852  * function on each value (applying DISTINCT if appropriate).
853  *
854  * Note that the strictness of the transition function was checked when
855  * entering the values into the sort, so we don't check it again here;
856  * we just apply standard SQL DISTINCT logic.
857  *
858  * The one-input case is handled separately from the multi-input case
859  * for performance reasons: for single by-value inputs, such as the
860  * common case of count(distinct id), the tuplesort_getdatum code path
861  * is around 300% faster. (The speedup for by-reference types is less
862  * but still noticeable.)
863  *
864  * This function handles only one grouping set (already set in
865  * aggstate->current_set).
866  *
867  * When called, CurrentMemoryContext should be the per-query context.
868  */
869 static void
871  AggStatePerTrans pertrans,
872  AggStatePerGroup pergroupstate)
873 {
874  Datum oldVal = (Datum) 0;
875  bool oldIsNull = true;
876  bool haveOldVal = false;
877  MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
878  MemoryContext oldContext;
879  bool isDistinct = (pertrans->numDistinctCols > 0);
880  Datum newAbbrevVal = (Datum) 0;
881  Datum oldAbbrevVal = (Datum) 0;
882  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
883  Datum *newVal;
884  bool *isNull;
885 
886  Assert(pertrans->numDistinctCols < 2);
887 
888  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
889 
890  /* Load the column into argument 1 (arg 0 will be transition value) */
891  newVal = &fcinfo->args[1].value;
892  isNull = &fcinfo->args[1].isnull;
893 
894  /*
895  * Note: if input type is pass-by-ref, the datums returned by the sort are
896  * freshly palloc'd in the per-query context, so we must be careful to
897  * pfree them when they are no longer needed.
898  */
899 
900  while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
901  true, newVal, isNull, &newAbbrevVal))
902  {
903  /*
904  * Clear and select the working context for evaluation of the equality
905  * function and transition function.
906  */
907  MemoryContextReset(workcontext);
908  oldContext = MemoryContextSwitchTo(workcontext);
909 
910  /*
911  * If DISTINCT mode, and not distinct from prior, skip it.
912  */
913  if (isDistinct &&
914  haveOldVal &&
915  ((oldIsNull && *isNull) ||
916  (!oldIsNull && !*isNull &&
917  oldAbbrevVal == newAbbrevVal &&
919  pertrans->aggCollation,
920  oldVal, *newVal)))))
921  {
922  /* equal to prior, so forget this one */
923  if (!pertrans->inputtypeByVal && !*isNull)
924  pfree(DatumGetPointer(*newVal));
925  }
926  else
927  {
928  advance_transition_function(aggstate, pertrans, pergroupstate);
929  /* forget the old value, if any */
930  if (!oldIsNull && !pertrans->inputtypeByVal)
931  pfree(DatumGetPointer(oldVal));
932  /* and remember the new one for subsequent equality checks */
933  oldVal = *newVal;
934  oldAbbrevVal = newAbbrevVal;
935  oldIsNull = *isNull;
936  haveOldVal = true;
937  }
938 
939  MemoryContextSwitchTo(oldContext);
940  }
941 
942  if (!oldIsNull && !pertrans->inputtypeByVal)
943  pfree(DatumGetPointer(oldVal));
944 
945  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
946  pertrans->sortstates[aggstate->current_set] = NULL;
947 }
948 
949 /*
950  * Run the transition function for a DISTINCT or ORDER BY aggregate
951  * with more than one input. This is called after we have completed
952  * entering all the input values into the sort object. We complete the
953  * sort, read out the values in sorted order, and run the transition
954  * function on each value (applying DISTINCT if appropriate).
955  *
956  * This function handles only one grouping set (already set in
957  * aggstate->current_set).
958  *
959  * When called, CurrentMemoryContext should be the per-query context.
960  */
961 static void
963  AggStatePerTrans pertrans,
964  AggStatePerGroup pergroupstate)
965 {
966  ExprContext *tmpcontext = aggstate->tmpcontext;
967  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
968  TupleTableSlot *slot1 = pertrans->sortslot;
969  TupleTableSlot *slot2 = pertrans->uniqslot;
970  int numTransInputs = pertrans->numTransInputs;
971  int numDistinctCols = pertrans->numDistinctCols;
972  Datum newAbbrevVal = (Datum) 0;
973  Datum oldAbbrevVal = (Datum) 0;
974  bool haveOldValue = false;
975  TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
976  int i;
977 
978  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
979 
980  ExecClearTuple(slot1);
981  if (slot2)
982  ExecClearTuple(slot2);
983 
984  while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
985  true, true, slot1, &newAbbrevVal))
986  {
988 
989  tmpcontext->ecxt_outertuple = slot1;
990  tmpcontext->ecxt_innertuple = slot2;
991 
992  if (numDistinctCols == 0 ||
993  !haveOldValue ||
994  newAbbrevVal != oldAbbrevVal ||
995  !ExecQual(pertrans->equalfnMulti, tmpcontext))
996  {
997  /*
998  * Extract the first numTransInputs columns as datums to pass to
999  * the transfn.
1000  */
1001  slot_getsomeattrs(slot1, numTransInputs);
1002 
1003  /* Load values into fcinfo */
1004  /* Start from 1, since the 0th arg will be the transition value */
1005  for (i = 0; i < numTransInputs; i++)
1006  {
1007  fcinfo->args[i + 1].value = slot1->tts_values[i];
1008  fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
1009  }
1010 
1011  advance_transition_function(aggstate, pertrans, pergroupstate);
1012 
1013  if (numDistinctCols > 0)
1014  {
1015  /* swap the slot pointers to retain the current tuple */
1016  TupleTableSlot *tmpslot = slot2;
1017 
1018  slot2 = slot1;
1019  slot1 = tmpslot;
1020  /* avoid ExecQual() calls by reusing abbreviated keys */
1021  oldAbbrevVal = newAbbrevVal;
1022  haveOldValue = true;
1023  }
1024  }
1025 
1026  /* Reset context each time */
1027  ResetExprContext(tmpcontext);
1028 
1029  ExecClearTuple(slot1);
1030  }
1031 
1032  if (slot2)
1033  ExecClearTuple(slot2);
1034 
1035  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1036  pertrans->sortstates[aggstate->current_set] = NULL;
1037 
1038  /* restore previous slot, potentially in use for grouping sets */
1039  tmpcontext->ecxt_outertuple = save;
1040 }
1041 
1042 /*
1043  * Compute the final value of one aggregate.
1044  *
1045  * This function handles only one grouping set (already set in
1046  * aggstate->current_set).
1047  *
1048  * The finalfn will be run, and the result delivered, in the
1049  * output-tuple context; caller's CurrentMemoryContext does not matter.
1050  *
1051  * The finalfn uses the state as set in the transno. This also might be
1052  * being used by another aggregate function, so it's important that we do
1053  * nothing destructive here.
1054  */
1055 static void
1057  AggStatePerAgg peragg,
1058  AggStatePerGroup pergroupstate,
1059  Datum *resultVal, bool *resultIsNull)
1060 {
1061  LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1062  bool anynull = false;
1063  MemoryContext oldContext;
1064  int i;
1065  ListCell *lc;
1066  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1067 
1069 
1070  /*
1071  * Evaluate any direct arguments. We do this even if there's no finalfn
1072  * (which is unlikely anyway), so that side-effects happen as expected.
1073  * The direct arguments go into arg positions 1 and up, leaving position 0
1074  * for the transition state value.
1075  */
1076  i = 1;
1077  foreach(lc, peragg->aggdirectargs)
1078  {
1079  ExprState *expr = (ExprState *) lfirst(lc);
1080 
1081  fcinfo->args[i].value = ExecEvalExpr(expr,
1082  aggstate->ss.ps.ps_ExprContext,
1083  &fcinfo->args[i].isnull);
1084  anynull |= fcinfo->args[i].isnull;
1085  i++;
1086  }
1087 
1088  /*
1089  * Apply the agg's finalfn if one is provided, else return transValue.
1090  */
1091  if (OidIsValid(peragg->finalfn_oid))
1092  {
1093  int numFinalArgs = peragg->numFinalArgs;
1094 
1095  /* set up aggstate->curperagg for AggGetAggref() */
1096  aggstate->curperagg = peragg;
1097 
1098  InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
1099  numFinalArgs,
1100  pertrans->aggCollation,
1101  (void *) aggstate, NULL);
1102 
1103  /* Fill in the transition state value */
1104  fcinfo->args[0].value =
1105  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1106  pergroupstate->transValueIsNull,
1107  pertrans->transtypeLen);
1108  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1109  anynull |= pergroupstate->transValueIsNull;
1110 
1111  /* Fill any remaining argument positions with nulls */
1112  for (; i < numFinalArgs; i++)
1113  {
1114  fcinfo->args[i].value = (Datum) 0;
1115  fcinfo->args[i].isnull = true;
1116  anynull = true;
1117  }
1118 
1119  if (fcinfo->flinfo->fn_strict && anynull)
1120  {
1121  /* don't call a strict function with NULL inputs */
1122  *resultVal = (Datum) 0;
1123  *resultIsNull = true;
1124  }
1125  else
1126  {
1127  *resultVal = FunctionCallInvoke(fcinfo);
1128  *resultIsNull = fcinfo->isnull;
1129  }
1130  aggstate->curperagg = NULL;
1131  }
1132  else
1133  {
1134  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1135  *resultVal = pergroupstate->transValue;
1136  *resultIsNull = pergroupstate->transValueIsNull;
1137  }
1138 
1139  /*
1140  * If result is pass-by-ref, make sure it is in the right context.
1141  */
1142  if (!peragg->resulttypeByVal && !*resultIsNull &&
1144  DatumGetPointer(*resultVal)))
1145  *resultVal = datumCopy(*resultVal,
1146  peragg->resulttypeByVal,
1147  peragg->resulttypeLen);
1148 
1149  MemoryContextSwitchTo(oldContext);
1150 }
1151 
1152 /*
1153  * Compute the output value of one partial aggregate.
1154  *
1155  * The serialization function will be run, and the result delivered, in the
1156  * output-tuple context; caller's CurrentMemoryContext does not matter.
1157  */
1158 static void
1160  AggStatePerAgg peragg,
1161  AggStatePerGroup pergroupstate,
1162  Datum *resultVal, bool *resultIsNull)
1163 {
1164  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1165  MemoryContext oldContext;
1166 
1168 
1169  /*
1170  * serialfn_oid will be set if we must serialize the transvalue before
1171  * returning it
1172  */
1173  if (OidIsValid(pertrans->serialfn_oid))
1174  {
1175  /* Don't call a strict serialization function with NULL input. */
1176  if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1177  {
1178  *resultVal = (Datum) 0;
1179  *resultIsNull = true;
1180  }
1181  else
1182  {
1183  FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1184 
1185  fcinfo->args[0].value =
1186  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1187  pergroupstate->transValueIsNull,
1188  pertrans->transtypeLen);
1189  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1190  fcinfo->isnull = false;
1191 
1192  *resultVal = FunctionCallInvoke(fcinfo);
1193  *resultIsNull = fcinfo->isnull;
1194  }
1195  }
1196  else
1197  {
1198  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1199  *resultVal = pergroupstate->transValue;
1200  *resultIsNull = pergroupstate->transValueIsNull;
1201  }
1202 
1203  /* If result is pass-by-ref, make sure it is in the right context. */
1204  if (!peragg->resulttypeByVal && !*resultIsNull &&
1206  DatumGetPointer(*resultVal)))
1207  *resultVal = datumCopy(*resultVal,
1208  peragg->resulttypeByVal,
1209  peragg->resulttypeLen);
1210 
1211  MemoryContextSwitchTo(oldContext);
1212 }
1213 
1214 /*
1215  * Extract the attributes that make up the grouping key into the
1216  * hashslot. This is necessary to compute the hash or perform a lookup.
1217  */
1218 static inline void
1220  TupleTableSlot *inputslot,
1221  TupleTableSlot *hashslot)
1222 {
1223  int i;
1224 
1225  /* transfer just the needed columns into hashslot */
1226  slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1227  ExecClearTuple(hashslot);
1228 
1229  for (i = 0; i < perhash->numhashGrpCols; i++)
1230  {
1231  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1232 
1233  hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1234  hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1235  }
1236  ExecStoreVirtualTuple(hashslot);
1237 }
1238 
1239 /*
1240  * Prepare to finalize and project based on the specified representative tuple
1241  * slot and grouping set.
1242  *
1243  * In the specified tuple slot, force to null all attributes that should be
1244  * read as null in the context of the current grouping set. Also stash the
1245  * current group bitmap where GroupingExpr can get at it.
1246  *
1247  * This relies on three conditions:
1248  *
1249  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1250  * only reference it in evaluations, which will only access individual
1251  * attributes.
1252  *
1253  * 2) No system columns are going to need to be nulled. (If a system column is
1254  * referenced in a group clause, it is actually projected in the outer plan
1255  * tlist.)
1256  *
1257  * 3) Within a given phase, we never need to recover the value of an attribute
1258  * once it has been set to null.
1259  *
1260  * Poking into the slot this way is a bit ugly, but the consensus is that the
1261  * alternative was worse.
1262  */
1263 static void
1264 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1265 {
1266  if (aggstate->phase->grouped_cols)
1267  {
1268  Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1269 
1270  aggstate->grouped_cols = grouped_cols;
1271 
1272  if (TTS_EMPTY(slot))
1273  {
1274  /*
1275  * Force all values to be NULL if working on an empty input tuple
1276  * (i.e. an empty grouping set for which no input rows were
1277  * supplied).
1278  */
1279  ExecStoreAllNullTuple(slot);
1280  }
1281  else if (aggstate->all_grouped_cols)
1282  {
1283  ListCell *lc;
1284 
1285  /* all_grouped_cols is arranged in desc order */
1287 
1288  foreach(lc, aggstate->all_grouped_cols)
1289  {
1290  int attnum = lfirst_int(lc);
1291 
1292  if (!bms_is_member(attnum, grouped_cols))
1293  slot->tts_isnull[attnum - 1] = true;
1294  }
1295  }
1296  }
1297 }
1298 
1299 /*
1300  * Compute the final value of all aggregates for one group.
1301  *
1302  * This function handles only one grouping set at a time, which the caller must
1303  * have selected. It's also the caller's responsibility to adjust the supplied
1304  * pergroup parameter to point to the current set's transvalues.
1305  *
1306  * Results are stored in the output econtext aggvalues/aggnulls.
1307  */
1308 static void
1310  AggStatePerAgg peraggs,
1311  AggStatePerGroup pergroup)
1312 {
1313  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1314  Datum *aggvalues = econtext->ecxt_aggvalues;
1315  bool *aggnulls = econtext->ecxt_aggnulls;
1316  int aggno;
1317  int transno;
1318 
1319  /*
1320  * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1321  * inputs and run the transition functions.
1322  */
1323  for (transno = 0; transno < aggstate->numtrans; transno++)
1324  {
1325  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1326  AggStatePerGroup pergroupstate;
1327 
1328  pergroupstate = &pergroup[transno];
1329 
1330  if (pertrans->numSortCols > 0)
1331  {
1332  Assert(aggstate->aggstrategy != AGG_HASHED &&
1333  aggstate->aggstrategy != AGG_MIXED);
1334 
1335  if (pertrans->numInputs == 1)
1337  pertrans,
1338  pergroupstate);
1339  else
1341  pertrans,
1342  pergroupstate);
1343  }
1344  }
1345 
1346  /*
1347  * Run the final functions.
1348  */
1349  for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1350  {
1351  AggStatePerAgg peragg = &peraggs[aggno];
1352  int transno = peragg->transno;
1353  AggStatePerGroup pergroupstate;
1354 
1355  pergroupstate = &pergroup[transno];
1356 
1357  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1358  finalize_partialaggregate(aggstate, peragg, pergroupstate,
1359  &aggvalues[aggno], &aggnulls[aggno]);
1360  else
1361  finalize_aggregate(aggstate, peragg, pergroupstate,
1362  &aggvalues[aggno], &aggnulls[aggno]);
1363  }
1364 }
1365 
1366 /*
1367  * Project the result of a group (whose aggs have already been calculated by
1368  * finalize_aggregates). Returns the result slot, or NULL if no row is
1369  * projected (suppressed by qual).
1370  */
1371 static TupleTableSlot *
1373 {
1374  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1375 
1376  /*
1377  * Check the qual (HAVING clause); if the group does not match, ignore it.
1378  */
1379  if (ExecQual(aggstate->ss.ps.qual, econtext))
1380  {
1381  /*
1382  * Form and return projection tuple using the aggregate results and
1383  * the representative input tuple.
1384  */
1385  return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1386  }
1387  else
1388  InstrCountFiltered1(aggstate, 1);
1389 
1390  return NULL;
1391 }
1392 
1393 /*
1394  * Find input-tuple columns that are needed, dividing them into
1395  * aggregated and unaggregated sets.
1396  */
1397 static void
1398 find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
1399 {
1400  Agg *agg = (Agg *) aggstate->ss.ps.plan;
1401  FindColsContext context;
1402 
1403  context.is_aggref = false;
1404  context.aggregated = NULL;
1405  context.unaggregated = NULL;
1406 
1407  /* Examine tlist and quals */
1408  (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
1409  (void) find_cols_walker((Node *) agg->plan.qual, &context);
1410 
1411  /* In some cases, grouping columns will not appear in the tlist */
1412  for (int i = 0; i < agg->numCols; i++)
1413  context.unaggregated = bms_add_member(context.unaggregated,
1414  agg->grpColIdx[i]);
1415 
1416  *aggregated = context.aggregated;
1417  *unaggregated = context.unaggregated;
1418 }
1419 
1420 static bool
1422 {
1423  if (node == NULL)
1424  return false;
1425  if (IsA(node, Var))
1426  {
1427  Var *var = (Var *) node;
1428 
1429  /* setrefs.c should have set the varno to OUTER_VAR */
1430  Assert(var->varno == OUTER_VAR);
1431  Assert(var->varlevelsup == 0);
1432  if (context->is_aggref)
1433  context->aggregated = bms_add_member(context->aggregated,
1434  var->varattno);
1435  else
1436  context->unaggregated = bms_add_member(context->unaggregated,
1437  var->varattno);
1438  return false;
1439  }
1440  if (IsA(node, Aggref))
1441  {
1442  Assert(!context->is_aggref);
1443  context->is_aggref = true;
1444  expression_tree_walker(node, find_cols_walker, (void *) context);
1445  context->is_aggref = false;
1446  return false;
1447  }
1449  (void *) context);
1450 }
1451 
1452 /*
1453  * (Re-)initialize the hash table(s) to empty.
1454  *
1455  * To implement hashed aggregation, we need a hashtable that stores a
1456  * representative tuple and an array of AggStatePerGroup structs for each
1457  * distinct set of GROUP BY column values. We compute the hash key from the
1458  * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1459  * for each entry.
1460  *
1461  * We have a separate hashtable and associated perhash data structure for each
1462  * grouping set for which we're doing hashing.
1463  *
1464  * The contents of the hash tables always live in the hashcontext's per-tuple
1465  * memory context (there is only one of these for all tables together, since
1466  * they are all reset at the same time).
1467  */
1468 static void
1470 {
1471  int setno;
1472 
1473  for (setno = 0; setno < aggstate->num_hashes; ++setno)
1474  {
1475  AggStatePerHash perhash = &aggstate->perhash[setno];
1476  long nbuckets;
1477  Size memory;
1478 
1479  if (perhash->hashtable != NULL)
1480  {
1481  ResetTupleHashTable(perhash->hashtable);
1482  continue;
1483  }
1484 
1485  Assert(perhash->aggnode->numGroups > 0);
1486 
1487  memory = aggstate->hash_mem_limit / aggstate->num_hashes;
1488 
1489  /* choose reasonable number of buckets per hashtable */
1490  nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
1491  perhash->aggnode->numGroups,
1492  memory);
1493 
1494  build_hash_table(aggstate, setno, nbuckets);
1495  }
1496 
1497  aggstate->hash_ngroups_current = 0;
1498 }
1499 
1500 /*
1501  * Build a single hashtable for this grouping set.
1502  */
1503 static void
1504 build_hash_table(AggState *aggstate, int setno, long nbuckets)
1505 {
1506  AggStatePerHash perhash = &aggstate->perhash[setno];
1507  MemoryContext metacxt = aggstate->hash_metacxt;
1508  MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
1509  MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
1510  Size additionalsize;
1511 
1512  Assert(aggstate->aggstrategy == AGG_HASHED ||
1513  aggstate->aggstrategy == AGG_MIXED);
1514 
1515  /*
1516  * Used to make sure initial hash table allocation does not exceed
1517  * hash_mem. Note that the estimate does not include space for
1518  * pass-by-reference transition data values, nor for the representative
1519  * tuple of each group.
1520  */
1521  additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1522 
1523  perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1524  perhash->hashslot->tts_tupleDescriptor,
1525  perhash->numCols,
1526  perhash->hashGrpColIdxHash,
1527  perhash->eqfuncoids,
1528  perhash->hashfunctions,
1529  perhash->aggnode->grpCollations,
1530  nbuckets,
1531  additionalsize,
1532  metacxt,
1533  hashcxt,
1534  tmpcxt,
1535  DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1536 }
1537 
1538 /*
1539  * Compute columns that actually need to be stored in hashtable entries. The
1540  * incoming tuples from the child plan node will contain grouping columns,
1541  * other columns referenced in our targetlist and qual, columns used to
1542  * compute the aggregate functions, and perhaps just junk columns we don't use
1543  * at all. Only columns of the first two types need to be stored in the
1544  * hashtable, and getting rid of the others can make the table entries
1545  * significantly smaller. The hashtable only contains the relevant columns,
1546  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1547  * into the format of the normal input descriptor.
1548  *
1549  * Additional columns, in addition to the columns grouped by, come from two
1550  * sources: Firstly functionally dependent columns that we don't need to group
1551  * by themselves, and secondly ctids for row-marks.
1552  *
1553  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1554  * then build an array of the columns included in the hashtable. We might
1555  * still have duplicates if the passed-in grpColIdx has them, which can happen
1556  * in edge cases from semijoins/distinct; these can't always be removed,
1557  * because it's not certain that the duplicate cols will be using the same
1558  * hash function.
1559  *
1560  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1561  * the per-query context (unlike the hash table itself).
1562  */
1563 static void
1565 {
1566  Bitmapset *base_colnos;
1567  Bitmapset *aggregated_colnos;
1568  TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1569  List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1570  int numHashes = aggstate->num_hashes;
1571  EState *estate = aggstate->ss.ps.state;
1572  int j;
1573 
1574  /* Find Vars that will be needed in tlist and qual */
1575  find_cols(aggstate, &aggregated_colnos, &base_colnos);
1576  aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
1577  aggstate->max_colno_needed = 0;
1578  aggstate->all_cols_needed = true;
1579 
1580  for (int i = 0; i < scanDesc->natts; i++)
1581  {
1582  int colno = i + 1;
1583  if (bms_is_member(colno, aggstate->colnos_needed))
1584  aggstate->max_colno_needed = colno;
1585  else
1586  aggstate->all_cols_needed = false;
1587  }
1588 
1589  for (j = 0; j < numHashes; ++j)
1590  {
1591  AggStatePerHash perhash = &aggstate->perhash[j];
1592  Bitmapset *colnos = bms_copy(base_colnos);
1593  AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1594  List *hashTlist = NIL;
1595  TupleDesc hashDesc;
1596  int maxCols;
1597  int i;
1598 
1599  perhash->largestGrpColIdx = 0;
1600 
1601  /*
1602  * If we're doing grouping sets, then some Vars might be referenced in
1603  * tlist/qual for the benefit of other grouping sets, but not needed
1604  * when hashing; i.e. prepare_projection_slot will null them out, so
1605  * there'd be no point storing them. Use prepare_projection_slot's
1606  * logic to determine which.
1607  */
1608  if (aggstate->phases[0].grouped_cols)
1609  {
1610  Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1611  ListCell *lc;
1612 
1613  foreach(lc, aggstate->all_grouped_cols)
1614  {
1615  int attnum = lfirst_int(lc);
1616 
1617  if (!bms_is_member(attnum, grouped_cols))
1618  colnos = bms_del_member(colnos, attnum);
1619  }
1620  }
1621 
1622  /*
1623  * Compute maximum number of input columns accounting for possible
1624  * duplications in the grpColIdx array, which can happen in some edge
1625  * cases where HashAggregate was generated as part of a semijoin or a
1626  * DISTINCT.
1627  */
1628  maxCols = bms_num_members(colnos) + perhash->numCols;
1629 
1630  perhash->hashGrpColIdxInput =
1631  palloc(maxCols * sizeof(AttrNumber));
1632  perhash->hashGrpColIdxHash =
1633  palloc(perhash->numCols * sizeof(AttrNumber));
1634 
1635  /* Add all the grouping columns to colnos */
1636  for (i = 0; i < perhash->numCols; i++)
1637  colnos = bms_add_member(colnos, grpColIdx[i]);
1638 
1639  /*
1640  * First build mapping for columns directly hashed. These are the
1641  * first, because they'll be accessed when computing hash values and
1642  * comparing tuples for exact matches. We also build simple mapping
1643  * for execGrouping, so it knows where to find the to-be-hashed /
1644  * compared columns in the input.
1645  */
1646  for (i = 0; i < perhash->numCols; i++)
1647  {
1648  perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1649  perhash->hashGrpColIdxHash[i] = i + 1;
1650  perhash->numhashGrpCols++;
1651  /* delete already mapped columns */
1652  bms_del_member(colnos, grpColIdx[i]);
1653  }
1654 
1655  /* and add the remaining columns */
1656  while ((i = bms_first_member(colnos)) >= 0)
1657  {
1658  perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1659  perhash->numhashGrpCols++;
1660  }
1661 
1662  /* and build a tuple descriptor for the hashtable */
1663  for (i = 0; i < perhash->numhashGrpCols; i++)
1664  {
1665  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1666 
1667  hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1668  perhash->largestGrpColIdx =
1669  Max(varNumber + 1, perhash->largestGrpColIdx);
1670  }
1671 
1672  hashDesc = ExecTypeFromTL(hashTlist);
1673 
1674  execTuplesHashPrepare(perhash->numCols,
1675  perhash->aggnode->grpOperators,
1676  &perhash->eqfuncoids,
1677  &perhash->hashfunctions);
1678  perhash->hashslot =
1679  ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1681 
1682  list_free(hashTlist);
1683  bms_free(colnos);
1684  }
1685 
1686  bms_free(base_colnos);
1687 }
1688 
1689 /*
1690  * Estimate per-hash-table-entry overhead.
1691  */
1692 Size
1693 hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
1694 {
1695  Size tupleChunkSize;
1696  Size pergroupChunkSize;
1697  Size transitionChunkSize;
1698  Size tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
1699  tupleWidth);
1700  Size pergroupSize = numTrans * sizeof(AggStatePerGroupData);
1701 
1702  tupleChunkSize = CHUNKHDRSZ + tupleSize;
1703 
1704  if (pergroupSize > 0)
1705  pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
1706  else
1707  pergroupChunkSize = 0;
1708 
1709  if (transitionSpace > 0)
1710  transitionChunkSize = CHUNKHDRSZ + transitionSpace;
1711  else
1712  transitionChunkSize = 0;
1713 
1714  return
1715  sizeof(TupleHashEntryData) +
1716  tupleChunkSize +
1717  pergroupChunkSize +
1718  transitionChunkSize;
1719 }
1720 
1721 /*
1722  * hashagg_recompile_expressions()
1723  *
1724  * Identifies the right phase, compiles the right expression given the
1725  * arguments, and then sets phase->evalfunc to that expression.
1726  *
1727  * Different versions of the compiled expression are needed depending on
1728  * whether hash aggregation has spilled or not, and whether it's reading from
1729  * the outer plan or a tape. Before spilling to disk, the expression reads
1730  * from the outer plan and does not need to perform a NULL check. After
1731  * HashAgg begins to spill, new groups will not be created in the hash table,
1732  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
1733  * pointer check to the expression. Then, when reading spilled data from a
1734  * tape, we change the outer slot type to be a fixed minimal tuple slot.
1735  *
1736  * It would be wasteful to recompile every time, so cache the compiled
1737  * expressions in the AggStatePerPhase, and reuse when appropriate.
1738  */
1739 static void
1740 hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1741 {
1742  AggStatePerPhase phase;
1743  int i = minslot ? 1 : 0;
1744  int j = nullcheck ? 1 : 0;
1745 
1746  Assert(aggstate->aggstrategy == AGG_HASHED ||
1747  aggstate->aggstrategy == AGG_MIXED);
1748 
1749  if (aggstate->aggstrategy == AGG_HASHED)
1750  phase = &aggstate->phases[0];
1751  else /* AGG_MIXED */
1752  phase = &aggstate->phases[1];
1753 
1754  if (phase->evaltrans_cache[i][j] == NULL)
1755  {
1756  const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
1757  bool outerfixed = aggstate->ss.ps.outeropsfixed;
1758  bool dohash = true;
1759  bool dosort = false;
1760 
1761  /*
1762  * If minslot is true, that means we are processing a spilled batch
1763  * (inside agg_refill_hash_table()), and we must not advance the
1764  * sorted grouping sets.
1765  */
1766  if (aggstate->aggstrategy == AGG_MIXED && !minslot)
1767  dosort = true;
1768 
1769  /* temporarily change the outerops while compiling the expression */
1770  if (minslot)
1771  {
1772  aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
1773  aggstate->ss.ps.outeropsfixed = true;
1774  }
1775 
1776  phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
1777  dosort, dohash,
1778  nullcheck);
1779 
1780  /* change back */
1781  aggstate->ss.ps.outerops = outerops;
1782  aggstate->ss.ps.outeropsfixed = outerfixed;
1783  }
1784 
1785  phase->evaltrans = phase->evaltrans_cache[i][j];
1786 }
1787 
1788 /*
1789  * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
1790  * number of partitions we expect to create (if we do spill).
1791  *
1792  * There are two limits: a memory limit, and also an ngroups limit. The
1793  * ngroups limit becomes important when we expect transition values to grow
1794  * substantially larger than the initial value.
1795  */
1796 void
1797 hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
1798  Size *mem_limit, uint64 *ngroups_limit,
1799  int *num_partitions)
1800 {
1801  int npartitions;
1802  Size partition_mem;
1803  int hash_mem = get_hash_mem();
1804 
1805  /* if not expected to spill, use all of hash_mem */
1806  if (input_groups * hashentrysize < hash_mem * 1024L)
1807  {
1808  if (num_partitions != NULL)
1809  *num_partitions = 0;
1810  *mem_limit = hash_mem * 1024L;
1811  *ngroups_limit = *mem_limit / hashentrysize;
1812  return;
1813  }
1814 
1815  /*
1816  * Calculate expected memory requirements for spilling, which is the size
1817  * of the buffers needed for all the tapes that need to be open at once.
1818  * Then, subtract that from the memory available for holding hash tables.
1819  */
1820  npartitions = hash_choose_num_partitions(input_groups,
1821  hashentrysize,
1822  used_bits,
1823  NULL);
1824  if (num_partitions != NULL)
1825  *num_partitions = npartitions;
1826 
1827  partition_mem =
1829  HASHAGG_WRITE_BUFFER_SIZE * npartitions;
1830 
1831  /*
1832  * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
1833  * minimum number of partitions, so we aren't going to dramatically exceed
1834  * work mem anyway.
1835  */
1836  if (hash_mem * 1024L > 4 * partition_mem)
1837  *mem_limit = hash_mem * 1024L - partition_mem;
1838  else
1839  *mem_limit = hash_mem * 1024L * 0.75;
1840 
1841  if (*mem_limit > hashentrysize)
1842  *ngroups_limit = *mem_limit / hashentrysize;
1843  else
1844  *ngroups_limit = 1;
1845 }
1846 
1847 /*
1848  * hash_agg_check_limits
1849  *
1850  * After adding a new group to the hash table, check whether we need to enter
1851  * spill mode. Allocations may happen without adding new groups (for instance,
1852  * if the transition state size grows), so this check is imperfect.
1853  */
1854 static void
1856 {
1857  uint64 ngroups = aggstate->hash_ngroups_current;
1858  Size meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
1859  true);
1861  true);
1862 
1863  /*
1864  * Don't spill unless there's at least one group in the hash table so we
1865  * can be sure to make progress even in edge cases.
1866  */
1867  if (aggstate->hash_ngroups_current > 0 &&
1868  (meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
1869  ngroups > aggstate->hash_ngroups_limit))
1870  {
1871  hash_agg_enter_spill_mode(aggstate);
1872  }
1873 }
1874 
1875 /*
1876  * Enter "spill mode", meaning that no new groups are added to any of the hash
1877  * tables. Tuples that would create a new group are instead spilled, and
1878  * processed later.
1879  */
1880 static void
1882 {
1883  aggstate->hash_spill_mode = true;
1884  hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
1885 
1886  if (!aggstate->hash_ever_spilled)
1887  {
1888  Assert(aggstate->hash_tapeinfo == NULL);
1889  Assert(aggstate->hash_spills == NULL);
1890 
1891  aggstate->hash_ever_spilled = true;
1892 
1893  hashagg_tapeinfo_init(aggstate);
1894 
1895  aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
1896 
1897  for (int setno = 0; setno < aggstate->num_hashes; setno++)
1898  {
1899  AggStatePerHash perhash = &aggstate->perhash[setno];
1900  HashAggSpill *spill = &aggstate->hash_spills[setno];
1901 
1902  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
1903  perhash->aggnode->numGroups,
1904  aggstate->hashentrysize);
1905  }
1906  }
1907 }
1908 
1909 /*
1910  * Update metrics after filling the hash table.
1911  *
1912  * If reading from the outer plan, from_tape should be false; if reading from
1913  * another tape, from_tape should be true.
1914  */
1915 static void
1916 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
1917 {
1918  Size meta_mem;
1919  Size hashkey_mem;
1920  Size buffer_mem;
1921  Size total_mem;
1922 
1923  if (aggstate->aggstrategy != AGG_MIXED &&
1924  aggstate->aggstrategy != AGG_HASHED)
1925  return;
1926 
1927  /* memory for the hash table itself */
1928  meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
1929 
1930  /* memory for the group keys and transition states */
1931  hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
1932 
1933  /* memory for read/write tape buffers, if spilled */
1934  buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
1935  if (from_tape)
1936  buffer_mem += HASHAGG_READ_BUFFER_SIZE;
1937 
1938  /* update peak mem */
1939  total_mem = meta_mem + hashkey_mem + buffer_mem;
1940  if (total_mem > aggstate->hash_mem_peak)
1941  aggstate->hash_mem_peak = total_mem;
1942 
1943  /* update disk usage */
1944  if (aggstate->hash_tapeinfo != NULL)
1945  {
1946  uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
1947 
1948  if (aggstate->hash_disk_used < disk_used)
1949  aggstate->hash_disk_used = disk_used;
1950  }
1951 
1952  /* update hashentrysize estimate based on contents */
1953  if (aggstate->hash_ngroups_current > 0)
1954  {
1955  aggstate->hashentrysize =
1956  sizeof(TupleHashEntryData) +
1957  (hashkey_mem / (double) aggstate->hash_ngroups_current);
1958  }
1959 }
1960 
1961 /*
1962  * Choose a reasonable number of buckets for the initial hash table size.
1963  */
1964 static long
1965 hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
1966 {
1967  long max_nbuckets;
1968  long nbuckets = ngroups;
1969 
1970  max_nbuckets = memory / hashentrysize;
1971 
1972  /*
1973  * Underestimating is better than overestimating. Too many buckets crowd
1974  * out space for group keys and transition state values.
1975  */
1976  max_nbuckets >>= 1;
1977 
1978  if (nbuckets > max_nbuckets)
1979  nbuckets = max_nbuckets;
1980 
1981  return Max(nbuckets, 1);
1982 }
1983 
1984 /*
1985  * Determine the number of partitions to create when spilling, which will
1986  * always be a power of two. If log2_npartitions is non-NULL, set
1987  * *log2_npartitions to the log2() of the number of partitions.
1988  */
1989 static int
1990 hash_choose_num_partitions(double input_groups, double hashentrysize,
1991  int used_bits, int *log2_npartitions)
1992 {
1993  Size mem_wanted;
1994  int partition_limit;
1995  int npartitions;
1996  int partition_bits;
1997  int hash_mem = get_hash_mem();
1998 
1999  /*
2000  * Avoid creating so many partitions that the memory requirements of the
2001  * open partition files are greater than 1/4 of hash_mem.
2002  */
2003  partition_limit =
2004  (hash_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
2006 
2007  mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
2008 
2009  /* make enough partitions so that each one is likely to fit in memory */
2010  npartitions = 1 + (mem_wanted / (hash_mem * 1024L));
2011 
2012  if (npartitions > partition_limit)
2013  npartitions = partition_limit;
2014 
2015  if (npartitions < HASHAGG_MIN_PARTITIONS)
2016  npartitions = HASHAGG_MIN_PARTITIONS;
2017  if (npartitions > HASHAGG_MAX_PARTITIONS)
2018  npartitions = HASHAGG_MAX_PARTITIONS;
2019 
2020  /* ceil(log2(npartitions)) */
2021  partition_bits = my_log2(npartitions);
2022 
2023  /* make sure that we don't exhaust the hash bits */
2024  if (partition_bits + used_bits >= 32)
2025  partition_bits = 32 - used_bits;
2026 
2027  if (log2_npartitions != NULL)
2028  *log2_npartitions = partition_bits;
2029 
2030  /* number of partitions will be a power of two */
2031  npartitions = 1L << partition_bits;
2032 
2033  return npartitions;
2034 }
2035 
2036 /*
2037  * Initialize a freshly-created TupleHashEntry.
2038  */
2039 static void
2041  TupleHashEntry entry)
2042 {
2043  AggStatePerGroup pergroup;
2044  int transno;
2045 
2046  aggstate->hash_ngroups_current++;
2047  hash_agg_check_limits(aggstate);
2048 
2049  /* no need to allocate or initialize per-group state */
2050  if (aggstate->numtrans == 0)
2051  return;
2052 
2053  pergroup = (AggStatePerGroup)
2054  MemoryContextAlloc(hashtable->tablecxt,
2055  sizeof(AggStatePerGroupData) * aggstate->numtrans);
2056 
2057  entry->additional = pergroup;
2058 
2059  /*
2060  * Initialize aggregates for new tuple group, lookup_hash_entries()
2061  * already has selected the relevant grouping set.
2062  */
2063  for (transno = 0; transno < aggstate->numtrans; transno++)
2064  {
2065  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
2066  AggStatePerGroup pergroupstate = &pergroup[transno];
2067 
2068  initialize_aggregate(aggstate, pertrans, pergroupstate);
2069  }
2070 }
2071 
2072 /*
2073  * Look up hash entries for the current tuple in all hashed grouping sets.
2074  *
2075  * Be aware that lookup_hash_entry can reset the tmpcontext.
2076  *
2077  * Some entries may be left NULL if we are in "spill mode". The same tuple
2078  * will belong to different groups for each grouping set, so may match a group
2079  * already in memory for one set and match a group not in memory for another
2080  * set. When in "spill mode", the tuple will be spilled for each grouping set
2081  * where it doesn't match a group in memory.
2082  *
2083  * NB: It's possible to spill the same tuple for several different grouping
2084  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
2085  * the tuple multiple times for multiple grouping sets, it can be partitioned
2086  * for each grouping set, making the refilling of the hash table very
2087  * efficient.
2088  */
2089 static void
2091 {
2092  AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2093  TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
2094  int setno;
2095 
2096  for (setno = 0; setno < aggstate->num_hashes; setno++)
2097  {
2098  AggStatePerHash perhash = &aggstate->perhash[setno];
2099  TupleHashTable hashtable = perhash->hashtable;
2100  TupleTableSlot *hashslot = perhash->hashslot;
2101  TupleHashEntry entry;
2102  uint32 hash;
2103  bool isnew = false;
2104  bool *p_isnew;
2105 
2106  /* if hash table already spilled, don't create new entries */
2107  p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2108 
2109  select_current_set(aggstate, setno, true);
2110  prepare_hash_slot(perhash,
2111  outerslot,
2112  hashslot);
2113 
2114  entry = LookupTupleHashEntry(hashtable, hashslot,
2115  p_isnew, &hash);
2116 
2117  if (entry != NULL)
2118  {
2119  if (isnew)
2120  initialize_hash_entry(aggstate, hashtable, entry);
2121  pergroup[setno] = entry->additional;
2122  }
2123  else
2124  {
2125  HashAggSpill *spill = &aggstate->hash_spills[setno];
2126  TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
2127 
2128  if (spill->partitions == NULL)
2129  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
2130  perhash->aggnode->numGroups,
2131  aggstate->hashentrysize);
2132 
2133  hashagg_spill_tuple(aggstate, spill, slot, hash);
2134  pergroup[setno] = NULL;
2135  }
2136  }
2137 }
2138 
2139 /*
2140  * ExecAgg -
2141  *
2142  * ExecAgg receives tuples from its outer subplan and aggregates over
2143  * the appropriate attribute for each aggregate function use (Aggref
2144  * node) appearing in the targetlist or qual of the node. The number
2145  * of tuples to aggregate over depends on whether grouped or plain
2146  * aggregation is selected. In grouped aggregation, we produce a result
2147  * row for each group; in plain aggregation there's a single result row
2148  * for the whole query. In either case, the value of each aggregate is
2149  * stored in the expression context to be used when ExecProject evaluates
2150  * the result tuple.
2151  */
2152 static TupleTableSlot *
2154 {
2155  AggState *node = castNode(AggState, pstate);
2156  TupleTableSlot *result = NULL;
2157 
2159 
2160  if (!node->agg_done)
2161  {
2162  /* Dispatch based on strategy */
2163  switch (node->phase->aggstrategy)
2164  {
2165  case AGG_HASHED:
2166  if (!node->table_filled)
2167  agg_fill_hash_table(node);
2168  /* FALLTHROUGH */
2169  case AGG_MIXED:
2170  result = agg_retrieve_hash_table(node);
2171  break;
2172  case AGG_PLAIN:
2173  case AGG_SORTED:
2174  result = agg_retrieve_direct(node);
2175  break;
2176  }
2177 
2178  if (!TupIsNull(result))
2179  return result;
2180  }
2181 
2182  return NULL;
2183 }
2184 
2185 /*
2186  * ExecAgg for non-hashed case
2187  */
2188 static TupleTableSlot *
2190 {
2191  Agg *node = aggstate->phase->aggnode;
2192  ExprContext *econtext;
2193  ExprContext *tmpcontext;
2194  AggStatePerAgg peragg;
2195  AggStatePerGroup *pergroups;
2196  TupleTableSlot *outerslot;
2197  TupleTableSlot *firstSlot;
2198  TupleTableSlot *result;
2199  bool hasGroupingSets = aggstate->phase->numsets > 0;
2200  int numGroupingSets = Max(aggstate->phase->numsets, 1);
2201  int currentSet;
2202  int nextSetSize;
2203  int numReset;
2204  int i;
2205 
2206  /*
2207  * get state info from node
2208  *
2209  * econtext is the per-output-tuple expression context
2210  *
2211  * tmpcontext is the per-input-tuple expression context
2212  */
2213  econtext = aggstate->ss.ps.ps_ExprContext;
2214  tmpcontext = aggstate->tmpcontext;
2215 
2216  peragg = aggstate->peragg;
2217  pergroups = aggstate->pergroups;
2218  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2219 
2220  /*
2221  * We loop retrieving groups until we find one matching
2222  * aggstate->ss.ps.qual
2223  *
2224  * For grouping sets, we have the invariant that aggstate->projected_set
2225  * is either -1 (initial call) or the index (starting from 0) in
2226  * gset_lengths for the group we just completed (either by projecting a
2227  * row or by discarding it in the qual).
2228  */
2229  while (!aggstate->agg_done)
2230  {
2231  /*
2232  * Clear the per-output-tuple context for each group, as well as
2233  * aggcontext (which contains any pass-by-ref transvalues of the old
2234  * group). Some aggregate functions store working state in child
2235  * contexts; those now get reset automatically without us needing to
2236  * do anything special.
2237  *
2238  * We use ReScanExprContext not just ResetExprContext because we want
2239  * any registered shutdown callbacks to be called. That allows
2240  * aggregate functions to ensure they've cleaned up any non-memory
2241  * resources.
2242  */
2243  ReScanExprContext(econtext);
2244 
2245  /*
2246  * Determine how many grouping sets need to be reset at this boundary.
2247  */
2248  if (aggstate->projected_set >= 0 &&
2249  aggstate->projected_set < numGroupingSets)
2250  numReset = aggstate->projected_set + 1;
2251  else
2252  numReset = numGroupingSets;
2253 
2254  /*
2255  * numReset can change on a phase boundary, but that's OK; we want to
2256  * reset the contexts used in _this_ phase, and later, after possibly
2257  * changing phase, initialize the right number of aggregates for the
2258  * _new_ phase.
2259  */
2260 
2261  for (i = 0; i < numReset; i++)
2262  {
2263  ReScanExprContext(aggstate->aggcontexts[i]);
2264  }
2265 
2266  /*
2267  * Check if input is complete and there are no more groups to project
2268  * in this phase; move to next phase or mark as done.
2269  */
2270  if (aggstate->input_done == true &&
2271  aggstate->projected_set >= (numGroupingSets - 1))
2272  {
2273  if (aggstate->current_phase < aggstate->numphases - 1)
2274  {
2275  initialize_phase(aggstate, aggstate->current_phase + 1);
2276  aggstate->input_done = false;
2277  aggstate->projected_set = -1;
2278  numGroupingSets = Max(aggstate->phase->numsets, 1);
2279  node = aggstate->phase->aggnode;
2280  numReset = numGroupingSets;
2281  }
2282  else if (aggstate->aggstrategy == AGG_MIXED)
2283  {
2284  /*
2285  * Mixed mode; we've output all the grouped stuff and have
2286  * full hashtables, so switch to outputting those.
2287  */
2288  initialize_phase(aggstate, 0);
2289  aggstate->table_filled = true;
2291  &aggstate->perhash[0].hashiter);
2292  select_current_set(aggstate, 0, true);
2293  return agg_retrieve_hash_table(aggstate);
2294  }
2295  else
2296  {
2297  aggstate->agg_done = true;
2298  break;
2299  }
2300  }
2301 
2302  /*
2303  * Get the number of columns in the next grouping set after the last
2304  * projected one (if any). This is the number of columns to compare to
2305  * see if we reached the boundary of that set too.
2306  */
2307  if (aggstate->projected_set >= 0 &&
2308  aggstate->projected_set < (numGroupingSets - 1))
2309  nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2310  else
2311  nextSetSize = 0;
2312 
2313  /*----------
2314  * If a subgroup for the current grouping set is present, project it.
2315  *
2316  * We have a new group if:
2317  * - we're out of input but haven't projected all grouping sets
2318  * (checked above)
2319  * OR
2320  * - we already projected a row that wasn't from the last grouping
2321  * set
2322  * AND
2323  * - the next grouping set has at least one grouping column (since
2324  * empty grouping sets project only once input is exhausted)
2325  * AND
2326  * - the previous and pending rows differ on the grouping columns
2327  * of the next grouping set
2328  *----------
2329  */
2330  tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
2331  if (aggstate->input_done ||
2332  (node->aggstrategy != AGG_PLAIN &&
2333  aggstate->projected_set != -1 &&
2334  aggstate->projected_set < (numGroupingSets - 1) &&
2335  nextSetSize > 0 &&
2336  !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
2337  tmpcontext)))
2338  {
2339  aggstate->projected_set += 1;
2340 
2341  Assert(aggstate->projected_set < numGroupingSets);
2342  Assert(nextSetSize > 0 || aggstate->input_done);
2343  }
2344  else
2345  {
2346  /*
2347  * We no longer care what group we just projected, the next
2348  * projection will always be the first (or only) grouping set
2349  * (unless the input proves to be empty).
2350  */
2351  aggstate->projected_set = 0;
2352 
2353  /*
2354  * If we don't already have the first tuple of the new group,
2355  * fetch it from the outer plan.
2356  */
2357  if (aggstate->grp_firstTuple == NULL)
2358  {
2359  outerslot = fetch_input_tuple(aggstate);
2360  if (!TupIsNull(outerslot))
2361  {
2362  /*
2363  * Make a copy of the first input tuple; we will use this
2364  * for comparisons (in group mode) and for projection.
2365  */
2366  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2367  }
2368  else
2369  {
2370  /* outer plan produced no tuples at all */
2371  if (hasGroupingSets)
2372  {
2373  /*
2374  * If there was no input at all, we need to project
2375  * rows only if there are grouping sets of size 0.
2376  * Note that this implies that there can't be any
2377  * references to ungrouped Vars, which would otherwise
2378  * cause issues with the empty output slot.
2379  *
2380  * XXX: This is no longer true, we currently deal with
2381  * this in finalize_aggregates().
2382  */
2383  aggstate->input_done = true;
2384 
2385  while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2386  {
2387  aggstate->projected_set += 1;
2388  if (aggstate->projected_set >= numGroupingSets)
2389  {
2390  /*
2391  * We can't set agg_done here because we might
2392  * have more phases to do, even though the
2393  * input is empty. So we need to restart the
2394  * whole outer loop.
2395  */
2396  break;
2397  }
2398  }
2399 
2400  if (aggstate->projected_set >= numGroupingSets)
2401  continue;
2402  }
2403  else
2404  {
2405  aggstate->agg_done = true;
2406  /* If we are grouping, we should produce no tuples too */
2407  if (node->aggstrategy != AGG_PLAIN)
2408  return NULL;
2409  }
2410  }
2411  }
2412 
2413  /*
2414  * Initialize working state for a new input tuple group.
2415  */
2416  initialize_aggregates(aggstate, pergroups, numReset);
2417 
2418  if (aggstate->grp_firstTuple != NULL)
2419  {
2420  /*
2421  * Store the copied first input tuple in the tuple table slot
2422  * reserved for it. The tuple will be deleted when it is
2423  * cleared from the slot.
2424  */
2426  firstSlot, true);
2427  aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
2428 
2429  /* set up for first advance_aggregates call */
2430  tmpcontext->ecxt_outertuple = firstSlot;
2431 
2432  /*
2433  * Process each outer-plan tuple, and then fetch the next one,
2434  * until we exhaust the outer plan or cross a group boundary.
2435  */
2436  for (;;)
2437  {
2438  /*
2439  * During phase 1 only of a mixed agg, we need to update
2440  * hashtables as well in advance_aggregates.
2441  */
2442  if (aggstate->aggstrategy == AGG_MIXED &&
2443  aggstate->current_phase == 1)
2444  {
2445  lookup_hash_entries(aggstate);
2446  }
2447 
2448  /* Advance the aggregates (or combine functions) */
2449  advance_aggregates(aggstate);
2450 
2451  /* Reset per-input-tuple context after each tuple */
2452  ResetExprContext(tmpcontext);
2453 
2454  outerslot = fetch_input_tuple(aggstate);
2455  if (TupIsNull(outerslot))
2456  {
2457  /* no more outer-plan tuples available */
2458 
2459  /* if we built hash tables, finalize any spills */
2460  if (aggstate->aggstrategy == AGG_MIXED &&
2461  aggstate->current_phase == 1)
2463 
2464  if (hasGroupingSets)
2465  {
2466  aggstate->input_done = true;
2467  break;
2468  }
2469  else
2470  {
2471  aggstate->agg_done = true;
2472  break;
2473  }
2474  }
2475  /* set up for next advance_aggregates call */
2476  tmpcontext->ecxt_outertuple = outerslot;
2477 
2478  /*
2479  * If we are grouping, check whether we've crossed a group
2480  * boundary.
2481  */
2482  if (node->aggstrategy != AGG_PLAIN)
2483  {
2484  tmpcontext->ecxt_innertuple = firstSlot;
2485  if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2486  tmpcontext))
2487  {
2488  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2489  break;
2490  }
2491  }
2492  }
2493  }
2494 
2495  /*
2496  * Use the representative input tuple for any references to
2497  * non-aggregated input columns in aggregate direct args, the node
2498  * qual, and the tlist. (If we are not grouping, and there are no
2499  * input rows at all, we will come here with an empty firstSlot
2500  * ... but if not grouping, there can't be any references to
2501  * non-aggregated input columns, so no problem.)
2502  */
2503  econtext->ecxt_outertuple = firstSlot;
2504  }
2505 
2506  Assert(aggstate->projected_set >= 0);
2507 
2508  currentSet = aggstate->projected_set;
2509 
2510  prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2511 
2512  select_current_set(aggstate, currentSet, false);
2513 
2514  finalize_aggregates(aggstate,
2515  peragg,
2516  pergroups[currentSet]);
2517 
2518  /*
2519  * If there's no row to project right now, we must continue rather
2520  * than returning a null since there might be more groups.
2521  */
2522  result = project_aggregates(aggstate);
2523  if (result)
2524  return result;
2525  }
2526 
2527  /* No more groups */
2528  return NULL;
2529 }
2530 
2531 /*
2532  * ExecAgg for hashed case: read input and build hash table
2533  */
2534 static void
2536 {
2537  TupleTableSlot *outerslot;
2538  ExprContext *tmpcontext = aggstate->tmpcontext;
2539 
2540  /*
2541  * Process each outer-plan tuple, and then fetch the next one, until we
2542  * exhaust the outer plan.
2543  */
2544  for (;;)
2545  {
2546  outerslot = fetch_input_tuple(aggstate);
2547  if (TupIsNull(outerslot))
2548  break;
2549 
2550  /* set up for lookup_hash_entries and advance_aggregates */
2551  tmpcontext->ecxt_outertuple = outerslot;
2552 
2553  /* Find or build hashtable entries */
2554  lookup_hash_entries(aggstate);
2555 
2556  /* Advance the aggregates (or combine functions) */
2557  advance_aggregates(aggstate);
2558 
2559  /*
2560  * Reset per-input-tuple context after each tuple, but note that the
2561  * hash lookups do this too
2562  */
2563  ResetExprContext(aggstate->tmpcontext);
2564  }
2565 
2566  /* finalize spills, if any */
2568 
2569  aggstate->table_filled = true;
2570  /* Initialize to walk the first hash table */
2571  select_current_set(aggstate, 0, true);
2573  &aggstate->perhash[0].hashiter);
2574 }
2575 
2576 /*
2577  * If any data was spilled during hash aggregation, reset the hash table and
2578  * reprocess one batch of spilled data. After reprocessing a batch, the hash
2579  * table will again contain data, ready to be consumed by
2580  * agg_retrieve_hash_table_in_memory().
2581  *
2582  * Should only be called after all in memory hash table entries have been
2583  * finalized and emitted.
2584  *
2585  * Return false when input is exhausted and there's no more work to be done;
2586  * otherwise return true.
2587  */
2588 static bool
2590 {
2591  HashAggBatch *batch;
2592  AggStatePerHash perhash;
2593  HashAggSpill spill;
2594  HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
2595  bool spill_initialized = false;
2596 
2597  if (aggstate->hash_batches == NIL)
2598  return false;
2599 
2600  batch = linitial(aggstate->hash_batches);
2601  aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
2602 
2603  hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
2604  batch->used_bits, &aggstate->hash_mem_limit,
2605  &aggstate->hash_ngroups_limit, NULL);
2606 
2607  /*
2608  * Each batch only processes one grouping set; set the rest to NULL so
2609  * that advance_aggregates() knows to ignore them. We don't touch
2610  * pergroups for sorted grouping sets here, because they will be needed if
2611  * we rescan later. The expressions for sorted grouping sets will not be
2612  * evaluated after we recompile anyway.
2613  */
2614  MemSet(aggstate->hash_pergroup, 0,
2615  sizeof(AggStatePerGroup) * aggstate->num_hashes);
2616 
2617  /* free memory and reset hash tables */
2618  ReScanExprContext(aggstate->hashcontext);
2619  for (int setno = 0; setno < aggstate->num_hashes; setno++)
2620  ResetTupleHashTable(aggstate->perhash[setno].hashtable);
2621 
2622  aggstate->hash_ngroups_current = 0;
2623 
2624  /*
2625  * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
2626  * happens in phase 0. So, we switch to phase 1 when processing a batch,
2627  * and back to phase 0 after the batch is done.
2628  */
2629  Assert(aggstate->current_phase == 0);
2630  if (aggstate->phase->aggstrategy == AGG_MIXED)
2631  {
2632  aggstate->current_phase = 1;
2633  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2634  }
2635 
2636  select_current_set(aggstate, batch->setno, true);
2637 
2638  perhash = &aggstate->perhash[aggstate->current_set];
2639 
2640  /*
2641  * Spilled tuples are always read back as MinimalTuples, which may be
2642  * different from the outer plan, so recompile the aggregate expressions.
2643  *
2644  * We still need the NULL check, because we are only processing one
2645  * grouping set at a time and the rest will be NULL.
2646  */
2647  hashagg_recompile_expressions(aggstate, true, true);
2648 
2649  for (;;)
2650  {
2651  TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
2652  TupleTableSlot *hashslot = perhash->hashslot;
2653  TupleHashEntry entry;
2654  MinimalTuple tuple;
2655  uint32 hash;
2656  bool isnew = false;
2657  bool *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2658 
2660 
2661  tuple = hashagg_batch_read(batch, &hash);
2662  if (tuple == NULL)
2663  break;
2664 
2665  ExecStoreMinimalTuple(tuple, spillslot, true);
2666  aggstate->tmpcontext->ecxt_outertuple = spillslot;
2667 
2668  prepare_hash_slot(perhash,
2669  aggstate->tmpcontext->ecxt_outertuple,
2670  hashslot);
2671  entry = LookupTupleHashEntryHash(
2672  perhash->hashtable, hashslot, p_isnew, hash);
2673 
2674  if (entry != NULL)
2675  {
2676  if (isnew)
2677  initialize_hash_entry(aggstate, perhash->hashtable, entry);
2678  aggstate->hash_pergroup[batch->setno] = entry->additional;
2679  advance_aggregates(aggstate);
2680  }
2681  else
2682  {
2683  if (!spill_initialized)
2684  {
2685  /*
2686  * Avoid initializing the spill until we actually need it so
2687  * that we don't assign tapes that will never be used.
2688  */
2689  spill_initialized = true;
2690  hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
2691  batch->input_card, aggstate->hashentrysize);
2692  }
2693  /* no memory for a new group, spill */
2694  hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
2695 
2696  aggstate->hash_pergroup[batch->setno] = NULL;
2697  }
2698 
2699  /*
2700  * Reset per-input-tuple context after each tuple, but note that the
2701  * hash lookups do this too
2702  */
2703  ResetExprContext(aggstate->tmpcontext);
2704  }
2705 
2706  hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
2707 
2708  /* change back to phase 0 */
2709  aggstate->current_phase = 0;
2710  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2711 
2712  if (spill_initialized)
2713  {
2714  hashagg_spill_finish(aggstate, &spill, batch->setno);
2715  hash_agg_update_metrics(aggstate, true, spill.npartitions);
2716  }
2717  else
2718  hash_agg_update_metrics(aggstate, true, 0);
2719 
2720  aggstate->hash_spill_mode = false;
2721 
2722  /* prepare to walk the first hash table */
2723  select_current_set(aggstate, batch->setno, true);
2724  ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
2725  &aggstate->perhash[batch->setno].hashiter);
2726 
2727  pfree(batch);
2728 
2729  return true;
2730 }
2731 
2732 /*
2733  * ExecAgg for hashed case: retrieving groups from hash table
2734  *
2735  * After exhausting in-memory tuples, also try refilling the hash table using
2736  * previously-spilled tuples. Only returns NULL after all in-memory and
2737  * spilled tuples are exhausted.
2738  */
2739 static TupleTableSlot *
2741 {
2742  TupleTableSlot *result = NULL;
2743 
2744  while (result == NULL)
2745  {
2746  result = agg_retrieve_hash_table_in_memory(aggstate);
2747  if (result == NULL)
2748  {
2749  if (!agg_refill_hash_table(aggstate))
2750  {
2751  aggstate->agg_done = true;
2752  break;
2753  }
2754  }
2755  }
2756 
2757  return result;
2758 }
2759 
2760 /*
2761  * Retrieve the groups from the in-memory hash tables without considering any
2762  * spilled tuples.
2763  */
2764 static TupleTableSlot *
2766 {
2767  ExprContext *econtext;
2768  AggStatePerAgg peragg;
2769  AggStatePerGroup pergroup;
2770  TupleHashEntryData *entry;
2771  TupleTableSlot *firstSlot;
2772  TupleTableSlot *result;
2773  AggStatePerHash perhash;
2774 
2775  /*
2776  * get state info from node.
2777  *
2778  * econtext is the per-output-tuple expression context.
2779  */
2780  econtext = aggstate->ss.ps.ps_ExprContext;
2781  peragg = aggstate->peragg;
2782  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2783 
2784  /*
2785  * Note that perhash (and therefore anything accessed through it) can
2786  * change inside the loop, as we change between grouping sets.
2787  */
2788  perhash = &aggstate->perhash[aggstate->current_set];
2789 
2790  /*
2791  * We loop retrieving groups until we find one satisfying
2792  * aggstate->ss.ps.qual
2793  */
2794  for (;;)
2795  {
2796  TupleTableSlot *hashslot = perhash->hashslot;
2797  int i;
2798 
2800 
2801  /*
2802  * Find the next entry in the hash table
2803  */
2804  entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2805  if (entry == NULL)
2806  {
2807  int nextset = aggstate->current_set + 1;
2808 
2809  if (nextset < aggstate->num_hashes)
2810  {
2811  /*
2812  * Switch to next grouping set, reinitialize, and restart the
2813  * loop.
2814  */
2815  select_current_set(aggstate, nextset, true);
2816 
2817  perhash = &aggstate->perhash[aggstate->current_set];
2818 
2819  ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2820 
2821  continue;
2822  }
2823  else
2824  {
2825  return NULL;
2826  }
2827  }
2828 
2829  /*
2830  * Clear the per-output-tuple context for each group
2831  *
2832  * We intentionally don't use ReScanExprContext here; if any aggs have
2833  * registered shutdown callbacks, they mustn't be called yet, since we
2834  * might not be done with that agg.
2835  */
2836  ResetExprContext(econtext);
2837 
2838  /*
2839  * Transform representative tuple back into one with the right
2840  * columns.
2841  */
2842  ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2843  slot_getallattrs(hashslot);
2844 
2845  ExecClearTuple(firstSlot);
2846  memset(firstSlot->tts_isnull, true,
2847  firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2848 
2849  for (i = 0; i < perhash->numhashGrpCols; i++)
2850  {
2851  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2852 
2853  firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2854  firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2855  }
2856  ExecStoreVirtualTuple(firstSlot);
2857 
2858  pergroup = (AggStatePerGroup) entry->additional;
2859 
2860  /*
2861  * Use the representative input tuple for any references to
2862  * non-aggregated input columns in the qual and tlist.
2863  */
2864  econtext->ecxt_outertuple = firstSlot;
2865 
2866  prepare_projection_slot(aggstate,
2867  econtext->ecxt_outertuple,
2868  aggstate->current_set);
2869 
2870  finalize_aggregates(aggstate, peragg, pergroup);
2871 
2872  result = project_aggregates(aggstate);
2873  if (result)
2874  return result;
2875  }
2876 
2877  /* No more groups */
2878  return NULL;
2879 }
2880 
2881 /*
2882  * Initialize HashTapeInfo
2883  */
2884 static void
2886 {
2887  HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
2888  int init_tapes = 16; /* expanded dynamically */
2889 
2890  tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
2891  tapeinfo->ntapes = init_tapes;
2892  tapeinfo->nfreetapes = init_tapes;
2893  tapeinfo->freetapes_alloc = init_tapes;
2894  tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
2895  for (int i = 0; i < init_tapes; i++)
2896  tapeinfo->freetapes[i] = i;
2897 
2898  aggstate->hash_tapeinfo = tapeinfo;
2899 }
2900 
2901 /*
2902  * Assign unused tapes to spill partitions, extending the tape set if
2903  * necessary.
2904  */
2905 static void
2907  int npartitions)
2908 {
2909  int partidx = 0;
2910 
2911  /* use free tapes if available */
2912  while (partidx < npartitions && tapeinfo->nfreetapes > 0)
2913  partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
2914 
2915  if (partidx < npartitions)
2916  {
2917  LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
2918 
2919  while (partidx < npartitions)
2920  partitions[partidx++] = tapeinfo->ntapes++;
2921  }
2922 }
2923 
2924 /*
2925  * After a tape has already been written to and then read, this function
2926  * rewinds it for writing and adds it to the free list.
2927  */
2928 static void
2930 {
2931  /* rewinding frees the buffer while not in use */
2932  LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
2933  if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
2934  {
2935  tapeinfo->freetapes_alloc <<= 1;
2936  tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
2937  tapeinfo->freetapes_alloc * sizeof(int));
2938  }
2939  tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
2940 }
2941 
2942 /*
2943  * hashagg_spill_init
2944  *
2945  * Called after we determined that spilling is necessary. Chooses the number
2946  * of partitions to create, and initializes them.
2947  */
2948 static void
2949 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
2950  double input_groups, double hashentrysize)
2951 {
2952  int npartitions;
2953  int partition_bits;
2954 
2955  npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
2956  used_bits, &partition_bits);
2957 
2958  spill->partitions = palloc0(sizeof(int) * npartitions);
2959  spill->ntuples = palloc0(sizeof(int64) * npartitions);
2960  spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
2961 
2962  hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
2963 
2964  spill->tapeset = tapeinfo->tapeset;
2965  spill->shift = 32 - used_bits - partition_bits;
2966  spill->mask = (npartitions - 1) << spill->shift;
2967  spill->npartitions = npartitions;
2968 
2969  for (int i = 0; i < npartitions; i++)
2971 }
2972 
2973 /*
2974  * hashagg_spill_tuple
2975  *
2976  * No room for new groups in the hash table. Save for later in the appropriate
2977  * partition.
2978  */
2979 static Size
2981  TupleTableSlot *inputslot, uint32 hash)
2982 {
2983  LogicalTapeSet *tapeset = spill->tapeset;
2984  TupleTableSlot *spillslot;
2985  int partition;
2986  MinimalTuple tuple;
2987  int tapenum;
2988  int total_written = 0;
2989  bool shouldFree;
2990 
2991  Assert(spill->partitions != NULL);
2992 
2993  /* spill only attributes that we actually need */
2994  if (!aggstate->all_cols_needed)
2995  {
2996  spillslot = aggstate->hash_spill_wslot;
2997  slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
2998  ExecClearTuple(spillslot);
2999  for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
3000  {
3001  if (bms_is_member(i + 1, aggstate->colnos_needed))
3002  {
3003  spillslot->tts_values[i] = inputslot->tts_values[i];
3004  spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
3005  }
3006  else
3007  spillslot->tts_isnull[i] = true;
3008  }
3009  ExecStoreVirtualTuple(spillslot);
3010  }
3011  else
3012  spillslot = inputslot;
3013 
3014  tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
3015 
3016  partition = (hash & spill->mask) >> spill->shift;
3017  spill->ntuples[partition]++;
3018 
3019  /*
3020  * All hash values destined for a given partition have some bits in
3021  * common, which causes bad HLL cardinality estimates. Hash the hash to
3022  * get a more uniform distribution.
3023  */
3024  addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
3025 
3026  tapenum = spill->partitions[partition];
3027 
3028  LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
3029  total_written += sizeof(uint32);
3030 
3031  LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
3032  total_written += tuple->t_len;
3033 
3034  if (shouldFree)
3035  pfree(tuple);
3036 
3037  return total_written;
3038 }
3039 
3040 /*
3041  * hashagg_batch_new
3042  *
3043  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
3044  * be done.
3045  */
3046 static HashAggBatch *
3047 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
3048  int64 input_tuples, double input_card, int used_bits)
3049 {
3050  HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
3051 
3052  batch->setno = setno;
3053  batch->used_bits = used_bits;
3054  batch->tapeset = tapeset;
3055  batch->input_tapenum = tapenum;
3056  batch->input_tuples = input_tuples;
3057  batch->input_card = input_card;
3058 
3059  return batch;
3060 }
3061 
3062 /*
3063  * read_spilled_tuple
3064  * read the next tuple from a batch's tape. Return NULL if no more.
3065  */
3066 static MinimalTuple
3068 {
3069  LogicalTapeSet *tapeset = batch->tapeset;
3070  int tapenum = batch->input_tapenum;
3071  MinimalTuple tuple;
3072  uint32 t_len;
3073  size_t nread;
3074  uint32 hash;
3075 
3076  nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
3077  if (nread == 0)
3078  return NULL;
3079  if (nread != sizeof(uint32))
3080  ereport(ERROR,
3082  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3083  tapenum, sizeof(uint32), nread)));
3084  if (hashp != NULL)
3085  *hashp = hash;
3086 
3087  nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
3088  if (nread != sizeof(uint32))
3089  ereport(ERROR,
3091  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3092  tapenum, sizeof(uint32), nread)));
3093 
3094  tuple = (MinimalTuple) palloc(t_len);
3095  tuple->t_len = t_len;
3096 
3097  nread = LogicalTapeRead(tapeset, tapenum,
3098  (void *) ((char *) tuple + sizeof(uint32)),
3099  t_len - sizeof(uint32));
3100  if (nread != t_len - sizeof(uint32))
3101  ereport(ERROR,
3103  errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3104  tapenum, t_len - sizeof(uint32), nread)));
3105 
3106  return tuple;
3107 }
3108 
3109 /*
3110  * hashagg_finish_initial_spills
3111  *
3112  * After a HashAggBatch has been processed, it may have spilled tuples to
3113  * disk. If so, turn the spilled partitions into new batches that must later
3114  * be executed.
3115  */
3116 static void
3118 {
3119  int setno;
3120  int total_npartitions = 0;
3121 
3122  if (aggstate->hash_spills != NULL)
3123  {
3124  for (setno = 0; setno < aggstate->num_hashes; setno++)
3125  {
3126  HashAggSpill *spill = &aggstate->hash_spills[setno];
3127 
3128  total_npartitions += spill->npartitions;
3129  hashagg_spill_finish(aggstate, spill, setno);
3130  }
3131 
3132  /*
3133  * We're not processing tuples from outer plan any more; only
3134  * processing batches of spilled tuples. The initial spill structures
3135  * are no longer needed.
3136  */
3137  pfree(aggstate->hash_spills);
3138  aggstate->hash_spills = NULL;
3139  }
3140 
3141  hash_agg_update_metrics(aggstate, false, total_npartitions);
3142  aggstate->hash_spill_mode = false;
3143 }
3144 
3145 /*
3146  * hashagg_spill_finish
3147  *
3148  * Transform spill partitions into new batches.
3149  */
3150 static void
3151 hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3152 {
3153  int i;
3154  int used_bits = 32 - spill->shift;
3155 
3156  if (spill->npartitions == 0)
3157  return; /* didn't spill */
3158 
3159  for (i = 0; i < spill->npartitions; i++)
3160  {
3162  int tapenum = spill->partitions[i];
3163  HashAggBatch *new_batch;
3164  double cardinality;
3165 
3166  /* if the partition is empty, don't create a new batch of work */
3167  if (spill->ntuples[i] == 0)
3168  continue;
3169 
3170  cardinality = estimateHyperLogLog(&spill->hll_card[i]);
3171  freeHyperLogLog(&spill->hll_card[i]);
3172 
3173  /* rewinding frees the buffer while not in use */
3174  LogicalTapeRewindForRead(tapeset, tapenum,
3176 
3177  new_batch = hashagg_batch_new(tapeset, tapenum, setno,
3178  spill->ntuples[i], cardinality,
3179  used_bits);
3180  aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
3181  aggstate->hash_batches_used++;
3182  }
3183 
3184  pfree(spill->ntuples);
3185  pfree(spill->hll_card);
3186  pfree(spill->partitions);
3187 }
3188 
3189 /*
3190  * Free resources related to a spilled HashAgg.
3191  */
3192 static void
3194 {
3195  ListCell *lc;
3196 
3197  /* free spills from initial pass */
3198  if (aggstate->hash_spills != NULL)
3199  {
3200  int setno;
3201 
3202  for (setno = 0; setno < aggstate->num_hashes; setno++)
3203  {
3204  HashAggSpill *spill = &aggstate->hash_spills[setno];
3205 
3206  pfree(spill->ntuples);
3207  pfree(spill->partitions);
3208  }
3209  pfree(aggstate->hash_spills);
3210  aggstate->hash_spills = NULL;
3211  }
3212 
3213  /* free batches */
3214  foreach(lc, aggstate->hash_batches)
3215  {
3216  HashAggBatch *batch = (HashAggBatch *) lfirst(lc);
3217 
3218  pfree(batch);
3219  }
3220  list_free(aggstate->hash_batches);
3221  aggstate->hash_batches = NIL;
3222 
3223  /* close tape set */
3224  if (aggstate->hash_tapeinfo != NULL)
3225  {
3226  HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
3227 
3228  LogicalTapeSetClose(tapeinfo->tapeset);
3229  pfree(tapeinfo->freetapes);
3230  pfree(tapeinfo);
3231  aggstate->hash_tapeinfo = NULL;
3232  }
3233 }
3234 
3235 
3236 /* -----------------
3237  * ExecInitAgg
3238  *
3239  * Creates the run-time information for the agg node produced by the
3240  * planner and initializes its outer subtree.
3241  *
3242  * -----------------
3243  */
3244 AggState *
3245 ExecInitAgg(Agg *node, EState *estate, int eflags)
3246 {
3247  AggState *aggstate;
3248  AggStatePerAgg peraggs;
3249  AggStatePerTrans pertransstates;
3250  AggStatePerGroup *pergroups;
3251  Plan *outerPlan;
3252  ExprContext *econtext;
3253  TupleDesc scanDesc;
3254  int max_aggno;
3255  int max_transno;
3256  int numaggrefs;
3257  int numaggs;
3258  int numtrans;
3259  int phase;
3260  int phaseidx;
3261  ListCell *l;
3262  Bitmapset *all_grouped_cols = NULL;
3263  int numGroupingSets = 1;
3264  int numPhases;
3265  int numHashes;
3266  int i = 0;
3267  int j = 0;
3268  bool use_hashing = (node->aggstrategy == AGG_HASHED ||
3269  node->aggstrategy == AGG_MIXED);
3270 
3271  /* check for unsupported flags */
3272  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
3273 
3274  /*
3275  * create state structure
3276  */
3277  aggstate = makeNode(AggState);
3278  aggstate->ss.ps.plan = (Plan *) node;
3279  aggstate->ss.ps.state = estate;
3280  aggstate->ss.ps.ExecProcNode = ExecAgg;
3281 
3282  aggstate->aggs = NIL;
3283  aggstate->numaggs = 0;
3284  aggstate->numtrans = 0;
3285  aggstate->aggstrategy = node->aggstrategy;
3286  aggstate->aggsplit = node->aggsplit;
3287  aggstate->maxsets = 0;
3288  aggstate->projected_set = -1;
3289  aggstate->current_set = 0;
3290  aggstate->peragg = NULL;
3291  aggstate->pertrans = NULL;
3292  aggstate->curperagg = NULL;
3293  aggstate->curpertrans = NULL;
3294  aggstate->input_done = false;
3295  aggstate->agg_done = false;
3296  aggstate->pergroups = NULL;
3297  aggstate->grp_firstTuple = NULL;
3298  aggstate->sort_in = NULL;
3299  aggstate->sort_out = NULL;
3300 
3301  /*
3302  * phases[0] always exists, but is dummy in sorted/plain mode
3303  */
3304  numPhases = (use_hashing ? 1 : 2);
3305  numHashes = (use_hashing ? 1 : 0);
3306 
3307  /*
3308  * Calculate the maximum number of grouping sets in any phase; this
3309  * determines the size of some allocations. Also calculate the number of
3310  * phases, since all hashed/mixed nodes contribute to only a single phase.
3311  */
3312  if (node->groupingSets)
3313  {
3314  numGroupingSets = list_length(node->groupingSets);
3315 
3316  foreach(l, node->chain)
3317  {
3318  Agg *agg = lfirst(l);
3319 
3320  numGroupingSets = Max(numGroupingSets,
3321  list_length(agg->groupingSets));
3322 
3323  /*
3324  * additional AGG_HASHED aggs become part of phase 0, but all
3325  * others add an extra phase.
3326  */
3327  if (agg->aggstrategy != AGG_HASHED)
3328  ++numPhases;
3329  else
3330  ++numHashes;
3331  }
3332  }
3333 
3334  aggstate->maxsets = numGroupingSets;
3335  aggstate->numphases = numPhases;
3336 
3337  aggstate->aggcontexts = (ExprContext **)
3338  palloc0(sizeof(ExprContext *) * numGroupingSets);
3339 
3340  /*
3341  * Create expression contexts. We need three or more, one for
3342  * per-input-tuple processing, one for per-output-tuple processing, one
3343  * for all the hashtables, and one for each grouping set. The per-tuple
3344  * memory context of the per-grouping-set ExprContexts (aggcontexts)
3345  * replaces the standalone memory context formerly used to hold transition
3346  * values. We cheat a little by using ExecAssignExprContext() to build
3347  * all of them.
3348  *
3349  * NOTE: the details of what is stored in aggcontexts and what is stored
3350  * in the regular per-query memory context are driven by a simple
3351  * decision: we want to reset the aggcontext at group boundaries (if not
3352  * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
3353  */
3354  ExecAssignExprContext(estate, &aggstate->ss.ps);
3355  aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
3356 
3357  for (i = 0; i < numGroupingSets; ++i)
3358  {
3359  ExecAssignExprContext(estate, &aggstate->ss.ps);
3360  aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
3361  }
3362 
3363  if (use_hashing)
3364  aggstate->hashcontext = CreateWorkExprContext(estate);
3365 
3366  ExecAssignExprContext(estate, &aggstate->ss.ps);
3367 
3368  /*
3369  * Initialize child nodes.
3370  *
3371  * If we are doing a hashed aggregation then the child plan does not need
3372  * to handle REWIND efficiently; see ExecReScanAgg.
3373  */
3374  if (node->aggstrategy == AGG_HASHED)
3375  eflags &= ~EXEC_FLAG_REWIND;
3376  outerPlan = outerPlan(node);
3377  outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
3378 
3379  /*
3380  * initialize source tuple type.
3381  */
3382  aggstate->ss.ps.outerops =
3384  &aggstate->ss.ps.outeropsfixed);
3385  aggstate->ss.ps.outeropsset = true;
3386 
3387  ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
3388  aggstate->ss.ps.outerops);
3389  scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
3390 
3391  /*
3392  * If there are more than two phases (including a potential dummy phase
3393  * 0), input will be resorted using tuplesort. Need a slot for that.
3394  */
3395  if (numPhases > 2)
3396  {
3397  aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
3399 
3400  /*
3401  * The output of the tuplesort, and the output from the outer child
3402  * might not use the same type of slot. In most cases the child will
3403  * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
3404  * input can also be presorted due an index, in which case it could be
3405  * a different type of slot.
3406  *
3407  * XXX: For efficiency it would be good to instead/additionally
3408  * generate expressions with corresponding settings of outerops* for
3409  * the individual phases - deforming is often a bottleneck for
3410  * aggregations with lots of rows per group. If there's multiple
3411  * sorts, we know that all but the first use TTSOpsMinimalTuple (via
3412  * the nodeAgg.c internal tuplesort).
3413  */
3414  if (aggstate->ss.ps.outeropsfixed &&
3415  aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
3416  aggstate->ss.ps.outeropsfixed = false;
3417  }
3418 
3419  /*
3420  * Initialize result type, slot and projection.
3421  */
3423  ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
3424 
3425  /*
3426  * initialize child expressions
3427  *
3428  * We expect the parser to have checked that no aggs contain other agg
3429  * calls in their arguments (and just to be sure, we verify it again while
3430  * initializing the plan node). This would make no sense under SQL
3431  * semantics, and it's forbidden by the spec. Because it is true, we
3432  * don't need to worry about evaluating the aggs in any particular order.
3433  *
3434  * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
3435  * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
3436  * during ExecAssignProjectionInfo, above.
3437  */
3438  aggstate->ss.ps.qual =
3439  ExecInitQual(node->plan.qual, (PlanState *) aggstate);
3440 
3441  /*
3442  * We should now have found all Aggrefs in the targetlist and quals.
3443  */
3444  numaggrefs = list_length(aggstate->aggs);
3445  max_aggno = -1;
3446  max_transno = -1;
3447  foreach(l, aggstate->aggs)
3448  {
3449  Aggref *aggref = (Aggref *) lfirst(l);
3450 
3451  max_aggno = Max(max_aggno, aggref->aggno);
3452  max_transno = Max(max_transno, aggref->aggtransno);
3453  }
3454  numaggs = max_aggno + 1;
3455  numtrans = max_transno + 1;
3456 
3457  /*
3458  * For each phase, prepare grouping set data and fmgr lookup data for
3459  * compare functions. Accumulate all_grouped_cols in passing.
3460  */
3461  aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
3462 
3463  aggstate->num_hashes = numHashes;
3464  if (numHashes)
3465  {
3466  aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
3467  aggstate->phases[0].numsets = 0;
3468  aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
3469  aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
3470  }
3471 
3472  phase = 0;
3473  for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3474  {
3475  Agg *aggnode;
3476  Sort *sortnode;
3477 
3478  if (phaseidx > 0)
3479  {
3480  aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
3481  sortnode = castNode(Sort, aggnode->plan.lefttree);
3482  }
3483  else
3484  {
3485  aggnode = node;
3486  sortnode = NULL;
3487  }
3488 
3489  Assert(phase <= 1 || sortnode);
3490 
3491  if (aggnode->aggstrategy == AGG_HASHED
3492  || aggnode->aggstrategy == AGG_MIXED)
3493  {
3494  AggStatePerPhase phasedata = &aggstate->phases[0];
3495  AggStatePerHash perhash;
3496  Bitmapset *cols = NULL;
3497 
3498  Assert(phase == 0);
3499  i = phasedata->numsets++;
3500  perhash = &aggstate->perhash[i];
3501 
3502  /* phase 0 always points to the "real" Agg in the hash case */
3503  phasedata->aggnode = node;
3504  phasedata->aggstrategy = node->aggstrategy;
3505 
3506  /* but the actual Agg node representing this hash is saved here */
3507  perhash->aggnode = aggnode;
3508 
3509  phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
3510 
3511  for (j = 0; j < aggnode->numCols; ++j)
3512  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3513 
3514  phasedata->grouped_cols[i] = cols;
3515 
3516  all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3517  continue;
3518  }
3519  else
3520  {
3521  AggStatePerPhase phasedata = &aggstate->phases[++phase];
3522  int num_sets;
3523 
3524  phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
3525 
3526  if (num_sets)
3527  {
3528  phasedata->gset_lengths = palloc(num_sets * sizeof(int));
3529  phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
3530 
3531  i = 0;
3532  foreach(l, aggnode->groupingSets)
3533  {
3534  int current_length = list_length(lfirst(l));
3535  Bitmapset *cols = NULL;
3536 
3537  /* planner forces this to be correct */
3538  for (j = 0; j < current_length; ++j)
3539  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3540 
3541  phasedata->grouped_cols[i] = cols;
3542  phasedata->gset_lengths[i] = current_length;
3543 
3544  ++i;
3545  }
3546 
3547  all_grouped_cols = bms_add_members(all_grouped_cols,
3548  phasedata->grouped_cols[0]);
3549  }
3550  else
3551  {
3552  Assert(phaseidx == 0);
3553 
3554  phasedata->gset_lengths = NULL;
3555  phasedata->grouped_cols = NULL;
3556  }
3557 
3558  /*
3559  * If we are grouping, precompute fmgr lookup data for inner loop.
3560  */
3561  if (aggnode->aggstrategy == AGG_SORTED)
3562  {
3563  int i = 0;
3564 
3565  Assert(aggnode->numCols > 0);
3566 
3567  /*
3568  * Build a separate function for each subset of columns that
3569  * need to be compared.
3570  */
3571  phasedata->eqfunctions =
3572  (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
3573 
3574  /* for each grouping set */
3575  for (i = 0; i < phasedata->numsets; i++)
3576  {
3577  int length = phasedata->gset_lengths[i];
3578 
3579  if (phasedata->eqfunctions[length - 1] != NULL)
3580  continue;
3581 
3582  phasedata->eqfunctions[length - 1] =
3583  execTuplesMatchPrepare(scanDesc,
3584  length,
3585  aggnode->grpColIdx,
3586  aggnode->grpOperators,
3587  aggnode->grpCollations,
3588  (PlanState *) aggstate);
3589  }
3590 
3591  /* and for all grouped columns, unless already computed */
3592  if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
3593  {
3594  phasedata->eqfunctions[aggnode->numCols - 1] =
3595  execTuplesMatchPrepare(scanDesc,
3596  aggnode->numCols,
3597  aggnode->grpColIdx,
3598  aggnode->grpOperators,
3599  aggnode->grpCollations,
3600  (PlanState *) aggstate);
3601  }
3602  }
3603 
3604  phasedata->aggnode = aggnode;
3605  phasedata->aggstrategy = aggnode->aggstrategy;
3606  phasedata->sortnode = sortnode;
3607  }
3608  }
3609 
3610  /*
3611  * Convert all_grouped_cols to a descending-order list.
3612  */
3613  i = -1;
3614  while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3615  aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3616 
3617  /*
3618  * Set up aggregate-result storage in the output expr context, and also
3619  * allocate my private per-agg working storage
3620  */
3621  econtext = aggstate->ss.ps.ps_ExprContext;
3622  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3623  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3624 
3625  peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3626  pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans);
3627 
3628  aggstate->peragg = peraggs;
3629  aggstate->pertrans = pertransstates;
3630 
3631 
3632  aggstate->all_pergroups =
3634  * (numGroupingSets + numHashes));
3635  pergroups = aggstate->all_pergroups;
3636 
3637  if (node->aggstrategy != AGG_HASHED)
3638  {
3639  for (i = 0; i < numGroupingSets; i++)
3640  {
3641  pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3642  * numaggs);
3643  }
3644 
3645  aggstate->pergroups = pergroups;
3646  pergroups += numGroupingSets;
3647  }
3648 
3649  /*
3650  * Hashing can only appear in the initial phase.
3651  */
3652  if (use_hashing)
3653  {
3654  Plan *outerplan = outerPlan(node);
3655  uint64 totalGroups = 0;
3656  int i;
3657 
3658  aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
3659  "HashAgg meta context",
3661  aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
3663  aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
3664  &TTSOpsVirtual);
3665 
3666  /* this is an array of pointers, not structures */
3667  aggstate->hash_pergroup = pergroups;
3668 
3669  aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
3670  outerplan->plan_width,
3671  node->transitionSpace);
3672 
3673  /*
3674  * Consider all of the grouping sets together when setting the limits
3675  * and estimating the number of partitions. This can be inaccurate
3676  * when there is more than one grouping set, but should still be
3677  * reasonable.
3678  */
3679  for (i = 0; i < aggstate->num_hashes; i++)
3680  totalGroups += aggstate->perhash[i].aggnode->numGroups;
3681 
3682  hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
3683  &aggstate->hash_mem_limit,
3684  &aggstate->hash_ngroups_limit,
3685  &aggstate->hash_planned_partitions);
3686  find_hash_columns(aggstate);
3687 
3688  /* Skip massive memory allocation if we are just doing EXPLAIN */
3689  if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
3690  build_hash_tables(aggstate);
3691 
3692  aggstate->table_filled = false;
3693 
3694  /* Initialize this to 1, meaning nothing spilled, yet */
3695  aggstate->hash_batches_used = 1;
3696  }
3697 
3698  /*
3699  * Initialize current phase-dependent values to initial phase. The initial
3700  * phase is 1 (first sort pass) for all strategies that use sorting (if
3701  * hashing is being done too, then phase 0 is processed last); but if only
3702  * hashing is being done, then phase 0 is all there is.
3703  */
3704  if (node->aggstrategy == AGG_HASHED)
3705  {
3706  aggstate->current_phase = 0;
3707  initialize_phase(aggstate, 0);
3708  select_current_set(aggstate, 0, true);
3709  }
3710  else
3711  {
3712  aggstate->current_phase = 1;
3713  initialize_phase(aggstate, 1);
3714  select_current_set(aggstate, 0, false);
3715  }
3716 
3717  /*
3718  * Perform lookups of aggregate function info, and initialize the
3719  * unchanging fields of the per-agg and per-trans data.
3720  */
3721  foreach(l, aggstate->aggs)
3722  {
3723  Aggref *aggref = lfirst(l);
3724  AggStatePerAgg peragg;
3725  AggStatePerTrans pertrans;
3726  Oid inputTypes[FUNC_MAX_ARGS];
3727  int numArguments;
3728  int numDirectArgs;
3729  HeapTuple aggTuple;
3730  Form_pg_aggregate aggform;
3731  AclResult aclresult;
3732  Oid finalfn_oid;
3733  Oid serialfn_oid,
3734  deserialfn_oid;
3735  Oid aggOwner;
3736  Expr *finalfnexpr;
3737  Oid aggtranstype;
3738 
3739  /* Planner should have assigned aggregate to correct level */
3740  Assert(aggref->agglevelsup == 0);
3741  /* ... and the split mode should match */
3742  Assert(aggref->aggsplit == aggstate->aggsplit);
3743 
3744  peragg = &peraggs[aggref->aggno];
3745 
3746  /* Check if we initialized the state for this aggregate already. */
3747  if (peragg->aggref != NULL)
3748  continue;
3749 
3750  peragg->aggref = aggref;
3751  peragg->transno = aggref->aggtransno;
3752 
3753  /* Fetch the pg_aggregate row */
3754  aggTuple = SearchSysCache1(AGGFNOID,
3755  ObjectIdGetDatum(aggref->aggfnoid));
3756  if (!HeapTupleIsValid(aggTuple))
3757  elog(ERROR, "cache lookup failed for aggregate %u",
3758  aggref->aggfnoid);
3759  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3760 
3761  /* Check permission to call aggregate function */
3762  aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3763  ACL_EXECUTE);
3764  if (aclresult != ACLCHECK_OK)
3765  aclcheck_error(aclresult, OBJECT_AGGREGATE,
3766  get_func_name(aggref->aggfnoid));
3768 
3769  /* planner recorded transition state type in the Aggref itself */
3770  aggtranstype = aggref->aggtranstype;
3771  Assert(OidIsValid(aggtranstype));
3772 
3773  /* Final function only required if we're finalizing the aggregates */
3774  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3775  peragg->finalfn_oid = finalfn_oid = InvalidOid;
3776  else
3777  peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3778 
3779  serialfn_oid = InvalidOid;
3780  deserialfn_oid = InvalidOid;
3781 
3782  /*
3783  * Check if serialization/deserialization is required. We only do it
3784  * for aggregates that have transtype INTERNAL.
3785  */
3786  if (aggtranstype == INTERNALOID)
3787  {
3788  /*
3789  * The planner should only have generated a serialize agg node if
3790  * every aggregate with an INTERNAL state has a serialization
3791  * function. Verify that.
3792  */
3793  if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3794  {
3795  /* serialization only valid when not running finalfn */
3796  Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3797 
3798  if (!OidIsValid(aggform->aggserialfn))
3799  elog(ERROR, "serialfunc not provided for serialization aggregation");
3800  serialfn_oid = aggform->aggserialfn;
3801  }
3802 
3803  /* Likewise for deserialization functions */
3804  if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3805  {
3806  /* deserialization only valid when combining states */
3807  Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3808 
3809  if (!OidIsValid(aggform->aggdeserialfn))
3810  elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3811  deserialfn_oid = aggform->aggdeserialfn;
3812  }
3813  }
3814 
3815  /* Check that aggregate owner has permission to call component fns */
3816  {
3817  HeapTuple procTuple;
3818 
3819  procTuple = SearchSysCache1(PROCOID,
3820  ObjectIdGetDatum(aggref->aggfnoid));
3821  if (!HeapTupleIsValid(procTuple))
3822  elog(ERROR, "cache lookup failed for function %u",
3823  aggref->aggfnoid);
3824  aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3825  ReleaseSysCache(procTuple);
3826 
3827  if (OidIsValid(finalfn_oid))
3828  {
3829  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3830  ACL_EXECUTE);
3831  if (aclresult != ACLCHECK_OK)
3832  aclcheck_error(aclresult, OBJECT_FUNCTION,
3833  get_func_name(finalfn_oid));
3834  InvokeFunctionExecuteHook(finalfn_oid);
3835  }
3836  if (OidIsValid(serialfn_oid))
3837  {
3838  aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3839  ACL_EXECUTE);
3840  if (aclresult != ACLCHECK_OK)
3841  aclcheck_error(aclresult, OBJECT_FUNCTION,
3842  get_func_name(serialfn_oid));
3843  InvokeFunctionExecuteHook(serialfn_oid);
3844  }
3845  if (OidIsValid(deserialfn_oid))
3846  {
3847  aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3848  ACL_EXECUTE);
3849  if (aclresult != ACLCHECK_OK)
3850  aclcheck_error(aclresult, OBJECT_FUNCTION,
3851  get_func_name(deserialfn_oid));
3852  InvokeFunctionExecuteHook(deserialfn_oid);
3853  }
3854  }
3855 
3856  /*
3857  * Get actual datatypes of the (nominal) aggregate inputs. These
3858  * could be different from the agg's declared input types, when the
3859  * agg accepts ANY or a polymorphic type.
3860  */
3861  numArguments = get_aggregate_argtypes(aggref, inputTypes);
3862 
3863  /* Count the "direct" arguments, if any */
3864  numDirectArgs = list_length(aggref->aggdirectargs);
3865 
3866  /* Detect how many arguments to pass to the finalfn */
3867  if (aggform->aggfinalextra)
3868  peragg->numFinalArgs = numArguments + 1;
3869  else
3870  peragg->numFinalArgs = numDirectArgs + 1;
3871 
3872  /* Initialize any direct-argument expressions */
3873  peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3874  (PlanState *) aggstate);
3875 
3876  /*
3877  * build expression trees using actual argument & result types for the
3878  * finalfn, if it exists and is required.
3879  */
3880  if (OidIsValid(finalfn_oid))
3881  {
3882  build_aggregate_finalfn_expr(inputTypes,
3883  peragg->numFinalArgs,
3884  aggtranstype,
3885  aggref->aggtype,
3886  aggref->inputcollid,
3887  finalfn_oid,
3888  &finalfnexpr);
3889  fmgr_info(finalfn_oid, &peragg->finalfn);
3890  fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3891  }
3892 
3893  /* get info about the output value's datatype */
3894  get_typlenbyval(aggref->aggtype,
3895  &peragg->resulttypeLen,
3896  &peragg->resulttypeByVal);
3897 
3898  /*
3899  * Build working state for invoking the transition function, if we
3900  * haven't done it already.
3901  */
3902  pertrans = &pertransstates[aggref->aggtransno];
3903  if (pertrans->aggref == NULL)
3904  {
3905  Datum textInitVal;
3906  Datum initValue;
3907  bool initValueIsNull;
3908  Oid transfn_oid;
3909 
3910  /*
3911  * If this aggregation is performing state combines, then instead
3912  * of using the transition function, we'll use the combine
3913  * function
3914  */
3915  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3916  {
3917  transfn_oid = aggform->aggcombinefn;
3918 
3919  /* If not set then the planner messed up */
3920  if (!OidIsValid(transfn_oid))
3921  elog(ERROR, "combinefn not set for aggregate function");
3922  }
3923  else
3924  transfn_oid = aggform->aggtransfn;
3925 
3926  aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3927  ACL_EXECUTE);
3928  if (aclresult != ACLCHECK_OK)
3929  aclcheck_error(aclresult, OBJECT_FUNCTION,
3930  get_func_name(transfn_oid));
3931  InvokeFunctionExecuteHook(transfn_oid);
3932 
3933  /*
3934  * initval is potentially null, so don't try to access it as a
3935  * struct field. Must do it the hard way with SysCacheGetAttr.
3936  */
3937  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3938  Anum_pg_aggregate_agginitval,
3939  &initValueIsNull);
3940  if (initValueIsNull)
3941  initValue = (Datum) 0;
3942  else
3943  initValue = GetAggInitVal(textInitVal, aggtranstype);
3944 
3945  build_pertrans_for_aggref(pertrans, aggstate, estate,
3946  aggref, transfn_oid, aggtranstype,
3947  serialfn_oid, deserialfn_oid,
3948  initValue, initValueIsNull,
3949  inputTypes, numArguments);
3950  }
3951  else
3952  pertrans->aggshared = true;
3953  ReleaseSysCache(aggTuple);
3954  }
3955 
3956  /*
3957  * Update aggstate->numaggs to be the number of unique aggregates found.
3958  * Also set numstates to the number of unique transition states found.
3959  */
3960  aggstate->numaggs = numaggs;
3961  aggstate->numtrans = numtrans;
3962 
3963  /*
3964  * Last, check whether any more aggregates got added onto the node while
3965  * we processed the expressions for the aggregate arguments (including not
3966  * only the regular arguments and FILTER expressions handled immediately
3967  * above, but any direct arguments we might've handled earlier). If so,
3968  * we have nested aggregate functions, which is semantically nonsensical,
3969  * so complain. (This should have been caught by the parser, so we don't
3970  * need to work hard on a helpful error message; but we defend against it
3971  * here anyway, just to be sure.)
3972  */
3973  if (numaggrefs != list_length(aggstate->aggs))
3974  ereport(ERROR,
3975  (errcode(ERRCODE_GROUPING_ERROR),
3976  errmsg("aggregate function calls cannot be nested")));
3977 
3978  /*
3979  * Build expressions doing all the transition work at once. We build a
3980  * different one for each phase, as the number of transition function
3981  * invocation can differ between phases. Note this'll work both for
3982  * transition and combination functions (although there'll only be one
3983  * phase in the latter case).
3984  */
3985  for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
3986  {
3987  AggStatePerPhase phase = &aggstate->phases[phaseidx];
3988  bool dohash = false;
3989  bool dosort = false;
3990 
3991  /* phase 0 doesn't necessarily exist */
3992  if (!phase->aggnode)
3993  continue;
3994 
3995  if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
3996  {
3997  /*
3998  * Phase one, and only phase one, in a mixed agg performs both
3999  * sorting and aggregation.
4000  */
4001  dohash = true;
4002  dosort = true;
4003  }
4004  else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
4005  {
4006  /*
4007  * No need to compute a transition function for an AGG_MIXED phase
4008  * 0 - the contents of the hashtables will have been computed
4009  * during phase 1.
4010  */
4011  continue;
4012  }
4013  else if (phase->aggstrategy == AGG_PLAIN ||
4014  phase->aggstrategy == AGG_SORTED)
4015  {
4016  dohash = false;
4017  dosort = true;
4018  }
4019  else if (phase->aggstrategy == AGG_HASHED)
4020  {
4021  dohash = true;
4022  dosort = false;
4023  }
4024  else
4025  Assert(false);
4026 
4027  phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
4028  false);
4029 
4030  /* cache compiled expression for outer slot without NULL check */
4031  phase->evaltrans_cache[0][0] = phase->evaltrans;
4032  }
4033 
4034  return aggstate;
4035 }
4036 
4037 /*
4038  * Build the state needed to calculate a state value for an aggregate.
4039  *
4040  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
4041  * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
4042  * of the arguments could be calculated from 'aggref', but the caller has
4043  * calculated them already, so might as well pass them.
4044  */
/*
 * NOTE(review): the opening line of the signature
 * ("build_pertrans_for_aggref(AggStatePerTrans pertrans," per the header
 * comment above) is absent from this extract, and several interior lines
 * below are likewise elided — compare with upstream nodeAgg.c before editing.
 */
4045 static void
4047  AggState *aggstate, EState *estate,
4048  Aggref *aggref,
4049  Oid aggtransfn, Oid aggtranstype,
4050  Oid aggserialfn, Oid aggdeserialfn,
4051  Datum initValue, bool initValueIsNull,
4052  Oid *inputTypes, int numArguments)
4053 {
4054  int numGroupingSets = Max(aggstate->maxsets, 1);
4055  Expr *serialfnexpr = NULL;
4056  Expr *deserialfnexpr = NULL;
4057  ListCell *lc;
4058  int numInputs;
4059  int numDirectArgs;
4060  List *sortlist;
4061  int numSortCols;
4062  int numDistinctCols;
4063  int i;
4064 
4065  /* Begin filling in the pertrans data */
4066  pertrans->aggref = aggref;
4067  pertrans->aggshared = false;
4068  pertrans->aggCollation = aggref->inputcollid;
4069  pertrans->transfn_oid = aggtransfn;
4070  pertrans->serialfn_oid = aggserialfn;
4071  pertrans->deserialfn_oid = aggdeserialfn;
4072  pertrans->initValue = initValue;
4073  pertrans->initValueIsNull = initValueIsNull;
4074 
4075  /* Count the "direct" arguments, if any */
4076  numDirectArgs = list_length(aggref->aggdirectargs);
4077 
4078  /* Count the number of aggregated input columns */
4079  pertrans->numInputs = numInputs = list_length(aggref->args);
4080 
4081  pertrans->aggtranstype = aggtranstype;
4082 
4083  /*
4084  * When combining states, we have no use at all for the aggregate
4085  * function's transfn. Instead we use the combinefn. In this case, the
4086  * transfn and transfn_oid fields of pertrans refer to the combine
4087  * function rather than the transition function.
4088  */
4089  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
4090  {
4091  Expr *combinefnexpr;
4092  size_t numTransArgs;
4093 
4094  /*
4095  * When combining there's only one input, the to-be-combined added
4096  * transition value from below (this node's transition value is
4097  * counted separately).
4098  */
4099  pertrans->numTransInputs = 1;
4100 
4101  /* account for the current transition state */
4102  numTransArgs = pertrans->numTransInputs + 1;
4103 
4104  build_aggregate_combinefn_expr(aggtranstype,
4105  aggref->inputcollid,
4106  aggtransfn,
4107  &combinefnexpr);
4108  fmgr_info(aggtransfn, &pertrans->transfn);
4109  fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
4110 
/*
 * NOTE(review): the head of the fcinfo allocation/init call is truncated
 * here (and similarly in the serialfn/deserialfn setups below); the visible
 * arguments indicate a reusable FunctionCallInfo keyed to pertrans->transfn,
 * numTransArgs arguments, and the agg's input collation — confirm upstream.
 */
4111  pertrans->transfn_fcinfo =
4114  &pertrans->transfn,
4115  numTransArgs,
4116  pertrans->aggCollation,
4117  (void *) aggstate, NULL);
4118 
4119  /*
4120  * Ensure that a combine function to combine INTERNAL states is not
4121  * strict. This should have been checked during CREATE AGGREGATE, but
4122  * the strict property could have been changed since then.
4123  */
4124  if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
4125  ereport(ERROR,
4126  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4127  errmsg("combine function with transition type %s must not be declared STRICT",
4128  format_type_be(aggtranstype))));
4129  }
4130  else
4131  {
4132  Expr *transfnexpr;
4133  size_t numTransArgs;
4134 
4135  /* Detect how many arguments to pass to the transfn */
4136  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4137  pertrans->numTransInputs = numInputs;
4138  else
4139  pertrans->numTransInputs = numArguments;
4140 
4141  /* account for the current transition state */
4142  numTransArgs = pertrans->numTransInputs + 1;
4143 
4144  /*
4145  * Set up infrastructure for calling the transfn. Note that
4146  * invtransfn is not needed here.
4147  */
4148  build_aggregate_transfn_expr(inputTypes,
4149  numArguments,
4150  numDirectArgs,
4151  aggref->aggvariadic,
4152  aggtranstype,
4153  aggref->inputcollid,
4154  aggtransfn,
4155  InvalidOid,
4156  &transfnexpr,
4157  NULL);
4158  fmgr_info(aggtransfn, &pertrans->transfn);
4159  fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
4160 
4161  pertrans->transfn_fcinfo =
4164  &pertrans->transfn,
4165  numTransArgs,
4166  pertrans->aggCollation,
4167  (void *) aggstate, NULL);
4168 
4169  /*
4170  * If the transfn is strict and the initval is NULL, make sure input
4171  * type and transtype are the same (or at least binary-compatible), so
4172  * that it's OK to use the first aggregated input value as the initial
4173  * transValue. This should have been checked at agg definition time,
4174  * but we must check again in case the transfn's strictness property
4175  * has been changed.
4176  */
4177  if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
4178  {
4179  if (numArguments <= numDirectArgs ||
4180  !IsBinaryCoercible(inputTypes[numDirectArgs],
4181  aggtranstype))
4182  ereport(ERROR,
4183  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4184  errmsg("aggregate %u needs to have compatible input type and transition type",
4185  aggref->aggfnoid)));
4186  }
4187  }
4188 
4189  /* get info about the state value's datatype */
4190  get_typlenbyval(aggtranstype,
4191  &pertrans->transtypeLen,
4192  &pertrans->transtypeByVal);
4193 
4194  if (OidIsValid(aggserialfn))
4195  {
4196  build_aggregate_serialfn_expr(aggserialfn,
4197  &serialfnexpr);
4198  fmgr_info(aggserialfn, &pertrans->serialfn);
4199  fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
4200 
4201  pertrans->serialfn_fcinfo =
4204  &pertrans->serialfn,
4205  1,
4206  InvalidOid,
4207  (void *) aggstate, NULL);
4208  }
4209 
4210  if (OidIsValid(aggdeserialfn))
4211  {
4212  build_aggregate_deserialfn_expr(aggdeserialfn,
4213  &deserialfnexpr);
4214  fmgr_info(aggdeserialfn, &pertrans->deserialfn);
4215  fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
4216 
4217  pertrans->deserialfn_fcinfo =
4220  &pertrans->deserialfn,
4221  2,
4222  InvalidOid,
4223  (void *) aggstate, NULL);
4224 
4225  }
4226 
4227  /*
4228  * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
4229  * have a list of SortGroupClause nodes; fish out the data in them and
4230  * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
4231  * however; the agg's transfn and finalfn are responsible for that.
4232  *
4233  * Note that by construction, if there is a DISTINCT clause then the ORDER
4234  * BY clause is a prefix of it (see transformDistinctClause).
4235  */
4236  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4237  {
4238  sortlist = NIL;
4239  numSortCols = numDistinctCols = 0;
4240  }
4241  else if (aggref->aggdistinct)
4242  {
4243  sortlist = aggref->aggdistinct;
4244  numSortCols = numDistinctCols = list_length(sortlist);
4245  Assert(numSortCols >= list_length(aggref->aggorder));
4246  }
4247  else
4248  {
4249  sortlist = aggref->aggorder;
4250  numSortCols = list_length(sortlist);
4251  numDistinctCols = 0;
4252  }
4253 
4254  pertrans->numSortCols = numSortCols;
4255  pertrans->numDistinctCols = numDistinctCols;
4256 
4257  /*
4258  * If we have either sorting or filtering to do, create a tupledesc and
4259  * slot corresponding to the aggregated inputs (including sort
4260  * expressions) of the agg.
4261  */
4262  if (numSortCols > 0 || aggref->aggfilter)
4263  {
4264  pertrans->sortdesc = ExecTypeFromTL(aggref->args);
/* NOTE(review): trailing slot-ops argument line of this call (and of the
 * uniqslot call below) is missing from this extract. */
4265  pertrans->sortslot =
4266  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4268  }
4269 
4270  if (numSortCols > 0)
4271  {
4272  /*
4273  * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
4274  * (yet)
4275  */
4276  Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
4277 
4278  /* If we have only one input, we need its len/byval info. */
4279  if (numInputs == 1)
4280  {
4281  get_typlenbyval(inputTypes[numDirectArgs],
4282  &pertrans->inputtypeLen,
4283  &pertrans->inputtypeByVal);
4284  }
4285  else if (numDistinctCols > 0)
4286  {
4287  /* we will need an extra slot to store prior values */
4288  pertrans->uniqslot =
4289  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4291  }
4292 
4293  /* Extract the sort information for use later */
4294  pertrans->sortColIdx =
4295  (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
4296  pertrans->sortOperators =
4297  (Oid *) palloc(numSortCols * sizeof(Oid));
4298  pertrans->sortCollations =
4299  (Oid *) palloc(numSortCols * sizeof(Oid));
4300  pertrans->sortNullsFirst =
4301  (bool *) palloc(numSortCols * sizeof(bool));
4302 
4303  i = 0;
4304  foreach(lc, sortlist)
4305  {
4306  SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
4307  TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
4308 
4309  /* the parser should have made sure of this */
4310  Assert(OidIsValid(sortcl->sortop));
4311 
4312  pertrans->sortColIdx[i] = tle->resno;
4313  pertrans->sortOperators[i] = sortcl->sortop;
4314  pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
4315  pertrans->sortNullsFirst[i] = sortcl->nulls_first;
4316  i++;
4317  }
4318  Assert(i == numSortCols);
4319  }
4320 
4321  if (aggref->aggdistinct)
4322  {
4323  Oid *ops;
4324 
4325  Assert(numArguments > 0);
4326  Assert(list_length(aggref->aggdistinct) == numDistinctCols);
4327 
4328  ops = palloc(numDistinctCols * sizeof(Oid));
4329 
4330  i = 0;
4331  foreach(lc, aggref->aggdistinct)
4332  ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
4333 
4334  /* lookup / build the necessary comparators */
4335  if (numDistinctCols == 1)
4336  fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
4337  else
4338  pertrans->equalfnMulti =
4339  execTuplesMatchPrepare(pertrans->sortdesc,
4340  numDistinctCols,
4341  pertrans->sortColIdx,
4342  ops,
4343  pertrans->sortCollations,
4344  &aggstate->ss.ps);
4345  pfree(ops);
4346  }
4347 
/* Zeroed array with one Tuplesortstate pointer per grouping set */
4348  pertrans->sortstates = (Tuplesortstate **)
4349  palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
4350 }
4351 
4352 
4353 static Datum
4354 GetAggInitVal(Datum textInitVal, Oid transtype)
4355 {
4356  Oid typinput,
4357  typioparam;
4358  char *strInitVal;
4359  Datum initVal;
4360 
4361  getTypeInputInfo(transtype, &typinput, &typioparam);
4362  strInitVal = TextDatumGetCString(textInitVal);
4363  initVal = OidInputFunctionCall(typinput, strInitVal,
4364  typioparam, -1);
4365  pfree(strInitVal);
4366  return initVal;
4367 }
4368 
/*
 * ExecEndAgg — shut down the Agg node: flush worker instrumentation back to
 * shared memory, close any open tuplesorts, release hash-table memory, run
 * registered agg shutdown callbacks, and recursively end the outer plan.
 *
 * NOTE(review): the signature line (presumably "ExecEndAgg(AggState *node)")
 * and a handful of statement lines (e.g. the "si" declaration/assignment,
 * the hash metacxt delete, the hashcontext rescan, tuple-table cleanup, and
 * the outerPlan declaration) are missing from this extract — confirm against
 * upstream nodeAgg.c.
 */
4369 void
4371 {
4373  int transno;
4374  int numGroupingSets = Max(node->maxsets, 1);
4375  int setno;
4376 
4377  /*
4378  * When ending a parallel worker, copy the statistics gathered by the
4379  * worker back into shared memory so that it can be picked up by the main
4380  * process to report in EXPLAIN ANALYZE.
4381  */
4382  if (node->shared_info && IsParallelWorker())
4383  {
4385 
4386  Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
4389  si->hash_disk_used = node->hash_disk_used;
4390  si->hash_mem_peak = node->hash_mem_peak;
4391  }
4392 
4393  /* Make sure we have closed any open tuplesorts */
4394 
4395  if (node->sort_in)
4396  tuplesort_end(node->sort_in);
4397  if (node->sort_out)
4398  tuplesort_end(node->sort_out);
4399 
4401 
4402  if (node->hash_metacxt != NULL)
4403  {
4405  node->hash_metacxt = NULL;
4406  }
4407 
/* Close per-transition, per-grouping-set tuplesorts too */
4408  for (transno = 0; transno < node->numtrans; transno++)
4409  {
4410  AggStatePerTrans pertrans = &node->pertrans[transno];
4411 
4412  for (setno = 0; setno < numGroupingSets; setno++)
4413  {
4414  if (pertrans->sortstates[setno])
4415  tuplesort_end(pertrans->sortstates[setno]);
4416  }
4417  }
4418 
4419  /* And ensure any agg shutdown callbacks have been called */
4420  for (setno = 0; setno < numGroupingSets; setno++)
4421  ReScanExprContext(node->aggcontexts[setno]);
4422  if (node->hashcontext)
4424 
4425  /*
4426  * We don't actually free any ExprContexts here (see comment in
4427  * ExecFreeExprContext), just unlinking the output one from the plan node
4428  * suffices.
4429  */
4430  ExecFreeExprContext(&node->ss.ps);
4431 
4432  /* clean up tuple table */
4434 
4435  outerPlan = outerPlanState(node);
4436  ExecEndNode(outerPlan);
4437 }
4438 
/*
 * ExecReScanAgg — reset the Agg node for a rescan. For a hashed agg whose
 * table is still valid (never spilled, no parameter changes) we just reset
 * the hash iterator; otherwise we tear down per-group state, rebuild empty
 * hash tables as needed, and propagate the rescan to the outer plan.
 *
 * NOTE(review): the signature line (presumably "ExecReScanAgg(AggState
 * *node)"), the outerPlan declaration, and a few statement lines (hash
 * iterator reset head, grp_firstTuple free, hashcontext rescan,
 * spill-state reset) are missing from this extract — confirm upstream.
 */
4439 void
4441 {
4442  ExprContext *econtext = node->ss.ps.ps_ExprContext;
4444  Agg *aggnode = (Agg *) node->ss.ps.plan;
4445  int transno;
4446  int numGroupingSets = Max(node->maxsets, 1);
4447  int setno;
4448 
4449  node->agg_done = false;
4450 
4451  if (node->aggstrategy == AGG_HASHED)
4452  {
4453  /*
4454  * In the hashed case, if we haven't yet built the hash table then we
4455  * can just return; nothing done yet, so nothing to undo. If subnode's
4456  * chgParam is not NULL then it will be re-scanned by ExecProcNode,
4457  * else no reason to re-scan it at all.
4458  */
4459  if (!node->table_filled)
4460  return;
4461 
4462  /*
4463  * If we do have the hash table, and it never spilled, and the subplan
4464  * does not have any parameter changes, and none of our own parameter
4465  * changes affect input expressions of the aggregated functions, then
4466  * we can just rescan the existing hash table; no need to build it
4467  * again.
4468  */
4469  if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
4470  !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
4471  {
/* NOTE(review): head of the hash-iterator reset call is missing here */
4473  &node->perhash[0].hashiter);
4474  select_current_set(node, 0, true);
4475  return;
4476  }
4477  }
4478 
4479  /* Make sure we have closed any open tuplesorts */
4480  for (transno = 0; transno < node->numtrans; transno++)
4481  {
4482  for (setno = 0; setno < numGroupingSets; setno++)
4483  {
4484  AggStatePerTrans pertrans = &node->pertrans[transno];
4485 
4486  if (pertrans->sortstates[setno])
4487  {
4488  tuplesort_end(pertrans->sortstates[setno]);
4489  pertrans->sortstates[setno] = NULL;
4490  }
4491  }
4492  }
4493 
4494  /*
4495  * We don't need to ReScanExprContext the output tuple context here;
4496  * ExecReScan already did it. But we do need to reset our per-grouping-set
4497  * contexts, which may have transvalues stored in them. (We use rescan
4498  * rather than just reset because transfns may have registered callbacks
4499  * that need to be run now.) For the AGG_HASHED case, see below.
4500  */
4501 
4502  for (setno = 0; setno < numGroupingSets; setno++)
4503  {
4504  ReScanExprContext(node->aggcontexts[setno]);
4505  }
4506 
4507  /* Release first tuple of group, if we have made a copy */
4508  if (node->grp_firstTuple != NULL)
4509  {
4511  node->grp_firstTuple = NULL;
4512  }
4514 
4515  /* Forget current agg values */
4516  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
4517  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
4518 
4519  /*
4520  * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
4521  * the hashcontext. This used to be an issue, but now, resetting a context
4522  * automatically deletes sub-contexts too.
4523  */
4524  if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
4525  {
4527 
4528  node->hash_ever_spilled = false;
4529  node->hash_spill_mode = false;
4530  node->hash_ngroups_current = 0;
4531 
4533  /* Rebuild an empty hash table */
4534  build_hash_tables(node);
4535  node->table_filled = false;
4536  /* iterator will be reset when the table is filled */
4537 
4538  hashagg_recompile_expressions(node, false, false);
4539  }
4540 
4541  if (node->aggstrategy != AGG_HASHED)
4542  {
4543  /*
4544  * Reset the per-group state (in particular, mark transvalues null)
4545  */
4546  for (setno = 0; setno < numGroupingSets; setno++)
4547  {
4548  MemSet(node->pergroups[setno], 0,
4549  sizeof(AggStatePerGroupData) * node->numaggs);
4550  }
4551 
4552  /* reset to phase 1 */
4553  initialize_phase(node, 1);
4554 
4555  node->input_done = false;
4556  node->projected_set = -1;
4557  }
4558 
/* If the subplan has no pending parameter changes, rescan it ourselves */
4559  if (outerPlan->chgParam == NULL)
4560  ExecReScan(outerPlan);
4561 }
4562 
4563 
4564 /***********************************************************************
4565  * API exposed to aggregate functions
4566  ***********************************************************************/
4567 
4568 
4569 /*
4570  * AggCheckCallContext - test if a SQL function is being called as an aggregate
4571  *
4572  * The transition and/or final functions of an aggregate may want to verify
4573  * that they are being called as aggregates, rather than as plain SQL
4574  * functions. They should use this function to do so. The return value
4575  * is nonzero if being called as an aggregate, or zero if not. (Specific
4576  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4577  * values could conceivably appear in future.)
4578  *
4579  * If aggcontext isn't NULL, the function also stores at *aggcontext the
4580  * identity of the memory context that aggregate transition values are being
4581  * stored in. Note that the same aggregate call site (flinfo) may be called
4582  * interleaved on different transition values in different contexts, so it's
4583  * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4584  * cache it in the transvalue itself (for internal-type transvalues).
4585  */
4586 int
/*
 * NOTE(review): signature line missing from this extract — per the comment
 * block above, this is AggCheckCallContext(FunctionCallInfo fcinfo,
 * MemoryContext *aggcontext); confirm upstream.
 */
4588 {
4589  if (fcinfo->context && IsA(fcinfo->context, AggState))
4590  {
4591  if (aggcontext)
4592  {
/* Transition values live in the current agg context's per-tuple memory */
4593  AggState *aggstate = ((AggState *) fcinfo->context);
4594  ExprContext *cxt = aggstate->curaggcontext;
4595 
4596  *aggcontext = cxt->ecxt_per_tuple_memory;
4597  }
4598  return AGG_CONTEXT_AGGREGATE;
4599  }
4600  if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4601  {
4602  if (aggcontext)
4603  *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4604  return AGG_CONTEXT_WINDOW;
4605  }
4606 
4607  /* this is just to prevent "uninitialized variable" warnings */
4608  if (aggcontext)
4609  *aggcontext = NULL;
4610  return 0;
4611 }
4612 
4613 /*
4614  * AggGetAggref - allow an aggregate support function to get its Aggref
4615  *
4616  * If the function is being called as an aggregate support function,
4617  * return the Aggref node for the aggregate call. Otherwise, return NULL.
4618  *
4619  * Aggregates sharing the same inputs and transition functions can get
4620  * merged into a single transition calculation. If the transition function
4621  * calls AggGetAggref, it will get some one of the Aggrefs for which it is
4622  * executing. It must therefore not pay attention to the Aggref fields that
4623  * relate to the final function, as those are indeterminate. But if a final
4624  * function calls AggGetAggref, it will get a precise result.
4625  *
4626  * Note that if an aggregate is being used as a window function, this will
4627  * return NULL. We could provide a similar function to return the relevant
4628  * WindowFunc node in such cases, but it's not needed yet.
4629  */
4630 Aggref *
/*
 * NOTE(review): signature line missing from this extract — per the comment
 * block above this is AggGetAggref(FunctionCallInfo fcinfo); confirm.
 */
4632 {
4633  if (fcinfo->context && IsA(fcinfo->context, AggState))
4634  {
4635  AggState *aggstate = (AggState *) fcinfo->context;
4636  AggStatePerAgg curperagg;
4637  AggStatePerTrans curpertrans;
4638 
4639  /* check curperagg (valid when in a final function) */
4640  curperagg = aggstate->curperagg;
4641 
4642  if (curperagg)
4643  return curperagg->aggref;
4644 
4645  /* check curpertrans (valid when in a transition function) */
4646  curpertrans = aggstate->curpertrans;
4647 
4648  if (curpertrans)
4649  return curpertrans->aggref;
4650  }
/* Not called as an aggregate support function (e.g. window context) */
4651  return NULL;
4652 }
4653 
4654 /*
4655  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4656  *
4657  * This is useful in agg final functions; the context returned is one that
4658  * the final function can safely reset as desired. This isn't useful for
4659  * transition functions, since the context returned MAY (we don't promise)
4660  * be the same as the context those are called in.
4661  *
4662  * As above, this is currently not useful for aggs called as window functions.
4663  */
/*
 * NOTE(review): the return-type and signature lines are missing from this
 * extract — per the comment above this is
 * AggGetTempMemoryContext(FunctionCallInfo fcinfo) returning MemoryContext;
 * confirm upstream.
 */
4666 {
4667  if (fcinfo->context && IsA(fcinfo->context, AggState))
4668  {
4669  AggState *aggstate = (AggState *) fcinfo->context;
4670 
/* Hand back the short-lived per-input-tuple memory context */
4671  return aggstate->tmpcontext->ecxt_per_tuple_memory;
4672  }
4673  return NULL;
4674 }
4675 
4676 /*
4677  * AggStateIsShared - find out whether transition state is shared
4678  *
4679  * If the function is being called as an aggregate support function,
4680  * return true if the aggregate's transition state is shared across
4681  * multiple aggregates, false if it is not.
4682  *
4683  * Returns true if not called as an aggregate support function.
4684  * This is intended as a conservative answer, ie "no you'd better not
4685  * scribble on your input". In particular, will return true if the
4686  * aggregate is being used as a window function, which is a scenario
4687  * in which changing the transition state is a bad idea. We might
4688  * want to refine the behavior for the window case in future.
4689  */
4690 bool
/*
 * NOTE(review): signature line missing from this extract — per the comment
 * block above this is AggStateIsShared(FunctionCallInfo fcinfo); confirm.
 */
4692 {
4693  if (fcinfo->context && IsA(fcinfo->context, AggState))
4694  {
4695  AggState *aggstate = (AggState *) fcinfo->context;
4696  AggStatePerAgg curperagg;
4697  AggStatePerTrans curpertrans;
4698 
4699  /* check curperagg (valid when in a final function) */
4700  curperagg = aggstate->curperagg;
4701 
4702  if (curperagg)
4703  return aggstate->pertrans[curperagg->transno].aggshared;
4704 
4705  /* check curpertrans (valid when in a transition function) */
4706  curpertrans = aggstate->curpertrans;
4707 
4708  if (curpertrans)
4709  return curpertrans->aggshared;
4710  }
/* Conservative default per the header comment: assume shared */
4711  return true;
4712 }
4713 
4714 /*
4715  * AggRegisterCallback - register a cleanup callback for an aggregate
4716  *
4717  * This is useful for aggs to register shutdown callbacks, which will ensure
4718  * that non-memory resources are freed. The callback will occur just before
4719  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4720  * either between groups or as a result of rescanning the query. The callback
4721  * will NOT be called on error paths. The typical use-case is for freeing of
4722  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4723  * created by the agg functions. (The callback will not be called until after
4724  * the result of the finalfn is no longer needed, so it's safe for the finalfn
4725  * to return data that will be freed by the callback.)
4726  *
4727  * As above, this is currently not useful for aggs called as window functions.
4728  */
4729 void
/*
 * NOTE(review): the first signature line(s) are missing from this extract —
 * per the comment above this is AggRegisterCallback(FunctionCallInfo fcinfo,
 * ExprContextCallbackFunction func, Datum arg); confirm upstream.
 */
4732  Datum arg)
4733 {
4734  if (fcinfo->context && IsA(fcinfo->context, AggState))
4735  {
4736  AggState *aggstate = (AggState *) fcinfo->context;
4737  ExprContext *cxt = aggstate->curaggcontext;
4738 
/* Callback fires when the current agg context is reset/rescanned */
4739  RegisterExprContextCallback(cxt, func, arg);
4740 
4741  return;
4742  }
4743  elog(ERROR, "aggregate function cannot register a callback in this context");
4744 }
4745 
4746 
4747 /* ----------------------------------------------------------------
4748  * Parallel Query Support
4749  * ----------------------------------------------------------------
4750  */
4751 
4752  /* ----------------------------------------------------------------
4753  * ExecAggEstimate
4754  *
4755  * Estimate space required to propagate aggregate statistics.
4756  * ----------------------------------------------------------------
4757  */
4758 void
/*
 * NOTE(review): signature line missing from this extract — per the comment
 * above this is ExecAggEstimate(AggState *node, ParallelContext *pcxt);
 * confirm upstream.
 */
4760 {
4761  Size size;
4762 
4763  /* don't need this if not instrumenting or no workers */
4764  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4765  return;
4766 
/* One AggregateInstrumentation slot per worker, plus the struct header */
4767  size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
4768  size = add_size(size, offsetof(SharedAggInfo, sinstrument));
4769  shm_toc_estimate_chunk(&pcxt->estimator, size);
4770  shm_toc_estimate_keys(&pcxt->estimator, 1);
4771 }
4772 
4773 /* ----------------------------------------------------------------
4774  * ExecAggInitializeDSM
4775  *
4776  * Initialize DSM space for aggregate statistics.
4777  * ----------------------------------------------------------------
4778  */
4779 void
/*
 * NOTE(review): signature line missing from this extract — per the comment
 * above this is ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt);
 * confirm upstream.
 */
4781 {
4782  Size size;
4783 
4784  /* don't need this if not instrumenting or no workers */
4785  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4786  return;
4787 
4788  size = offsetof(SharedAggInfo, sinstrument)
4789  + pcxt->nworkers * sizeof(AggregateInstrumentation);
4790  node->shared_info = shm_toc_allocate(pcxt->toc, size);
4791  /* ensure any unfilled slots will contain zeroes */
4792  memset(node->shared_info, 0, size);
4793  node->shared_info->num_workers = pcxt->nworkers;
/* Publish the shared area under this plan node's id for workers to find */
4794  shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
4795  node->shared_info);
4796 }
4797 
4798 /* ----------------------------------------------------------------
4799  * ExecAggInitializeWorker
4800  *
4801  * Attach worker to DSM space for aggregate statistics.
4802  * ----------------------------------------------------------------
4803  */
4804 void
/*
 * NOTE(review): signature line missing from this extract — per the comment
 * above this is ExecAggInitializeWorker(AggState *node,
 * ParallelWorkerContext *pwcxt); confirm upstream.
 */
4806 {
/* Look up the shared instrumentation area published by the leader */
4807  node->shared_info =
4808  shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
4809 }
4810 
4811 /* ----------------------------------------------------------------
4812  * ExecAggRetrieveInstrumentation
4813  *
4814  * Transfer aggregate statistics from DSM to private memory.
4815  * ----------------------------------------------------------------
4816  */
4817 void
/*
 * NOTE(review): signature line missing from this extract — per the comment
 * above this is ExecAggRetrieveInstrumentation(AggState *node); confirm.
 */
4819 {
4820  Size size;
4821  SharedAggInfo *si;
4822 
4823  if (node->shared_info == NULL)
4824  return;
4825 
/* NOTE(review): continuation line of this size computation (presumably
 * "+ num_workers * sizeof(AggregateInstrumentation)") is missing here. */
4826  size = offsetof(SharedAggInfo, sinstrument)
/* Copy the DSM-resident stats into backend-private memory before the DSM
 * segment goes away */
4828  si = palloc(size);
4829  memcpy(si, node->shared_info, size);
4830  node->shared_info = si;
4831 }
static void hashagg_reset_spill_state(AggState *aggstate)
Definition: nodeAgg.c:3193
List * aggdistinct
Definition: primnodes.h:332
struct AggStatePerTransData * AggStatePerTrans
Definition: execnodes.h:2264
ExprState ** eqfunctions
Definition: nodeAgg.h:278
struct HashAggSpill * hash_spills
Definition: execnodes.h:2315
AggStatePerGroup * hash_pergroup
Definition: execnodes.h:2335
LogicalTapeSet * LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared, SharedFileSet *fileset, int worker)
Definition: logtape.c:685
#define NIL
Definition: pg_list.h:65
size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:977
static TupleTableSlot * fetch_input_tuple(AggState *aggstate)
Definition: nodeAgg.c:567
struct AggStatePerGroupData * AggStatePerGroup
Definition: execnodes.h:2265
#define ScanTupleHashTable(htable, iter)
Definition: execnodes.h:769
static void select_current_set(AggState *aggstate, int setno, bool is_hash)
Definition: nodeAgg.c:475
int numCols
Definition: plannodes.h:861
static int partitions
Definition: pgbench.c:231
static int hash_choose_num_partitions(double input_groups, double hashentrysize, int used_bits, int *log2_npartittions)
Definition: nodeAgg.c:1990
List * qual
Definition: plannodes.h:142
bool tuplesort_getdatum(Tuplesortstate *state, bool forward, Datum *val, bool *isNull, Datum *abbrev)
Definition: tuplesort.c:2494
TupleHashTable BuildTupleHashTableExt(PlanState *parent, TupleDesc inputDesc, int numCols, AttrNumber *keyColIdx, const Oid *eqfuncoids, FmgrInfo *hashfunctions, Oid *collations, long nbuckets, Size additionalsize, MemoryContext metacxt, MemoryContext tablecxt, MemoryContext tempcxt, bool use_variable_hash_iv)
Definition: execGrouping.c:154
bool aggvariadic
Definition: primnodes.h:335
int bms_first_member(Bitmapset *a)
Definition: bitmapset.c:996
AggStatePerPhase phases
Definition: execnodes.h:2302
double hashentrysize
Definition: execnodes.h:2327
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
void tuplesort_performsort(Tuplesortstate *state)
Definition: tuplesort.c:2040
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
Definition: nodeAgg.c:2929
#define AllocSetContextCreate
Definition: memutils.h:173
AttrNumber * hashGrpColIdxInput
Definition: nodeAgg.h:311
Datum * ecxt_aggvalues
Definition: execnodes.h:245
static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
Definition: nodeAgg.c:1916
TupleHashEntry LookupTupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew, uint32 hash)
Definition: execGrouping.c:359
uint64 hash_ngroups_limit
Definition: execnodes.h:2324
#define HASHAGG_MAX_PARTITIONS
Definition: nodeAgg.c:289
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1446
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:330
Index varlevelsup
Definition: primnodes.h:196
TargetEntry * get_sortgroupclause_tle(SortGroupClause *sgClause, List *targetList)
Definition: tlist.c:356
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1831
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
Bitmapset * bms_copy(const Bitmapset *a)
Definition: bitmapset.c:74
Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:1247
static void hash_agg_check_limits(AggState *aggstate)
Definition: nodeAgg.c:1855
static long hash_choose_num_buckets(double hashentrysize, long estimated_nbuckets, Size memory)
Definition: nodeAgg.c:1965
AttrNumber * grpColIdx
Definition: plannodes.h:862
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1004
uint64 transitionSpace
Definition: plannodes.h:866
Instrumentation * instrument
Definition: execnodes.h:974
static void agg_fill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2535
int aggtransno
Definition: primnodes.h:341
Bitmapset * colnos_needed
Definition: execnodes.h:2297
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
Definition: execUtils.c:499
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
List * lcons_int(int datum, List *list)
Definition: list.c:486
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:1576
int numaggs
Definition: execnodes.h:2273
int nfreetapes
Definition: nodeAgg.c:331
Oid GetUserId(void)
Definition: miscinit.c:478
bool agg_done
Definition: execnodes.h:2291
#define castNode(_type_, nodeptr)
Definition: nodes.h:608
Oid * grpCollations
Definition: plannodes.h:864
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:555
#define TTS_EMPTY(slot)
Definition: tuptable.h:97
TupleTableSlot * sort_slot
Definition: execnodes.h:2305
List * all_grouped_cols
Definition: execnodes.h:2296
Tuplesortstate * sort_out
Definition: execnodes.h:2304
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1692
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void finalize_partialaggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1159
ScanState ss
Definition: execnodes.h:2271
FmgrInfo equalfnOne
Definition: nodeAgg.h:110
ExprContext * ps_ExprContext
Definition: execnodes.h:1003
MinimalTuple firstTuple
Definition: execnodes.h:722
shm_toc_estimator estimator
Definition: parallel.h:42
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:234
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
Definition: nodeAgg.c:3151
ExprState * evaltrans
Definition: nodeAgg.h:283
#define SizeForFunctionCallInfo(nargs)
Definition: fmgr.h:102
int64 input_tuples
Definition: nodeAgg.c:370
void ExecReScan(PlanState *node)
Definition: execAmi.c:78
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1043
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
int plan_node_id
Definition: plannodes.h:140
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct HashTapeInfo HashTapeInfo
Oid inputcollid
Definition: primnodes.h:326
int current_phase
Definition: execnodes.h:2279
static void hashagg_finish_initial_spills(AggState *aggstate)
Definition: nodeAgg.c:3117
static void slot_getsomeattrs(TupleTableSlot *slot, int attnum)
Definition: tuptable.h:341
Definition: nodes.h:539
AggSplit aggsplit
Definition: execnodes.h:2276
static TupleTableSlot * ExecAgg(PlanState *pstate)
Definition: nodeAgg.c:2153
bool * nullsFirst
Definition: plannodes.h:814
int errcode(int sqlerrcode)
Definition: elog.c:698
List * args
Definition: primnodes.h:330
#define MemSet(start, val, len)
Definition: c.h:1008
AttrNumber varattno
Definition: primnodes.h:191
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
fmNodePtr context
Definition: fmgr.h:88
Datum * tts_values
Definition: tuptable.h:126
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1376
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
void build_aggregate_deserialfn_expr(Oid deserialfn_oid, Expr **deserialfnexpr)
Definition: parse_agg.c:2080
static void finalize_aggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1056
bool all_cols_needed
Definition: execnodes.h:2299
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition: parse_agg.c:2104
AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER]
Definition: execnodes.h:2247
TupleTableSlot * hash_spill_rslot
Definition: execnodes.h:2317
AggStatePerTrans pertrans
Definition: execnodes.h:2281
EState * state
Definition: execnodes.h:966
int projected_set
Definition: execnodes.h:2292
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1148
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
uint32 hash_bytes_uint32(uint32 k)
Definition: hashfn.c:610
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:395
HeapTuple grp_firstTuple
Definition: execnodes.h:2309
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
Definition: primnodes.h:186
Aggref * aggref
Definition: nodeAgg.h:187
static TupleTableSlot * project_aggregates(AggState *aggstate)
Definition: nodeAgg.c:1372
static void advance_aggregates(AggState *aggstate)
Definition: nodeAgg.c:838
int current_set
Definition: execnodes.h:2294
uint32 mask
Definition: nodeAgg.c:350
#define OidIsValid(objectId)
Definition: c.h:710
#define DO_AGGSPLIT_COMBINE(as)
Definition: nodes.h:801
FunctionCallInfo transfn_fcinfo
Definition: nodeAgg.h:162
TupleHashEntry LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew, uint32 *hash)
Definition: execGrouping.c:304
void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4780
struct HashAggBatch HashAggBatch
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:650
Datum ExecAggTransReparent(AggState *aggstate, AggStatePerTrans pertrans, Datum newValue, bool newValueIsNull, Datum oldValue, bool oldValueIsNull)
int numtrans
Definition: execnodes.h:2274
void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1469
static void hash_agg_enter_spill_mode(AggState *aggstate)
Definition: nodeAgg.c:1881
TupleDesc sortdesc
Definition: nodeAgg.h:138
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, double input_groups, double hashentrysize)
Definition: nodeAgg.c:2949
Oid * sortOperators
Definition: plannodes.h:812
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:951
void execTuplesHashPrepare(int numCols, const Oid *eqOperators, Oid **eqFuncOids, FmgrInfo **hashFunctions)
Definition: execGrouping.c:96
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:209
void ResetTupleHashTable(TupleHashTable hashtable)
Definition: execGrouping.c:283
ExprContext * tmpcontext
Definition: execnodes.h:2284
FmgrInfo transfn
Definition: nodeAgg.h:81
#define HASHAGG_PARTITION_FACTOR
Definition: nodeAgg.c:287
static void build_hash_table(AggState *aggstate, int setno, long nbuckets)
Definition: nodeAgg.c:1504
int max_colno_needed
Definition: execnodes.h:2298
static void prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
Definition: nodeAgg.c:1264
bool hash_spill_mode
Definition: execnodes.h:2321
#define FUNC_MAX_ARGS
static void hashagg_tapeinfo_init(AggState *aggstate)
Definition: nodeAgg.c:2885
List * hash_batches
Definition: execnodes.h:2319
Aggref * aggref
Definition: nodeAgg.h:44
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:897
#define linitial_int(l)
Definition: pg_list.h:175
Bitmapset ** grouped_cols
Definition: nodeAgg.h:277
PlanState ps
Definition: execnodes.h:1373
LogicalTapeSet * tapeset
Definition: nodeAgg.c:368
int maxsets
Definition: execnodes.h:2301
static bool agg_refill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2589
static bool find_cols_walker(Node *node, FindColsContext *context)
Definition: nodeAgg.c:1421
Size hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
Definition: nodeAgg.c:1693
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3313
void initHyperLogLog(hyperLogLogState *cState, uint8 bwidth)
Definition: hyperloglog.c:66
#define DO_AGGSPLIT_SERIALIZE(as)
Definition: nodes.h:803
#define HASHAGG_MIN_PARTITIONS
Definition: nodeAgg.c:288
void pfree(void *pointer)
Definition: mcxt.c:1169
MemoryContext es_query_cxt
Definition: execnodes.h:598
AggStrategy aggstrategy
Definition: plannodes.h:859
AggState * ExecInitAgg(Agg *node, EState *estate, int eflags)
Definition: nodeAgg.c:3245
#define linitial(l)
Definition: pg_list.h:174
bool table_filled
Definition: execnodes.h:2311
AggStrategy aggstrategy
Definition: execnodes.h:2275
#define HASHAGG_HLL_BIT_WIDTH
Definition: nodeAgg.c:306
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:775
static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2740
static void find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
Definition: nodeAgg.c:1398
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
bool fn_strict
Definition: fmgr.h:61
#define lfirst_int(lc)
Definition: pg_list.h:170
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
char * get_func_name(Oid funcid)
Definition: lsyscache.c:1579
MemoryContext hash_metacxt
Definition: execnodes.h:2313
NullableDatum args[FLEXIBLE_ARRAY_MEMBER]
Definition: fmgr.h:95
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
static TupleTableSlot * agg_retrieve_direct(AggState *aggstate)
Definition: nodeAgg.c:2189
#define AGG_CONTEXT_AGGREGATE
Definition: fmgr.h:738
struct TupleHashEntryData TupleHashEntryData
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
static void find_hash_columns(AggState *aggstate)
Definition: nodeAgg.c:1564
ExprState * equalfnMulti
Definition: nodeAgg.h:111
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
Tuplesortstate * sort_in
Definition: execnodes.h:2303
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define outerPlanState(node)
Definition: execnodes.h:1058
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Definition: tuplesort.c:2408
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:646
static void finalize_aggregates(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup)
Definition: nodeAgg.c:1309
bool AggStateIsShared(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4691
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, int ndest)
Definition: nodeAgg.c:2906
#define list_nth_node(type, list, n)
Definition: pg_list.h:306
Tuplesortstate ** sortstates
Definition: nodeAgg.h:154
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:172
Bitmapset * aggParams
Definition: plannodes.h:867
static int initValue(long lng_val)
Definition: informix.c:677
MemoryContext tablecxt
Definition: execnodes.h:744
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:535
bool * tts_isnull
Definition: tuptable.h:128
int npartitions
Definition: nodeAgg.c:347
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:315
MinimalTupleData * MinimalTuple
Definition: htup.h:27
static void process_ordered_aggregate_multi(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:962
List * aggorder
Definition: primnodes.h:331
void ExecAggEstimate(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4759
int errcode_for_file_access(void)
Definition: elog.c:721
#define fmgr_info_set_expr(expr, finfo)
Definition: fmgr.h:135
AttrNumber resno
Definition: primnodes.h:1445
#define DatumGetBool(X)
Definition: postgres.h:437
int ParallelWorkerNumber
Definition: parallel.c:112
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
Definition: nodeAgg.c:2980
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:228
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:318
#define MakeExpandedObjectReadOnly(d, isnull, typlen)
Index agglevelsup
Definition: primnodes.h:338
int used_bits
Definition: nodeAgg.c:367
struct AggregateInstrumentation AggregateInstrumentation
Bitmapset * unaggregated
Definition: nodeAgg.c:379
#define TupIsNull(slot)
Definition: tuptable.h:292
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:203
unsigned int uint32
Definition: c.h:441
List * aggdirectargs
Definition: primnodes.h:329
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
Definition: nodeAgg.c:4354
AggStatePerAgg curperagg
Definition: execnodes.h:2287
AttrNumber * sortColIdx
Definition: nodeAgg.h:100
struct AggStatePerGroupData AggStatePerGroupData
struct HashTapeInfo * hash_tapeinfo
Definition: execnodes.h:2314
AggStatePerHash perhash
Definition: execnodes.h:2334
bool outeropsset
Definition: execnodes.h:1045
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static void initialize_aggregates(AggState *aggstate, AggStatePerGroup *pergroups, int numReset)
Definition: nodeAgg.c:685
AggStrategy aggstrategy
Definition: nodeAgg.h:274
ExprState * evaltrans_cache[2][2]
Definition: nodeAgg.h:291
#define InstrCountFiltered1(node, delta)
Definition: execnodes.h:1066
#define EXEC_FLAG_REWIND
Definition: executor.h:57
hyperLogLogState * hll_card
Definition: nodeAgg.c:352
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2821
void build_aggregate_combinefn_expr(Oid agg_state_type, Oid agg_input_collation, Oid combinefn_oid, Expr **combinefnexpr)
Definition: parse_agg.c:2028
Datum value
Definition: postgres.h:422
Bitmapset * grouped_cols
Definition: execnodes.h:2295
#define IsParallelWorker()
Definition: parallel.h:61
Datum datumCopy(Datum value, bool typByVal, int typLen)
Definition: datum.c:131
TupleTableSlot * ExecAllocTableSlot(List **tupleTable, TupleDesc desc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1171
void ExecAggRetrieveInstrumentation(AggState *node)
Definition: nodeAgg.c:4818
int hash_batches_used
Definition: execnodes.h:2332
MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4665
Bitmapset * chgParam
Definition: execnodes.h:996
#define InvokeFunctionExecuteHook(objectId)
Definition: objectaccess.h:191
int ntapes
Definition: nodeAgg.c:329
bool IsBinaryCoercible(Oid srctype, Oid targettype)
int my_log2(long num)
Definition: dynahash.c:1765
double input_card
Definition: nodeAgg.c:371
#define outerPlan(node)
Definition: plannodes.h:171
List * lappend(List *list, void *datum)
Definition: list.c:336
Bitmapset * aggregated
Definition: nodeAgg.c:378
TupleHashIterator hashiter
Definition: nodeAgg.h:304
int numCols
Definition: plannodes.h:810
Index varno
Definition: primnodes.h:189
static void initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:598
int num_hashes
Definition: execnodes.h:2312
Plan plan
Definition: plannodes.h:858
AttrNumber * hashGrpColIdxHash
Definition: nodeAgg.h:312
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
bool input_done
Definition: execnodes.h:2290
#define SizeofMinimalTupleHeader
Definition: htup_details.h:648
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
ExprContext * curaggcontext
Definition: execnodes.h:2286
ExprContext * hashcontext
Definition: execnodes.h:2282
bool * ecxt_aggnulls
Definition: execnodes.h:247
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:422
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
#define TextDatumGetCString(d)
Definition: builtins.h:83
List * es_tupleTable
Definition: execnodes.h:600
#define HASHAGG_READ_BUFFER_SIZE
Definition: nodeAgg.c:297
AggStatePerPhase phase
Definition: execnodes.h:2277
void * palloc0(Size size)
Definition: mcxt.c:1093
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:970
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:411
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
struct FunctionCallInfoBaseData * FunctionCallInfo
Definition: fmgr.h:38
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:252
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1388
FmgrInfo deserialfn
Definition: nodeAgg.h:87
int work_mem
Definition: globals.c:124
List * groupingSets
Definition: plannodes.h:869
int16 resulttypeLen
Definition: nodeAgg.h:216
static void initialize_phase(AggState *aggstate, int newphase)
Definition: nodeAgg.c:497
double estimateHyperLogLog(hyperLogLogState *cState)
Definition: hyperloglog.c:186
LogicalTapeSet * tapeset
Definition: nodeAgg.c:328
struct FindColsContext FindColsContext
FormData_pg_proc * Form_pg_proc
Definition: pg_proc.h:136
Plan * plan
Definition: execnodes.h:964
#define InvalidOid
Definition: postgres_ext.h:36
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1256
Oid aggfnoid
Definition: primnodes.h:323
int16 attnum
Definition: pg_attribute.h:83
#define ResetTupleHashIterator(htable, iter)
Definition: execnodes.h:767
#define ereport(elevel,...)
Definition: elog.h:157
static HeapTuple ExecCopySlotHeapTuple(TupleTableSlot *slot)
Definition: tuptable.h:452
static void advance_transition_function(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:726
#define LOCAL_FCINFO(name, nargs)
Definition: fmgr.h:110
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
Definition: nodeAgg.c:1740
List * lcons(void *datum, List *list)
Definition: list.c:468
static void prepare_hash_slot(AggStatePerHash perhash, TupleTableSlot *inputslot, TupleTableSlot *hashslot)
Definition: nodeAgg.c:1219
int aggno
Definition: primnodes.h:340
uint64 hash_disk_used
Definition: execnodes.h:2331
Size MemoryContextMemAllocated(MemoryContext context, bool recurse)
Definition: mcxt.c:477
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
#define Max(x, y)
Definition: c.h:980
ExprContext ** aggcontexts
Definition: execnodes.h:2283
#define makeNode(_type_)
Definition: nodes.h:587
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:230
int plan_width
Definition: plannodes.h:124
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FmgrInfo * hashfunctions
Definition: nodeAgg.h:306
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
void RegisterExprContextCallback(ExprContext *econtext, ExprContextCallbackFunction function, Datum arg)
Definition: execUtils.c:925
FmgrInfo serialfn
Definition: nodeAgg.h:84
ExprState * execTuplesMatchPrepare(TupleDesc desc, int numCols, const AttrNumber *keyColIdx, const Oid *eqOperators, const Oid *collations, PlanState *parent)
Definition: execGrouping.c:59
int input_tapenum
Definition: nodeAgg.c:369
FunctionCallInfo deserialfn_fcinfo
Definition: nodeAgg.h:167
#define EXEC_FLAG_MARK
Definition: executor.h:59
AggSplit aggsplit
Definition: plannodes.h:860
struct AggStatePerAggData * AggStatePerAgg
Definition: execnodes.h:2263
void ExecReScanAgg(AggState *node)
Definition: nodeAgg.c:4440
void build_aggregate_serialfn_expr(Oid serialfn_oid, Expr **serialfnexpr)
Definition: parse_agg.c:2057
FormData_pg_aggregate * Form_pg_aggregate
Definition: pg_aggregate.h:109
Expr * expr
Definition: primnodes.h:1444
AggSplit aggsplit
Definition: primnodes.h:339
bool MemoryContextContains(MemoryContext context, void *pointer)
Definition: mcxt.c:758
void(* ExprContextCallbackFunction)(Datum arg)
Definition: execnodes.h:188
void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, Oid transfn_oid, Oid invtransfn_oid, Expr **transfnexpr, Expr **invtransfnexpr)
Definition: parse_agg.c:1967
bool hash_ever_spilled
Definition: execnodes.h:2320
AggStatePerGroup * pergroups
Definition: execnodes.h:2307
void freeHyperLogLog(hyperLogLogState *cState)
Definition: hyperloglog.c:151
size_t Size
Definition: c.h:540
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:225
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:480
#define AGG_CONTEXT_WINDOW
Definition: fmgr.h:739
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
Definition: fmgr.h:150
FunctionCallInfo serialfn_fcinfo
Definition: nodeAgg.h:165
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
bool expression_tree_walker(Node *node, bool(*walker)(), void *context)
Definition: nodeFuncs.c:1904
static int list_length(const List *l)
Definition: pg_list.h:149
long numGroups
Definition: plannodes.h:865
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:759
#define DO_AGGSPLIT_SKIPFINAL(as)
Definition: nodes.h:802
void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)
Definition: lsyscache.c:2198
void addHyperLogLog(hyperLogLogState *cState, uint32 hash)
Definition: hyperloglog.c:167
Expr * aggfilter
Definition: primnodes.h:333
int AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
Definition: nodeAgg.c:4587
TupleDesc ExecTypeFromTL(List *targetList)
Definition: execTuples.c:1938
#define MAXALIGN(LEN)
Definition: c.h:757
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1799
void ReScanExprContext(ExprContext *econtext)
Definition: execUtils.c:438
static TupleTableSlot * agg_retrieve_hash_table_in_memory(AggState *aggstate)
Definition: nodeAgg.c:2765
LogicalTapeSet * tapeset
Definition: nodeAgg.c:346
bool outeropsfixed
Definition: execnodes.h:1041
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define DO_AGGSPLIT_DESERIALIZE(as)
Definition: nodes.h:804
Size hash_mem_limit
Definition: execnodes.h:2323
struct Plan * lefttree
Definition: plannodes.h:143
TupleTableSlot * uniqslot
Definition: nodeAgg.h:137
int numphases
Definition: execnodes.h:2278
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1182
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:490
void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
Definition: logtape.c:863
List * targetlist
Definition: plannodes.h:141
ExprState * qual
Definition: execnodes.h:985
void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
Definition: nodeAgg.c:4805
#define DatumGetPointer(X)
Definition: postgres.h:593
AttrNumber * sortColIdx
Definition: plannodes.h:811
#define CHUNKHDRSZ
Definition: nodeAgg.c:312
#define HASHAGG_WRITE_BUFFER_SIZE
Definition: nodeAgg.c:298