PostgreSQL Source Code  git master
nodeAgg.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeAgg.c
4  * Routines to handle aggregate nodes.
5  *
6  * ExecAgg normally evaluates each aggregate in the following steps:
7  *
8  * transvalue = initcond
9  * foreach input_tuple do
10  * transvalue = transfunc(transvalue, input_value(s))
11  * result = finalfunc(transvalue, direct_argument(s))
12  *
13  * If a finalfunc is not supplied then the result is just the ending
14  * value of transvalue.
15  *
16  * Other behaviors can be selected by the "aggsplit" mode, which exists
17  * to support partial aggregation. It is possible to:
18  * * Skip running the finalfunc, so that the output is always the
19  * final transvalue state.
20  * * Substitute the combinefunc for the transfunc, so that transvalue
21  * states (propagated up from a child partial-aggregation step) are merged
22  * rather than processing raw input rows. (The statements below about
23  * the transfunc apply equally to the combinefunc, when it's selected.)
24  * * Apply the serializefunc to the output values (this only makes sense
25  * when skipping the finalfunc, since the serializefunc works on the
26  * transvalue data type).
27  * * Apply the deserializefunc to the input values (this only makes sense
28  * when using the combinefunc, for similar reasons).
29  * It is the planner's responsibility to connect up Agg nodes using these
30  * alternate behaviors in a way that makes sense, with partial aggregation
31  * results being fed to nodes that expect them.
32  *
33  * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34  * input tuples and eliminate duplicates (if required) before performing
35  * the above-depicted process. (However, we don't do that for ordered-set
36  * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37  * so far as this module is concerned.) Note that partial aggregation
38  * is not supported in these cases, since we couldn't ensure global
39  * ordering or distinctness of the inputs.
40  *
41  * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42  * then the first non-NULL input_value is assigned directly to transvalue,
43  * and transfunc isn't applied until the second non-NULL input_value.
44  * The agg's first input type and transtype must be the same in this case!
45  *
46  * If transfunc is marked "strict" then NULL input_values are skipped,
47  * keeping the previous transvalue. If transfunc is not strict then it
48  * is called for every input tuple and must deal with NULL initcond
49  * or NULL input_values for itself.
50  *
51  * If finalfunc is marked "strict" then it is not called when the
52  * ending transvalue is NULL, instead a NULL result is created
53  * automatically (this is just the usual handling of strict functions,
54  * of course). A non-strict finalfunc can make its own choice of
55  * what to return for a NULL ending transvalue.
56  *
57  * Ordered-set aggregates are treated specially in one other way: we
58  * evaluate any "direct" arguments and pass them to the finalfunc along
59  * with the transition value.
60  *
61  * A finalfunc can have additional arguments beyond the transvalue and
62  * any "direct" arguments, corresponding to the input arguments of the
63  * aggregate. These are always just passed as NULL. Such arguments may be
64  * needed to allow resolution of a polymorphic aggregate's result type.
65  *
66  * We compute aggregate input expressions and run the transition functions
67  * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68  * once per input tuple, so when the transvalue datatype is
69  * pass-by-reference, we have to be careful to copy it into a longer-lived
70  * memory context, and free the prior value to avoid memory leakage. We
71  * store transvalues in another set of econtexts, aggstate->aggcontexts
72  * (one per grouping set, see below), which are also used for the hashtable
73  * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74  * reset, at group boundaries so that aggregate transition functions can
75  * register shutdown callbacks via AggRegisterCallback.
76  *
77  * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78  * run finalize functions and compute the output tuple; this context can be
79  * reset once per output tuple.
80  *
81  * The executor's AggState node is passed as the fmgr "context" value in
82  * all transfunc and finalfunc calls. It is not recommended that the
83  * transition functions look at the AggState node directly, but they can
84  * use AggCheckCallContext() to verify that they are being called by
85  * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86  * transition function might want to know this is so that it can avoid
87  * palloc'ing a fixed-size pass-by-ref transition value on every call:
88  * it can instead just scribble on and return its left input. Ordinarily
89  * it is completely forbidden for functions to modify pass-by-ref inputs,
90  * but in the aggregate case we know the left input is either the initial
91  * transition value or a previous function result, and in either case its
92  * value need not be preserved. See int8inc() for an example. Notice that
93  * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94  * the previous transition value pointer is returned. It is also possible
95  * to avoid repeated data copying when the transition value is an expanded
96  * object: to do that, the transition function must take care to return
97  * an expanded object that is in a child context of the memory context
98  * returned by AggCheckCallContext(). Also, some transition functions want
99  * to store working state in addition to the nominal transition value; they
100  * can use the memory context returned by AggCheckCallContext() to do that.
101  *
102  * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103  * AggState is available as context in earlier releases (back to 8.1),
104  * but direct examination of the node is needed to use it before 9.0.
105  *
106  * As of 9.4, aggregate transition functions can also use AggGetAggref()
107  * to get hold of the Aggref expression node for their aggregate call.
108  * This is mainly intended for ordered-set aggregates, which are not
109  * supported as window functions. (A regular aggregate function would
110  * need some fallback logic to use this, since there's no Aggref node
111  * for a window function.)
112  *
113  * Grouping sets:
114  *
115  * A list of grouping sets which is structurally equivalent to a ROLLUP
116  * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117  * ordered data. We do this by keeping a separate set of transition values
118  * for each grouping set being concurrently processed; for each input tuple
119  * we update them all, and on group boundaries we reset those states
120  * (starting at the front of the list) whose grouping values have changed
121  * (the list of grouping sets is ordered from most specific to least
122  * specific).
123  *
124  * Where more complex grouping sets are used, we break them down into
125  * "phases", where each phase has a different sort order (except phase 0
126  * which is reserved for hashing). During each phase but the last, the
127  * input tuples are additionally stored in a tuplesort which is keyed to the
128  * next phase's sort order; during each phase but the first, the input
129  * tuples are drawn from the previously sorted data. (The sorting of the
130  * data for the first phase is handled by the planner, as it might be
131  * satisfied by underlying nodes.)
132  *
133  * Hashing can be mixed with sorted grouping. To do this, we have an
134  * AGG_MIXED strategy that populates the hashtables during the first sorted
135  * phase, and switches to reading them out after completing all sort phases.
136  * We can also support AGG_HASHED with multiple hash tables and no sorting
137  * at all.
138  *
139  * From the perspective of aggregate transition and final functions, the
140  * only issue regarding grouping sets is this: a single call site (flinfo)
141  * of an aggregate function may be used for updating several different
142  * transition values in turn. So the function must not cache in the flinfo
143  * anything which logically belongs as part of the transition value (most
144  * importantly, the memory context in which the transition value exists).
145  * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146  * sensitive to the grouping set for which the aggregate function is
147  * currently being called.
148  *
149  * Plan structure:
150  *
151  * What we get from the planner is actually one "real" Agg node which is
152  * part of the plan tree proper, but which optionally has an additional list
153  * of Agg nodes hung off the side via the "chain" field. This is because an
154  * Agg node happens to be a convenient representation of all the data we
155  * need for grouping sets.
156  *
157  * For many purposes, we treat the "real" node as if it were just the first
158  * node in the chain. The chain must be ordered such that hashed entries
159  * come before sorted/plain entries; the real node is marked AGG_MIXED if
160  * there are both types present (in which case the real node describes one
161  * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162  * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163  * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164  * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165  * chained nodes.
166  *
167  * We collect all hashed nodes into a single "phase", numbered 0, and create
168  * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169  * Phase 0 is allocated even if there are no hashes, but remains unused in
170  * that case.
171  *
172  * AGG_HASHED nodes actually refer to only a single grouping set each,
173  * because for each hashed grouping we need a separate grpColIdx and
174  * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175  * grouping sets that share a sort order. Each AGG_SORTED node other than
176  * the first one has an associated Sort node which describes the sort order
177  * to be used; the first sorted node takes its input from the outer subtree,
178  * which the planner has already arranged to provide ordered data.
179  *
180  * Memory and ExprContext usage:
181  *
182  * Because we're accumulating aggregate values across input rows, we need to
183  * use more memory contexts than just simple input/output tuple contexts.
184  * In fact, for a rollup, we need a separate context for each grouping set
185  * so that we can reset the inner (finer-grained) aggregates on their group
186  * boundaries while continuing to accumulate values for outer
187  * (coarser-grained) groupings. On top of this, we might be simultaneously
188  * populating hashtables; however, we only need one context for all the
189  * hashtables.
190  *
191  * So we create an array, aggcontexts, with an ExprContext for each grouping
192  * set in the largest rollup that we're going to process, and use the
193  * per-tuple memory context of those ExprContexts to store the aggregate
194  * transition values. hashcontext is the single context created to support
195  * all hash tables.
196  *
197  * Spilling To Disk
198  *
199  * When performing hash aggregation, if the hash table memory exceeds the
200  * limit (see hash_agg_check_limits()), we enter "spill mode". In spill
201  * mode, we advance the transition states only for groups already in the
202  * hash table. For tuples that would need to create a new hash table
203  * entries (and initialize new transition states), we instead spill them to
204  * disk to be processed later. The tuples are spilled in a partitioned
205  * manner, so that subsequent batches are smaller and less likely to exceed
206  * hash_mem (if a batch does exceed hash_mem, it must be spilled
207  * recursively).
208  *
209  * Spilled data is written to logical tapes. These provide better control
210  * over memory usage, disk space, and the number of files than if we were
211  * to use a BufFile for each spill. We don't know the number of tapes needed
212  * at the start of the algorithm (because it can recurse), so a tape set is
213  * allocated at the beginning, and individual tapes are created as needed.
214  * As a particular tape is read, logtape.c recycles its disk space. When a
215  * tape is read to completion, it is destroyed entirely.
216  *
217  * Tapes' buffers can take up substantial memory when many tapes are open at
218  * once. We only need one tape open at a time in read mode (using a buffer
219  * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
220  * requiring a buffer of size BLCKSZ) for each partition.
221  *
222  * Note that it's possible for transition states to start small but then
223  * grow very large; for instance in the case of ARRAY_AGG. In such cases,
224  * it's still possible to significantly exceed hash_mem. We try to avoid
225  * this situation by estimating what will fit in the available memory, and
226  * imposing a limit on the number of groups separately from the amount of
227  * memory consumed.
228  *
229  * Transition / Combine function invocation:
230  *
231  * For performance reasons transition functions, including combine
232  * functions, aren't invoked one-by-one from nodeAgg.c after computing
233  * arguments using the expression evaluation engine. Instead
234  * ExecBuildAggTrans() builds one large expression that does both argument
235  * evaluation and transition function invocation. That avoids performance
236  * issues due to repeated uses of expression evaluation, complications due
237  * to filter expressions having to be evaluated early, and allows to JIT
238  * the entire expression into one native function.
239  *
240  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
241  * Portions Copyright (c) 1994, Regents of the University of California
242  *
243  * IDENTIFICATION
244  * src/backend/executor/nodeAgg.c
245  *
246  *-------------------------------------------------------------------------
247  */
248 
249 #include "postgres.h"
250 
251 #include "access/htup_details.h"
252 #include "access/parallel.h"
253 #include "catalog/objectaccess.h"
254 #include "catalog/pg_aggregate.h"
255 #include "catalog/pg_proc.h"
256 #include "catalog/pg_type.h"
257 #include "common/hashfn.h"
258 #include "executor/execExpr.h"
259 #include "executor/executor.h"
260 #include "executor/nodeAgg.h"
261 #include "lib/hyperloglog.h"
262 #include "miscadmin.h"
263 #include "nodes/makefuncs.h"
264 #include "nodes/nodeFuncs.h"
265 #include "optimizer/optimizer.h"
266 #include "parser/parse_agg.h"
267 #include "parser/parse_coerce.h"
268 #include "utils/acl.h"
269 #include "utils/builtins.h"
270 #include "utils/datum.h"
271 #include "utils/dynahash.h"
272 #include "utils/expandeddatum.h"
273 #include "utils/logtape.h"
274 #include "utils/lsyscache.h"
275 #include "utils/memutils.h"
276 #include "utils/syscache.h"
277 #include "utils/tuplesort.h"
278 
279 /*
280  * Control how many partitions are created when spilling HashAgg to
281  * disk.
282  *
283  * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
284  * partitions needed such that each partition will fit in memory. The factor
285  * is set higher than one because there's not a high cost to having a few too
286  * many partitions, and it makes it less likely that a partition will need to
287  * be spilled recursively. Another benefit of having more, smaller partitions
288  * is that small hash tables may perform better than large ones due to memory
289  * caching effects.
290  *
291  * We also specify a min and max number of partitions per spill. Too few might
292  * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
293  * many will result in lots of memory wasted buffering the spill files (which
294  * could instead be spent on a larger hash table).
295  */
296 #define HASHAGG_PARTITION_FACTOR 1.50
297 #define HASHAGG_MIN_PARTITIONS 4
298 #define HASHAGG_MAX_PARTITIONS 1024
299 
300 /*
301  * For reading from tapes, the buffer size must be a multiple of
302  * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
303  * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
304  * tape always uses a buffer of size BLCKSZ.
305  */
306 #define HASHAGG_READ_BUFFER_SIZE BLCKSZ
307 #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
308 
309 /*
310  * HyperLogLog is used for estimating the cardinality of the spilled tuples in
311  * a given partition. 5 bits corresponds to a size of about 32 bytes and a
312  * worst-case error of around 18%. That's effective enough to choose a
313  * reasonable number of partitions when recursing.
314  */
315 #define HASHAGG_HLL_BIT_WIDTH 5
316 
317 /*
318  * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
319  * improved?
320  */
321 #define CHUNKHDRSZ 16
322 
323 /*
324  * Represents partitioned spill data for a single hashtable. Contains the
325  * necessary information to route tuples to the correct partition, and to
326  * transform the spilled data into new batches.
327  *
328  * The high bits are used for partition selection (when recursing, we ignore
329  * the bits that have already been used for partition selection at an earlier
330  * level).
331  */
332 typedef struct HashAggSpill
333 {
334  int npartitions; /* number of partitions */
335  LogicalTape **partitions; /* spill partition tapes */
336  int64 *ntuples; /* number of tuples in each partition */
337  uint32 mask; /* mask to find partition from hash value */
338  int shift; /* after masking, shift by this amount */
339  hyperLogLogState *hll_card; /* cardinality estimate for contents */
341 
342 /*
343  * Represents work to be done for one pass of hash aggregation (with only one
344  * grouping set).
345  *
346  * Also tracks the bits of the hash already used for partition selection by
347  * earlier iterations, so that this batch can use new bits. If all bits have
348  * already been used, no partitioning will be done (any spilled data will go
349  * to a single output tape).
350  */
351 typedef struct HashAggBatch
352 {
353  int setno; /* grouping set */
354  int used_bits; /* number of bits of hash already used */
355  LogicalTape *input_tape; /* input partition tape */
356  int64 input_tuples; /* number of tuples in this batch */
357  double input_card; /* estimated group cardinality */
359 
360 /* used to find referenced colnos */
361 typedef struct FindColsContext
362 {
363  bool is_aggref; /* is under an aggref */
364  Bitmapset *aggregated; /* column references under an aggref */
365  Bitmapset *unaggregated; /* other column references */
367 
368 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
369 static void initialize_phase(AggState *aggstate, int newphase);
370 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
371 static void initialize_aggregates(AggState *aggstate,
372  AggStatePerGroup *pergroups,
373  int numReset);
374 static void advance_transition_function(AggState *aggstate,
375  AggStatePerTrans pertrans,
376  AggStatePerGroup pergroupstate);
377 static void advance_aggregates(AggState *aggstate);
378 static void process_ordered_aggregate_single(AggState *aggstate,
379  AggStatePerTrans pertrans,
380  AggStatePerGroup pergroupstate);
381 static void process_ordered_aggregate_multi(AggState *aggstate,
382  AggStatePerTrans pertrans,
383  AggStatePerGroup pergroupstate);
384 static void finalize_aggregate(AggState *aggstate,
385  AggStatePerAgg peragg,
386  AggStatePerGroup pergroupstate,
387  Datum *resultVal, bool *resultIsNull);
388 static void finalize_partialaggregate(AggState *aggstate,
389  AggStatePerAgg peragg,
390  AggStatePerGroup pergroupstate,
391  Datum *resultVal, bool *resultIsNull);
392 static inline void prepare_hash_slot(AggStatePerHash perhash,
393  TupleTableSlot *inputslot,
394  TupleTableSlot *hashslot);
395 static void prepare_projection_slot(AggState *aggstate,
396  TupleTableSlot *slot,
397  int currentSet);
398 static void finalize_aggregates(AggState *aggstate,
399  AggStatePerAgg peraggs,
400  AggStatePerGroup pergroup);
401 static TupleTableSlot *project_aggregates(AggState *aggstate);
402 static void find_cols(AggState *aggstate, Bitmapset **aggregated,
403  Bitmapset **unaggregated);
404 static bool find_cols_walker(Node *node, FindColsContext *context);
405 static void build_hash_tables(AggState *aggstate);
406 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
407 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
408  bool nullcheck);
409 static long hash_choose_num_buckets(double hashentrysize,
410  long ngroups, Size memory);
411 static int hash_choose_num_partitions(double input_groups,
412  double hashentrysize,
413  int used_bits,
414  int *log2_npartitions);
415 static void initialize_hash_entry(AggState *aggstate,
416  TupleHashTable hashtable,
417  TupleHashEntry entry);
418 static void lookup_hash_entries(AggState *aggstate);
419 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
420 static void agg_fill_hash_table(AggState *aggstate);
421 static bool agg_refill_hash_table(AggState *aggstate);
424 static void hash_agg_check_limits(AggState *aggstate);
425 static void hash_agg_enter_spill_mode(AggState *aggstate);
426 static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
427  int npartitions);
428 static void hashagg_finish_initial_spills(AggState *aggstate);
429 static void hashagg_reset_spill_state(AggState *aggstate);
430 static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
431  int64 input_tuples, double input_card,
432  int used_bits);
433 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
434 static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset,
435  int used_bits, double input_groups,
436  double hashentrysize);
437 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
438  TupleTableSlot *inputslot, uint32 hash);
439 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
440  int setno);
441 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
442 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
443  AggState *aggstate, EState *estate,
444  Aggref *aggref, Oid transfn_oid,
445  Oid aggtranstype, Oid aggserialfn,
446  Oid aggdeserialfn, Datum initValue,
447  bool initValueIsNull, Oid *inputTypes,
448  int numArguments);
449 
450 
451 /*
452  * Select the current grouping set; affects current_set and
453  * curaggcontext.
454  */
455 static void
456 select_current_set(AggState *aggstate, int setno, bool is_hash)
457 {
458  /*
459  * When changing this, also adapt ExecAggPlainTransByVal() and
460  * ExecAggPlainTransByRef().
461  */
462  if (is_hash)
463  aggstate->curaggcontext = aggstate->hashcontext;
464  else
465  aggstate->curaggcontext = aggstate->aggcontexts[setno];
466 
467  aggstate->current_set = setno;
468 }
469 
470 /*
471  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
472  * current_phase + 1. Juggle the tuplesorts accordingly.
473  *
474  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
475  * case, so when entering phase 0, all we need to do is drop open sorts.
476  */
477 static void
478 initialize_phase(AggState *aggstate, int newphase)
479 {
480  Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
481 
482  /*
483  * Whatever the previous state, we're now done with whatever input
484  * tuplesort was in use.
485  */
486  if (aggstate->sort_in)
487  {
488  tuplesort_end(aggstate->sort_in);
489  aggstate->sort_in = NULL;
490  }
491 
492  if (newphase <= 1)
493  {
494  /*
495  * Discard any existing output tuplesort.
496  */
497  if (aggstate->sort_out)
498  {
499  tuplesort_end(aggstate->sort_out);
500  aggstate->sort_out = NULL;
501  }
502  }
503  else
504  {
505  /*
506  * The old output tuplesort becomes the new input one, and this is the
507  * right time to actually sort it.
508  */
509  aggstate->sort_in = aggstate->sort_out;
510  aggstate->sort_out = NULL;
511  Assert(aggstate->sort_in);
512  tuplesort_performsort(aggstate->sort_in);
513  }
514 
515  /*
516  * If this isn't the last phase, we need to sort appropriately for the
517  * next phase in sequence.
518  */
519  if (newphase > 0 && newphase < aggstate->numphases - 1)
520  {
521  Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
522  PlanState *outerNode = outerPlanState(aggstate);
523  TupleDesc tupDesc = ExecGetResultType(outerNode);
524 
525  aggstate->sort_out = tuplesort_begin_heap(tupDesc,
526  sortnode->numCols,
527  sortnode->sortColIdx,
528  sortnode->sortOperators,
529  sortnode->collations,
530  sortnode->nullsFirst,
531  work_mem,
532  NULL, TUPLESORT_NONE);
533  }
534 
535  aggstate->current_phase = newphase;
536  aggstate->phase = &aggstate->phases[newphase];
537 }
538 
539 /*
540  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
541  * populated by the previous phase. Copy it to the sorter for the next phase
542  * if any.
543  *
544  * Callers cannot rely on memory for tuple in returned slot remaining valid
545  * past any subsequently fetched tuple.
546  */
547 static TupleTableSlot *
549 {
550  TupleTableSlot *slot;
551 
552  if (aggstate->sort_in)
553  {
554  /* make sure we check for interrupts in either path through here */
556  if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
557  aggstate->sort_slot, NULL))
558  return NULL;
559  slot = aggstate->sort_slot;
560  }
561  else
562  slot = ExecProcNode(outerPlanState(aggstate));
563 
564  if (!TupIsNull(slot) && aggstate->sort_out)
565  tuplesort_puttupleslot(aggstate->sort_out, slot);
566 
567  return slot;
568 }
569 
570 /*
571  * (Re)Initialize an individual aggregate.
572  *
573  * This function handles only one grouping set, already set in
574  * aggstate->current_set.
575  *
576  * When called, CurrentMemoryContext should be the per-query context.
577  */
578 static void
580  AggStatePerGroup pergroupstate)
581 {
582  /*
583  * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
584  */
585  if (pertrans->aggsortrequired)
586  {
587  /*
588  * In case of rescan, maybe there could be an uncompleted sort
589  * operation? Clean it up if so.
590  */
591  if (pertrans->sortstates[aggstate->current_set])
592  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
593 
594 
595  /*
596  * We use a plain Datum sorter when there's a single input column;
597  * otherwise sort the full tuple. (See comments for
598  * process_ordered_aggregate_single.)
599  */
600  if (pertrans->numInputs == 1)
601  {
602  Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
603 
604  pertrans->sortstates[aggstate->current_set] =
605  tuplesort_begin_datum(attr->atttypid,
606  pertrans->sortOperators[0],
607  pertrans->sortCollations[0],
608  pertrans->sortNullsFirst[0],
609  work_mem, NULL, TUPLESORT_NONE);
610  }
611  else
612  pertrans->sortstates[aggstate->current_set] =
613  tuplesort_begin_heap(pertrans->sortdesc,
614  pertrans->numSortCols,
615  pertrans->sortColIdx,
616  pertrans->sortOperators,
617  pertrans->sortCollations,
618  pertrans->sortNullsFirst,
619  work_mem, NULL, TUPLESORT_NONE);
620  }
621 
622  /*
623  * (Re)set transValue to the initial value.
624  *
625  * Note that when the initial value is pass-by-ref, we must copy it (into
626  * the aggcontext) since we will pfree the transValue later.
627  */
628  if (pertrans->initValueIsNull)
629  pergroupstate->transValue = pertrans->initValue;
630  else
631  {
632  MemoryContext oldContext;
633 
635  pergroupstate->transValue = datumCopy(pertrans->initValue,
636  pertrans->transtypeByVal,
637  pertrans->transtypeLen);
638  MemoryContextSwitchTo(oldContext);
639  }
640  pergroupstate->transValueIsNull = pertrans->initValueIsNull;
641 
642  /*
643  * If the initial value for the transition state doesn't exist in the
644  * pg_aggregate table then we will let the first non-NULL value returned
645  * from the outer procNode become the initial value. (This is useful for
646  * aggregates like max() and min().) The noTransValue flag signals that we
647  * still need to do this.
648  */
649  pergroupstate->noTransValue = pertrans->initValueIsNull;
650 }
651 
652 /*
653  * Initialize all aggregate transition states for a new group of input values.
654  *
655  * If there are multiple grouping sets, we initialize only the first numReset
656  * of them (the grouping sets are ordered so that the most specific one, which
657  * is reset most often, is first). As a convenience, if numReset is 0, we
658  * reinitialize all sets.
659  *
660  * NB: This cannot be used for hash aggregates, as for those the grouping set
661  * number has to be specified from further up.
662  *
663  * When called, CurrentMemoryContext should be the per-query context.
664  */
665 static void
667  AggStatePerGroup *pergroups,
668  int numReset)
669 {
670  int transno;
671  int numGroupingSets = Max(aggstate->phase->numsets, 1);
672  int setno = 0;
673  int numTrans = aggstate->numtrans;
674  AggStatePerTrans transstates = aggstate->pertrans;
675 
676  if (numReset == 0)
677  numReset = numGroupingSets;
678 
679  for (setno = 0; setno < numReset; setno++)
680  {
681  AggStatePerGroup pergroup = pergroups[setno];
682 
683  select_current_set(aggstate, setno, false);
684 
685  for (transno = 0; transno < numTrans; transno++)
686  {
687  AggStatePerTrans pertrans = &transstates[transno];
688  AggStatePerGroup pergroupstate = &pergroup[transno];
689 
690  initialize_aggregate(aggstate, pertrans, pergroupstate);
691  }
692  }
693 }
694 
695 /*
696  * Given new input value(s), advance the transition function of one aggregate
697  * state within one grouping set only (already set in aggstate->current_set)
698  *
699  * The new values (and null flags) have been preloaded into argument positions
700  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
701  * pass to the transition function. We also expect that the static fields of
702  * the fcinfo are already initialized; that was done by ExecInitAgg().
703  *
704  * It doesn't matter which memory context this is called in.
705  */
706 static void
708  AggStatePerTrans pertrans,
709  AggStatePerGroup pergroupstate)
710 {
711  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
712  MemoryContext oldContext;
713  Datum newVal;
714 
715  if (pertrans->transfn.fn_strict)
716  {
717  /*
718  * For a strict transfn, nothing happens when there's a NULL input; we
719  * just keep the prior transValue.
720  */
721  int numTransInputs = pertrans->numTransInputs;
722  int i;
723 
724  for (i = 1; i <= numTransInputs; i++)
725  {
726  if (fcinfo->args[i].isnull)
727  return;
728  }
729  if (pergroupstate->noTransValue)
730  {
731  /*
732  * transValue has not been initialized. This is the first non-NULL
733  * input value. We use it as the initial value for transValue. (We
734  * already checked that the agg's input type is binary-compatible
735  * with its transtype, so straight copy here is OK.)
736  *
737  * We must copy the datum into aggcontext if it is pass-by-ref. We
738  * do not need to pfree the old transValue, since it's NULL.
739  */
741  pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
742  pertrans->transtypeByVal,
743  pertrans->transtypeLen);
744  pergroupstate->transValueIsNull = false;
745  pergroupstate->noTransValue = false;
746  MemoryContextSwitchTo(oldContext);
747  return;
748  }
749  if (pergroupstate->transValueIsNull)
750  {
751  /*
752  * Don't call a strict function with NULL inputs. Note it is
753  * possible to get here despite the above tests, if the transfn is
754  * strict *and* returned a NULL on a prior cycle. If that happens
755  * we will propagate the NULL all the way to the end.
756  */
757  return;
758  }
759  }
760 
761  /* We run the transition functions in per-input-tuple memory context */
762  oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
763 
764  /* set up aggstate->curpertrans for AggGetAggref() */
765  aggstate->curpertrans = pertrans;
766 
767  /*
768  * OK to call the transition function
769  */
770  fcinfo->args[0].value = pergroupstate->transValue;
771  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
772  fcinfo->isnull = false; /* just in case transfn doesn't set it */
773 
774  newVal = FunctionCallInvoke(fcinfo);
775 
776  aggstate->curpertrans = NULL;
777 
778  /*
779  * If pass-by-ref datatype, must copy the new value into aggcontext and
780  * free the prior transValue. But if transfn returned a pointer to its
781  * first input, we don't need to do anything. Also, if transfn returned a
782  * pointer to a R/W expanded object that is already a child of the
783  * aggcontext, assume we can adopt that value without copying it.
784  *
785  * It's safe to compare newVal with pergroup->transValue without regard
786  * for either being NULL, because ExecAggTransReparent() takes care to set
787  * transValue to 0 when NULL. Otherwise we could end up accidentally not
788  * reparenting, when the transValue has the same numerical value as
789  * newValue, despite being NULL. This is a somewhat hot path, making it
790  * undesirable to instead solve this with another branch for the common
791  * case of the transition function returning its (modified) input
792  * argument.
793  */
794  if (!pertrans->transtypeByVal &&
795  DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
796  newVal = ExecAggTransReparent(aggstate, pertrans,
797  newVal, fcinfo->isnull,
798  pergroupstate->transValue,
799  pergroupstate->transValueIsNull);
800 
801  pergroupstate->transValue = newVal;
802  pergroupstate->transValueIsNull = fcinfo->isnull;
803 
804  MemoryContextSwitchTo(oldContext);
805 }
806 
807 /*
808  * Advance each aggregate transition state for one input tuple. The input
809  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
810  * accessible to ExecEvalExpr.
811  *
812  * We have two sets of transition states to handle: one for sorted aggregation
813  * and one for hashed; we do them both here, to avoid multiple evaluation of
814  * the inputs.
815  *
816  * When called, CurrentMemoryContext should be the per-query context.
817  */
818 static void
820 {
821  bool dummynull;
822 
824  aggstate->tmpcontext,
825  &dummynull);
826 }
827 
828 /*
829  * Run the transition function for a DISTINCT or ORDER BY aggregate
830  * with only one input. This is called after we have completed
831  * entering all the input values into the sort object. We complete the
832  * sort, read out the values in sorted order, and run the transition
833  * function on each value (applying DISTINCT if appropriate).
834  *
835  * Note that the strictness of the transition function was checked when
836  * entering the values into the sort, so we don't check it again here;
837  * we just apply standard SQL DISTINCT logic.
838  *
839  * The one-input case is handled separately from the multi-input case
840  * for performance reasons: for single by-value inputs, such as the
841  * common case of count(distinct id), the tuplesort_getdatum code path
842  * is around 300% faster. (The speedup for by-reference types is less
843  * but still noticeable.)
844  *
845  * This function handles only one grouping set (already set in
846  * aggstate->current_set).
847  *
848  * When called, CurrentMemoryContext should be the per-query context.
849  */
850 static void
852  AggStatePerTrans pertrans,
853  AggStatePerGroup pergroupstate)
854 {
855  Datum oldVal = (Datum) 0;
856  bool oldIsNull = true;
857  bool haveOldVal = false;
858  MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
859  MemoryContext oldContext;
860  bool isDistinct = (pertrans->numDistinctCols > 0);
861  Datum newAbbrevVal = (Datum) 0;
862  Datum oldAbbrevVal = (Datum) 0;
863  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
864  Datum *newVal;
865  bool *isNull;
866 
867  Assert(pertrans->numDistinctCols < 2);
868 
869  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
870 
871  /* Load the column into argument 1 (arg 0 will be transition value) */
872  newVal = &fcinfo->args[1].value;
873  isNull = &fcinfo->args[1].isnull;
874 
875  /*
876  * Note: if input type is pass-by-ref, the datums returned by the sort are
877  * freshly palloc'd in the per-query context, so we must be careful to
878  * pfree them when they are no longer needed.
879  */
880 
881  while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
882  true, newVal, isNull, &newAbbrevVal))
883  {
884  /*
885  * Clear and select the working context for evaluation of the equality
886  * function and transition function.
887  */
888  MemoryContextReset(workcontext);
889  oldContext = MemoryContextSwitchTo(workcontext);
890 
891  /*
892  * If DISTINCT mode, and not distinct from prior, skip it.
893  */
894  if (isDistinct &&
895  haveOldVal &&
896  ((oldIsNull && *isNull) ||
897  (!oldIsNull && !*isNull &&
898  oldAbbrevVal == newAbbrevVal &&
900  pertrans->aggCollation,
901  oldVal, *newVal)))))
902  {
903  /* equal to prior, so forget this one */
904  if (!pertrans->inputtypeByVal && !*isNull)
905  pfree(DatumGetPointer(*newVal));
906  }
907  else
908  {
909  advance_transition_function(aggstate, pertrans, pergroupstate);
910  /* forget the old value, if any */
911  if (!oldIsNull && !pertrans->inputtypeByVal)
912  pfree(DatumGetPointer(oldVal));
913  /* and remember the new one for subsequent equality checks */
914  oldVal = *newVal;
915  oldAbbrevVal = newAbbrevVal;
916  oldIsNull = *isNull;
917  haveOldVal = true;
918  }
919 
920  MemoryContextSwitchTo(oldContext);
921  }
922 
923  if (!oldIsNull && !pertrans->inputtypeByVal)
924  pfree(DatumGetPointer(oldVal));
925 
926  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
927  pertrans->sortstates[aggstate->current_set] = NULL;
928 }
929 
930 /*
931  * Run the transition function for a DISTINCT or ORDER BY aggregate
932  * with more than one input. This is called after we have completed
933  * entering all the input values into the sort object. We complete the
934  * sort, read out the values in sorted order, and run the transition
935  * function on each value (applying DISTINCT if appropriate).
936  *
937  * This function handles only one grouping set (already set in
938  * aggstate->current_set).
939  *
940  * When called, CurrentMemoryContext should be the per-query context.
941  */
942 static void
944  AggStatePerTrans pertrans,
945  AggStatePerGroup pergroupstate)
946 {
947  ExprContext *tmpcontext = aggstate->tmpcontext;
948  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
949  TupleTableSlot *slot1 = pertrans->sortslot;
950  TupleTableSlot *slot2 = pertrans->uniqslot;
951  int numTransInputs = pertrans->numTransInputs;
952  int numDistinctCols = pertrans->numDistinctCols;
953  Datum newAbbrevVal = (Datum) 0;
954  Datum oldAbbrevVal = (Datum) 0;
955  bool haveOldValue = false;
956  TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
957  int i;
958 
959  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
960 
961  ExecClearTuple(slot1);
962  if (slot2)
963  ExecClearTuple(slot2);
964 
965  while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
966  true, true, slot1, &newAbbrevVal))
967  {
969 
970  tmpcontext->ecxt_outertuple = slot1;
971  tmpcontext->ecxt_innertuple = slot2;
972 
973  if (numDistinctCols == 0 ||
974  !haveOldValue ||
975  newAbbrevVal != oldAbbrevVal ||
976  !ExecQual(pertrans->equalfnMulti, tmpcontext))
977  {
978  /*
979  * Extract the first numTransInputs columns as datums to pass to
980  * the transfn.
981  */
982  slot_getsomeattrs(slot1, numTransInputs);
983 
984  /* Load values into fcinfo */
985  /* Start from 1, since the 0th arg will be the transition value */
986  for (i = 0; i < numTransInputs; i++)
987  {
988  fcinfo->args[i + 1].value = slot1->tts_values[i];
989  fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
990  }
991 
992  advance_transition_function(aggstate, pertrans, pergroupstate);
993 
994  if (numDistinctCols > 0)
995  {
996  /* swap the slot pointers to retain the current tuple */
997  TupleTableSlot *tmpslot = slot2;
998 
999  slot2 = slot1;
1000  slot1 = tmpslot;
1001  /* avoid ExecQual() calls by reusing abbreviated keys */
1002  oldAbbrevVal = newAbbrevVal;
1003  haveOldValue = true;
1004  }
1005  }
1006 
1007  /* Reset context each time */
1008  ResetExprContext(tmpcontext);
1009 
1010  ExecClearTuple(slot1);
1011  }
1012 
1013  if (slot2)
1014  ExecClearTuple(slot2);
1015 
1016  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1017  pertrans->sortstates[aggstate->current_set] = NULL;
1018 
1019  /* restore previous slot, potentially in use for grouping sets */
1020  tmpcontext->ecxt_outertuple = save;
1021 }
1022 
1023 /*
1024  * Compute the final value of one aggregate.
1025  *
1026  * This function handles only one grouping set (already set in
1027  * aggstate->current_set).
1028  *
1029  * The finalfn will be run, and the result delivered, in the
1030  * output-tuple context; caller's CurrentMemoryContext does not matter.
1031  *
1032  * The finalfn uses the state as set in the transno. This also might be
1033  * being used by another aggregate function, so it's important that we do
1034  * nothing destructive here.
1035  */
1036 static void
1038  AggStatePerAgg peragg,
1039  AggStatePerGroup pergroupstate,
1040  Datum *resultVal, bool *resultIsNull)
1041 {
1042  LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1043  bool anynull = false;
1044  MemoryContext oldContext;
1045  int i;
1046  ListCell *lc;
1047  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1048 
1050 
1051  /*
1052  * Evaluate any direct arguments. We do this even if there's no finalfn
1053  * (which is unlikely anyway), so that side-effects happen as expected.
1054  * The direct arguments go into arg positions 1 and up, leaving position 0
1055  * for the transition state value.
1056  */
1057  i = 1;
1058  foreach(lc, peragg->aggdirectargs)
1059  {
1060  ExprState *expr = (ExprState *) lfirst(lc);
1061 
1062  fcinfo->args[i].value = ExecEvalExpr(expr,
1063  aggstate->ss.ps.ps_ExprContext,
1064  &fcinfo->args[i].isnull);
1065  anynull |= fcinfo->args[i].isnull;
1066  i++;
1067  }
1068 
1069  /*
1070  * Apply the agg's finalfn if one is provided, else return transValue.
1071  */
1072  if (OidIsValid(peragg->finalfn_oid))
1073  {
1074  int numFinalArgs = peragg->numFinalArgs;
1075 
1076  /* set up aggstate->curperagg for AggGetAggref() */
1077  aggstate->curperagg = peragg;
1078 
1079  InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
1080  numFinalArgs,
1081  pertrans->aggCollation,
1082  (void *) aggstate, NULL);
1083 
1084  /* Fill in the transition state value */
1085  fcinfo->args[0].value =
1086  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1087  pergroupstate->transValueIsNull,
1088  pertrans->transtypeLen);
1089  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1090  anynull |= pergroupstate->transValueIsNull;
1091 
1092  /* Fill any remaining argument positions with nulls */
1093  for (; i < numFinalArgs; i++)
1094  {
1095  fcinfo->args[i].value = (Datum) 0;
1096  fcinfo->args[i].isnull = true;
1097  anynull = true;
1098  }
1099 
1100  if (fcinfo->flinfo->fn_strict && anynull)
1101  {
1102  /* don't call a strict function with NULL inputs */
1103  *resultVal = (Datum) 0;
1104  *resultIsNull = true;
1105  }
1106  else
1107  {
1108  *resultVal = FunctionCallInvoke(fcinfo);
1109  *resultIsNull = fcinfo->isnull;
1110  }
1111  aggstate->curperagg = NULL;
1112  }
1113  else
1114  {
1115  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1116  *resultVal = pergroupstate->transValue;
1117  *resultIsNull = pergroupstate->transValueIsNull;
1118  }
1119 
1120  /*
1121  * If result is pass-by-ref, make sure it is in the right context.
1122  */
1123  if (!peragg->resulttypeByVal && !*resultIsNull &&
1125  DatumGetPointer(*resultVal)))
1126  *resultVal = datumCopy(*resultVal,
1127  peragg->resulttypeByVal,
1128  peragg->resulttypeLen);
1129 
1130  MemoryContextSwitchTo(oldContext);
1131 }
1132 
1133 /*
1134  * Compute the output value of one partial aggregate.
1135  *
1136  * The serialization function will be run, and the result delivered, in the
1137  * output-tuple context; caller's CurrentMemoryContext does not matter.
1138  */
1139 static void
1141  AggStatePerAgg peragg,
1142  AggStatePerGroup pergroupstate,
1143  Datum *resultVal, bool *resultIsNull)
1144 {
1145  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1146  MemoryContext oldContext;
1147 
1149 
1150  /*
1151  * serialfn_oid will be set if we must serialize the transvalue before
1152  * returning it
1153  */
1154  if (OidIsValid(pertrans->serialfn_oid))
1155  {
1156  /* Don't call a strict serialization function with NULL input. */
1157  if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1158  {
1159  *resultVal = (Datum) 0;
1160  *resultIsNull = true;
1161  }
1162  else
1163  {
1164  FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1165 
1166  fcinfo->args[0].value =
1167  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1168  pergroupstate->transValueIsNull,
1169  pertrans->transtypeLen);
1170  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1171  fcinfo->isnull = false;
1172 
1173  *resultVal = FunctionCallInvoke(fcinfo);
1174  *resultIsNull = fcinfo->isnull;
1175  }
1176  }
1177  else
1178  {
1179  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1180  *resultVal = pergroupstate->transValue;
1181  *resultIsNull = pergroupstate->transValueIsNull;
1182  }
1183 
1184  /* If result is pass-by-ref, make sure it is in the right context. */
1185  if (!peragg->resulttypeByVal && !*resultIsNull &&
1187  DatumGetPointer(*resultVal)))
1188  *resultVal = datumCopy(*resultVal,
1189  peragg->resulttypeByVal,
1190  peragg->resulttypeLen);
1191 
1192  MemoryContextSwitchTo(oldContext);
1193 }
1194 
1195 /*
1196  * Extract the attributes that make up the grouping key into the
1197  * hashslot. This is necessary to compute the hash or perform a lookup.
1198  */
1199 static inline void
1201  TupleTableSlot *inputslot,
1202  TupleTableSlot *hashslot)
1203 {
1204  int i;
1205 
1206  /* transfer just the needed columns into hashslot */
1207  slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1208  ExecClearTuple(hashslot);
1209 
1210  for (i = 0; i < perhash->numhashGrpCols; i++)
1211  {
1212  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1213 
1214  hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1215  hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1216  }
1217  ExecStoreVirtualTuple(hashslot);
1218 }
1219 
1220 /*
1221  * Prepare to finalize and project based on the specified representative tuple
1222  * slot and grouping set.
1223  *
1224  * In the specified tuple slot, force to null all attributes that should be
1225  * read as null in the context of the current grouping set. Also stash the
1226  * current group bitmap where GroupingExpr can get at it.
1227  *
1228  * This relies on three conditions:
1229  *
1230  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1231  * only reference it in evaluations, which will only access individual
1232  * attributes.
1233  *
1234  * 2) No system columns are going to need to be nulled. (If a system column is
1235  * referenced in a group clause, it is actually projected in the outer plan
1236  * tlist.)
1237  *
1238  * 3) Within a given phase, we never need to recover the value of an attribute
1239  * once it has been set to null.
1240  *
1241  * Poking into the slot this way is a bit ugly, but the consensus is that the
1242  * alternative was worse.
1243  */
1244 static void
1245 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1246 {
1247  if (aggstate->phase->grouped_cols)
1248  {
1249  Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1250 
1251  aggstate->grouped_cols = grouped_cols;
1252 
1253  if (TTS_EMPTY(slot))
1254  {
1255  /*
1256  * Force all values to be NULL if working on an empty input tuple
1257  * (i.e. an empty grouping set for which no input rows were
1258  * supplied).
1259  */
1260  ExecStoreAllNullTuple(slot);
1261  }
1262  else if (aggstate->all_grouped_cols)
1263  {
1264  ListCell *lc;
1265 
1266  /* all_grouped_cols is arranged in desc order */
1268 
1269  foreach(lc, aggstate->all_grouped_cols)
1270  {
1271  int attnum = lfirst_int(lc);
1272 
1273  if (!bms_is_member(attnum, grouped_cols))
1274  slot->tts_isnull[attnum - 1] = true;
1275  }
1276  }
1277  }
1278 }
1279 
1280 /*
1281  * Compute the final value of all aggregates for one group.
1282  *
1283  * This function handles only one grouping set at a time, which the caller must
1284  * have selected. It's also the caller's responsibility to adjust the supplied
1285  * pergroup parameter to point to the current set's transvalues.
1286  *
1287  * Results are stored in the output econtext aggvalues/aggnulls.
1288  */
1289 static void
1291  AggStatePerAgg peraggs,
1292  AggStatePerGroup pergroup)
1293 {
1294  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1295  Datum *aggvalues = econtext->ecxt_aggvalues;
1296  bool *aggnulls = econtext->ecxt_aggnulls;
1297  int aggno;
1298 
1299  /*
1300  * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1301  * inputs and run the transition functions.
1302  */
1303  for (int transno = 0; transno < aggstate->numtrans; transno++)
1304  {
1305  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1306  AggStatePerGroup pergroupstate;
1307 
1308  pergroupstate = &pergroup[transno];
1309 
1310  if (pertrans->aggsortrequired)
1311  {
1312  Assert(aggstate->aggstrategy != AGG_HASHED &&
1313  aggstate->aggstrategy != AGG_MIXED);
1314 
1315  if (pertrans->numInputs == 1)
1317  pertrans,
1318  pergroupstate);
1319  else
1321  pertrans,
1322  pergroupstate);
1323  }
1324  else if (pertrans->numDistinctCols > 0 && pertrans->haslast)
1325  {
1326  pertrans->haslast = false;
1327 
1328  if (pertrans->numDistinctCols == 1)
1329  {
1330  if (!pertrans->inputtypeByVal && !pertrans->lastisnull)
1331  pfree(DatumGetPointer(pertrans->lastdatum));
1332 
1333  pertrans->lastisnull = false;
1334  pertrans->lastdatum = (Datum) 0;
1335  }
1336  else
1337  ExecClearTuple(pertrans->uniqslot);
1338  }
1339  }
1340 
1341  /*
1342  * Run the final functions.
1343  */
1344  for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1345  {
1346  AggStatePerAgg peragg = &peraggs[aggno];
1347  int transno = peragg->transno;
1348  AggStatePerGroup pergroupstate;
1349 
1350  pergroupstate = &pergroup[transno];
1351 
1352  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1353  finalize_partialaggregate(aggstate, peragg, pergroupstate,
1354  &aggvalues[aggno], &aggnulls[aggno]);
1355  else
1356  finalize_aggregate(aggstate, peragg, pergroupstate,
1357  &aggvalues[aggno], &aggnulls[aggno]);
1358  }
1359 }
1360 
1361 /*
1362  * Project the result of a group (whose aggs have already been calculated by
1363  * finalize_aggregates). Returns the result slot, or NULL if no row is
1364  * projected (suppressed by qual).
1365  */
1366 static TupleTableSlot *
1368 {
1369  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1370 
1371  /*
1372  * Check the qual (HAVING clause); if the group does not match, ignore it.
1373  */
1374  if (ExecQual(aggstate->ss.ps.qual, econtext))
1375  {
1376  /*
1377  * Form and return projection tuple using the aggregate results and
1378  * the representative input tuple.
1379  */
1380  return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1381  }
1382  else
1383  InstrCountFiltered1(aggstate, 1);
1384 
1385  return NULL;
1386 }
1387 
1388 /*
1389  * Find input-tuple columns that are needed, dividing them into
1390  * aggregated and unaggregated sets.
1391  */
1392 static void
1393 find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
1394 {
1395  Agg *agg = (Agg *) aggstate->ss.ps.plan;
1396  FindColsContext context;
1397 
1398  context.is_aggref = false;
1399  context.aggregated = NULL;
1400  context.unaggregated = NULL;
1401 
1402  /* Examine tlist and quals */
1403  (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
1404  (void) find_cols_walker((Node *) agg->plan.qual, &context);
1405 
1406  /* In some cases, grouping columns will not appear in the tlist */
1407  for (int i = 0; i < agg->numCols; i++)
1408  context.unaggregated = bms_add_member(context.unaggregated,
1409  agg->grpColIdx[i]);
1410 
1411  *aggregated = context.aggregated;
1412  *unaggregated = context.unaggregated;
1413 }
1414 
1415 static bool
1417 {
1418  if (node == NULL)
1419  return false;
1420  if (IsA(node, Var))
1421  {
1422  Var *var = (Var *) node;
1423 
1424  /* setrefs.c should have set the varno to OUTER_VAR */
1425  Assert(var->varno == OUTER_VAR);
1426  Assert(var->varlevelsup == 0);
1427  if (context->is_aggref)
1428  context->aggregated = bms_add_member(context->aggregated,
1429  var->varattno);
1430  else
1431  context->unaggregated = bms_add_member(context->unaggregated,
1432  var->varattno);
1433  return false;
1434  }
1435  if (IsA(node, Aggref))
1436  {
1437  Assert(!context->is_aggref);
1438  context->is_aggref = true;
1439  expression_tree_walker(node, find_cols_walker, (void *) context);
1440  context->is_aggref = false;
1441  return false;
1442  }
1444  (void *) context);
1445 }
1446 
1447 /*
1448  * (Re-)initialize the hash table(s) to empty.
1449  *
1450  * To implement hashed aggregation, we need a hashtable that stores a
1451  * representative tuple and an array of AggStatePerGroup structs for each
1452  * distinct set of GROUP BY column values. We compute the hash key from the
1453  * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1454  * for each entry.
1455  *
1456  * We have a separate hashtable and associated perhash data structure for each
1457  * grouping set for which we're doing hashing.
1458  *
1459  * The contents of the hash tables always live in the hashcontext's per-tuple
1460  * memory context (there is only one of these for all tables together, since
1461  * they are all reset at the same time).
1462  */
1463 static void
1465 {
1466  int setno;
1467 
1468  for (setno = 0; setno < aggstate->num_hashes; ++setno)
1469  {
1470  AggStatePerHash perhash = &aggstate->perhash[setno];
1471  long nbuckets;
1472  Size memory;
1473 
1474  if (perhash->hashtable != NULL)
1475  {
1476  ResetTupleHashTable(perhash->hashtable);
1477  continue;
1478  }
1479 
1480  Assert(perhash->aggnode->numGroups > 0);
1481 
1482  memory = aggstate->hash_mem_limit / aggstate->num_hashes;
1483 
1484  /* choose reasonable number of buckets per hashtable */
1485  nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
1486  perhash->aggnode->numGroups,
1487  memory);
1488 
1489  build_hash_table(aggstate, setno, nbuckets);
1490  }
1491 
1492  aggstate->hash_ngroups_current = 0;
1493 }
1494 
1495 /*
1496  * Build a single hashtable for this grouping set.
1497  */
1498 static void
1499 build_hash_table(AggState *aggstate, int setno, long nbuckets)
1500 {
1501  AggStatePerHash perhash = &aggstate->perhash[setno];
1502  MemoryContext metacxt = aggstate->hash_metacxt;
1503  MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
1504  MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
1505  Size additionalsize;
1506 
1507  Assert(aggstate->aggstrategy == AGG_HASHED ||
1508  aggstate->aggstrategy == AGG_MIXED);
1509 
1510  /*
1511  * Used to make sure initial hash table allocation does not exceed
1512  * hash_mem. Note that the estimate does not include space for
1513  * pass-by-reference transition data values, nor for the representative
1514  * tuple of each group.
1515  */
1516  additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1517 
1518  perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1519  perhash->hashslot->tts_tupleDescriptor,
1520  perhash->numCols,
1521  perhash->hashGrpColIdxHash,
1522  perhash->eqfuncoids,
1523  perhash->hashfunctions,
1524  perhash->aggnode->grpCollations,
1525  nbuckets,
1526  additionalsize,
1527  metacxt,
1528  hashcxt,
1529  tmpcxt,
1530  DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1531 }
1532 
1533 /*
1534  * Compute columns that actually need to be stored in hashtable entries. The
1535  * incoming tuples from the child plan node will contain grouping columns,
1536  * other columns referenced in our targetlist and qual, columns used to
1537  * compute the aggregate functions, and perhaps just junk columns we don't use
1538  * at all. Only columns of the first two types need to be stored in the
1539  * hashtable, and getting rid of the others can make the table entries
1540  * significantly smaller. The hashtable only contains the relevant columns,
1541  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1542  * into the format of the normal input descriptor.
1543  *
1544  * Additional columns, in addition to the columns grouped by, come from two
1545  * sources: Firstly functionally dependent columns that we don't need to group
1546  * by themselves, and secondly ctids for row-marks.
1547  *
1548  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1549  * then build an array of the columns included in the hashtable. We might
1550  * still have duplicates if the passed-in grpColIdx has them, which can happen
1551  * in edge cases from semijoins/distinct; these can't always be removed,
1552  * because it's not certain that the duplicate cols will be using the same
1553  * hash function.
1554  *
1555  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1556  * the per-query context (unlike the hash table itself).
1557  */
1558 static void
1560 {
1561  Bitmapset *base_colnos;
1562  Bitmapset *aggregated_colnos;
1563  TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1564  List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1565  int numHashes = aggstate->num_hashes;
1566  EState *estate = aggstate->ss.ps.state;
1567  int j;
1568 
1569  /* Find Vars that will be needed in tlist and qual */
1570  find_cols(aggstate, &aggregated_colnos, &base_colnos);
1571  aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
1572  aggstate->max_colno_needed = 0;
1573  aggstate->all_cols_needed = true;
1574 
1575  for (int i = 0; i < scanDesc->natts; i++)
1576  {
1577  int colno = i + 1;
1578 
1579  if (bms_is_member(colno, aggstate->colnos_needed))
1580  aggstate->max_colno_needed = colno;
1581  else
1582  aggstate->all_cols_needed = false;
1583  }
1584 
1585  for (j = 0; j < numHashes; ++j)
1586  {
1587  AggStatePerHash perhash = &aggstate->perhash[j];
1588  Bitmapset *colnos = bms_copy(base_colnos);
1589  AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1590  List *hashTlist = NIL;
1591  TupleDesc hashDesc;
1592  int maxCols;
1593  int i;
1594 
1595  perhash->largestGrpColIdx = 0;
1596 
1597  /*
1598  * If we're doing grouping sets, then some Vars might be referenced in
1599  * tlist/qual for the benefit of other grouping sets, but not needed
1600  * when hashing; i.e. prepare_projection_slot will null them out, so
1601  * there'd be no point storing them. Use prepare_projection_slot's
1602  * logic to determine which.
1603  */
1604  if (aggstate->phases[0].grouped_cols)
1605  {
1606  Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1607  ListCell *lc;
1608 
1609  foreach(lc, aggstate->all_grouped_cols)
1610  {
1611  int attnum = lfirst_int(lc);
1612 
1613  if (!bms_is_member(attnum, grouped_cols))
1614  colnos = bms_del_member(colnos, attnum);
1615  }
1616  }
1617 
1618  /*
1619  * Compute maximum number of input columns accounting for possible
1620  * duplications in the grpColIdx array, which can happen in some edge
1621  * cases where HashAggregate was generated as part of a semijoin or a
1622  * DISTINCT.
1623  */
1624  maxCols = bms_num_members(colnos) + perhash->numCols;
1625 
1626  perhash->hashGrpColIdxInput =
1627  palloc(maxCols * sizeof(AttrNumber));
1628  perhash->hashGrpColIdxHash =
1629  palloc(perhash->numCols * sizeof(AttrNumber));
1630 
1631  /* Add all the grouping columns to colnos */
1632  for (i = 0; i < perhash->numCols; i++)
1633  colnos = bms_add_member(colnos, grpColIdx[i]);
1634 
1635  /*
1636  * First build mapping for columns directly hashed. These are the
1637  * first, because they'll be accessed when computing hash values and
1638  * comparing tuples for exact matches. We also build simple mapping
1639  * for execGrouping, so it knows where to find the to-be-hashed /
1640  * compared columns in the input.
1641  */
1642  for (i = 0; i < perhash->numCols; i++)
1643  {
1644  perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1645  perhash->hashGrpColIdxHash[i] = i + 1;
1646  perhash->numhashGrpCols++;
1647  /* delete already mapped columns */
1648  bms_del_member(colnos, grpColIdx[i]);
1649  }
1650 
1651  /* and add the remaining columns */
1652  while ((i = bms_first_member(colnos)) >= 0)
1653  {
1654  perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1655  perhash->numhashGrpCols++;
1656  }
1657 
1658  /* and build a tuple descriptor for the hashtable */
1659  for (i = 0; i < perhash->numhashGrpCols; i++)
1660  {
1661  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1662 
1663  hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1664  perhash->largestGrpColIdx =
1665  Max(varNumber + 1, perhash->largestGrpColIdx);
1666  }
1667 
1668  hashDesc = ExecTypeFromTL(hashTlist);
1669 
1670  execTuplesHashPrepare(perhash->numCols,
1671  perhash->aggnode->grpOperators,
1672  &perhash->eqfuncoids,
1673  &perhash->hashfunctions);
1674  perhash->hashslot =
1675  ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1677 
1678  list_free(hashTlist);
1679  bms_free(colnos);
1680  }
1681 
1682  bms_free(base_colnos);
1683 }
1684 
1685 /*
1686  * Estimate per-hash-table-entry overhead.
1687  */
1688 Size
1689 hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
1690 {
1691  Size tupleChunkSize;
1692  Size pergroupChunkSize;
1693  Size transitionChunkSize;
1694  Size tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
1695  tupleWidth);
1696  Size pergroupSize = numTrans * sizeof(AggStatePerGroupData);
1697 
1698  tupleChunkSize = CHUNKHDRSZ + tupleSize;
1699 
1700  if (pergroupSize > 0)
1701  pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
1702  else
1703  pergroupChunkSize = 0;
1704 
1705  if (transitionSpace > 0)
1706  transitionChunkSize = CHUNKHDRSZ + transitionSpace;
1707  else
1708  transitionChunkSize = 0;
1709 
1710  return
1711  sizeof(TupleHashEntryData) +
1712  tupleChunkSize +
1713  pergroupChunkSize +
1714  transitionChunkSize;
1715 }
1716 
1717 /*
1718  * hashagg_recompile_expressions()
1719  *
1720  * Identifies the right phase, compiles the right expression given the
1721  * arguments, and then sets phase->evalfunc to that expression.
1722  *
1723  * Different versions of the compiled expression are needed depending on
1724  * whether hash aggregation has spilled or not, and whether it's reading from
1725  * the outer plan or a tape. Before spilling to disk, the expression reads
1726  * from the outer plan and does not need to perform a NULL check. After
1727  * HashAgg begins to spill, new groups will not be created in the hash table,
1728  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
1729  * pointer check to the expression. Then, when reading spilled data from a
1730  * tape, we change the outer slot type to be a fixed minimal tuple slot.
1731  *
1732  * It would be wasteful to recompile every time, so cache the compiled
1733  * expressions in the AggStatePerPhase, and reuse when appropriate.
1734  */
1735 static void
1736 hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1737 {
1738  AggStatePerPhase phase;
1739  int i = minslot ? 1 : 0;
1740  int j = nullcheck ? 1 : 0;
1741 
1742  Assert(aggstate->aggstrategy == AGG_HASHED ||
1743  aggstate->aggstrategy == AGG_MIXED);
1744 
1745  if (aggstate->aggstrategy == AGG_HASHED)
1746  phase = &aggstate->phases[0];
1747  else /* AGG_MIXED */
1748  phase = &aggstate->phases[1];
1749 
1750  if (phase->evaltrans_cache[i][j] == NULL)
1751  {
1752  const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
1753  bool outerfixed = aggstate->ss.ps.outeropsfixed;
1754  bool dohash = true;
1755  bool dosort = false;
1756 
1757  /*
1758  * If minslot is true, that means we are processing a spilled batch
1759  * (inside agg_refill_hash_table()), and we must not advance the
1760  * sorted grouping sets.
1761  */
1762  if (aggstate->aggstrategy == AGG_MIXED && !minslot)
1763  dosort = true;
1764 
1765  /* temporarily change the outerops while compiling the expression */
1766  if (minslot)
1767  {
1768  aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
1769  aggstate->ss.ps.outeropsfixed = true;
1770  }
1771 
1772  phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
1773  dosort, dohash,
1774  nullcheck);
1775 
1776  /* change back */
1777  aggstate->ss.ps.outerops = outerops;
1778  aggstate->ss.ps.outeropsfixed = outerfixed;
1779  }
1780 
1781  phase->evaltrans = phase->evaltrans_cache[i][j];
1782 }
1783 
1784 /*
1785  * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
1786  * number of partitions we expect to create (if we do spill).
1787  *
1788  * There are two limits: a memory limit, and also an ngroups limit. The
1789  * ngroups limit becomes important when we expect transition values to grow
1790  * substantially larger than the initial value.
1791  */
1792 void
1793 hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
1794  Size *mem_limit, uint64 *ngroups_limit,
1795  int *num_partitions)
1796 {
1797  int npartitions;
1798  Size partition_mem;
1799  Size hash_mem_limit = get_hash_memory_limit();
1800 
1801  /* if not expected to spill, use all of hash_mem */
1802  if (input_groups * hashentrysize <= hash_mem_limit)
1803  {
1804  if (num_partitions != NULL)
1805  *num_partitions = 0;
1806  *mem_limit = hash_mem_limit;
1807  *ngroups_limit = hash_mem_limit / hashentrysize;
1808  return;
1809  }
1810 
1811  /*
1812  * Calculate expected memory requirements for spilling, which is the size
1813  * of the buffers needed for all the tapes that need to be open at once.
1814  * Then, subtract that from the memory available for holding hash tables.
1815  */
1816  npartitions = hash_choose_num_partitions(input_groups,
1817  hashentrysize,
1818  used_bits,
1819  NULL);
1820  if (num_partitions != NULL)
1821  *num_partitions = npartitions;
1822 
1823  partition_mem =
1825  HASHAGG_WRITE_BUFFER_SIZE * npartitions;
1826 
1827  /*
1828  * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
1829  * minimum number of partitions, so we aren't going to dramatically exceed
1830  * work mem anyway.
1831  */
1832  if (hash_mem_limit > 4 * partition_mem)
1833  *mem_limit = hash_mem_limit - partition_mem;
1834  else
1835  *mem_limit = hash_mem_limit * 0.75;
1836 
1837  if (*mem_limit > hashentrysize)
1838  *ngroups_limit = *mem_limit / hashentrysize;
1839  else
1840  *ngroups_limit = 1;
1841 }
1842 
1843 /*
1844  * hash_agg_check_limits
1845  *
1846  * After adding a new group to the hash table, check whether we need to enter
1847  * spill mode. Allocations may happen without adding new groups (for instance,
1848  * if the transition state size grows), so this check is imperfect.
1849  */
1850 static void
1852 {
1853  uint64 ngroups = aggstate->hash_ngroups_current;
1854  Size meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
1855  true);
1857  true);
1858 
1859  /*
1860  * Don't spill unless there's at least one group in the hash table so we
1861  * can be sure to make progress even in edge cases.
1862  */
1863  if (aggstate->hash_ngroups_current > 0 &&
1864  (meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
1865  ngroups > aggstate->hash_ngroups_limit))
1866  {
1867  hash_agg_enter_spill_mode(aggstate);
1868  }
1869 }
1870 
1871 /*
1872  * Enter "spill mode", meaning that no new groups are added to any of the hash
1873  * tables. Tuples that would create a new group are instead spilled, and
1874  * processed later.
1875  */
1876 static void
1878 {
1879  aggstate->hash_spill_mode = true;
1880  hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
1881 
1882  if (!aggstate->hash_ever_spilled)
1883  {
1884  Assert(aggstate->hash_tapeset == NULL);
1885  Assert(aggstate->hash_spills == NULL);
1886 
1887  aggstate->hash_ever_spilled = true;
1888 
1889  aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
1890 
1891  aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
1892 
1893  for (int setno = 0; setno < aggstate->num_hashes; setno++)
1894  {
1895  AggStatePerHash perhash = &aggstate->perhash[setno];
1896  HashAggSpill *spill = &aggstate->hash_spills[setno];
1897 
1898  hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
1899  perhash->aggnode->numGroups,
1900  aggstate->hashentrysize);
1901  }
1902  }
1903 }
1904 
1905 /*
1906  * Update metrics after filling the hash table.
1907  *
1908  * If reading from the outer plan, from_tape should be false; if reading from
1909  * another tape, from_tape should be true.
1910  */
1911 static void
1912 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
1913 {
1914  Size meta_mem;
1915  Size hashkey_mem;
1916  Size buffer_mem;
1917  Size total_mem;
1918 
1919  if (aggstate->aggstrategy != AGG_MIXED &&
1920  aggstate->aggstrategy != AGG_HASHED)
1921  return;
1922 
1923  /* memory for the hash table itself */
1924  meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
1925 
1926  /* memory for the group keys and transition states */
1927  hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
1928 
1929  /* memory for read/write tape buffers, if spilled */
1930  buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
1931  if (from_tape)
1932  buffer_mem += HASHAGG_READ_BUFFER_SIZE;
1933 
1934  /* update peak mem */
1935  total_mem = meta_mem + hashkey_mem + buffer_mem;
1936  if (total_mem > aggstate->hash_mem_peak)
1937  aggstate->hash_mem_peak = total_mem;
1938 
1939  /* update disk usage */
1940  if (aggstate->hash_tapeset != NULL)
1941  {
1942  uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
1943 
1944  if (aggstate->hash_disk_used < disk_used)
1945  aggstate->hash_disk_used = disk_used;
1946  }
1947 
1948  /* update hashentrysize estimate based on contents */
1949  if (aggstate->hash_ngroups_current > 0)
1950  {
1951  aggstate->hashentrysize =
1952  sizeof(TupleHashEntryData) +
1953  (hashkey_mem / (double) aggstate->hash_ngroups_current);
1954  }
1955 }
1956 
1957 /*
1958  * Choose a reasonable number of buckets for the initial hash table size.
1959  */
1960 static long
1961 hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
1962 {
1963  long max_nbuckets;
1964  long nbuckets = ngroups;
1965 
1966  max_nbuckets = memory / hashentrysize;
1967 
1968  /*
1969  * Underestimating is better than overestimating. Too many buckets crowd
1970  * out space for group keys and transition state values.
1971  */
1972  max_nbuckets >>= 1;
1973 
1974  if (nbuckets > max_nbuckets)
1975  nbuckets = max_nbuckets;
1976 
1977  return Max(nbuckets, 1);
1978 }
1979 
1980 /*
1981  * Determine the number of partitions to create when spilling, which will
1982  * always be a power of two. If log2_npartitions is non-NULL, set
1983  * *log2_npartitions to the log2() of the number of partitions.
1984  */
1985 static int
1986 hash_choose_num_partitions(double input_groups, double hashentrysize,
1987  int used_bits, int *log2_npartitions)
1988 {
1989  Size hash_mem_limit = get_hash_memory_limit();
1990  double partition_limit;
1991  double mem_wanted;
1992  double dpartitions;
1993  int npartitions;
1994  int partition_bits;
1995 
1996  /*
1997  * Avoid creating so many partitions that the memory requirements of the
1998  * open partition files are greater than 1/4 of hash_mem.
1999  */
2000  partition_limit =
2001  (hash_mem_limit * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
2003 
2004  mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
2005 
2006  /* make enough partitions so that each one is likely to fit in memory */
2007  dpartitions = 1 + (mem_wanted / hash_mem_limit);
2008 
2009  if (dpartitions > partition_limit)
2010  dpartitions = partition_limit;
2011 
2012  if (dpartitions < HASHAGG_MIN_PARTITIONS)
2013  dpartitions = HASHAGG_MIN_PARTITIONS;
2014  if (dpartitions > HASHAGG_MAX_PARTITIONS)
2015  dpartitions = HASHAGG_MAX_PARTITIONS;
2016 
2017  /* HASHAGG_MAX_PARTITIONS limit makes this safe */
2018  npartitions = (int) dpartitions;
2019 
2020  /* ceil(log2(npartitions)) */
2021  partition_bits = my_log2(npartitions);
2022 
2023  /* make sure that we don't exhaust the hash bits */
2024  if (partition_bits + used_bits >= 32)
2025  partition_bits = 32 - used_bits;
2026 
2027  if (log2_npartitions != NULL)
2028  *log2_npartitions = partition_bits;
2029 
2030  /* number of partitions will be a power of two */
2031  npartitions = 1 << partition_bits;
2032 
2033  return npartitions;
2034 }
2035 
2036 /*
2037  * Initialize a freshly-created TupleHashEntry.
2038  */
2039 static void
2041  TupleHashEntry entry)
2042 {
2043  AggStatePerGroup pergroup;
2044  int transno;
2045 
2046  aggstate->hash_ngroups_current++;
2047  hash_agg_check_limits(aggstate);
2048 
2049  /* no need to allocate or initialize per-group state */
2050  if (aggstate->numtrans == 0)
2051  return;
2052 
2053  pergroup = (AggStatePerGroup)
2054  MemoryContextAlloc(hashtable->tablecxt,
2055  sizeof(AggStatePerGroupData) * aggstate->numtrans);
2056 
2057  entry->additional = pergroup;
2058 
2059  /*
2060  * Initialize aggregates for new tuple group, lookup_hash_entries()
2061  * already has selected the relevant grouping set.
2062  */
2063  for (transno = 0; transno < aggstate->numtrans; transno++)
2064  {
2065  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
2066  AggStatePerGroup pergroupstate = &pergroup[transno];
2067 
2068  initialize_aggregate(aggstate, pertrans, pergroupstate);
2069  }
2070 }
2071 
2072 /*
2073  * Look up hash entries for the current tuple in all hashed grouping sets.
2074  *
2075  * Be aware that lookup_hash_entry can reset the tmpcontext.
2076  *
2077  * Some entries may be left NULL if we are in "spill mode". The same tuple
2078  * will belong to different groups for each grouping set, so may match a group
2079  * already in memory for one set and match a group not in memory for another
2080  * set. When in "spill mode", the tuple will be spilled for each grouping set
2081  * where it doesn't match a group in memory.
2082  *
2083  * NB: It's possible to spill the same tuple for several different grouping
2084  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
2085  * the tuple multiple times for multiple grouping sets, it can be partitioned
2086  * for each grouping set, making the refilling of the hash table very
2087  * efficient.
2088  */
2089 static void
2091 {
2092  AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2093  TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
2094  int setno;
2095 
2096  for (setno = 0; setno < aggstate->num_hashes; setno++)
2097  {
2098  AggStatePerHash perhash = &aggstate->perhash[setno];
2099  TupleHashTable hashtable = perhash->hashtable;
2100  TupleTableSlot *hashslot = perhash->hashslot;
2101  TupleHashEntry entry;
2102  uint32 hash;
2103  bool isnew = false;
2104  bool *p_isnew;
2105 
2106  /* if hash table already spilled, don't create new entries */
2107  p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2108 
2109  select_current_set(aggstate, setno, true);
2110  prepare_hash_slot(perhash,
2111  outerslot,
2112  hashslot);
2113 
2114  entry = LookupTupleHashEntry(hashtable, hashslot,
2115  p_isnew, &hash);
2116 
2117  if (entry != NULL)
2118  {
2119  if (isnew)
2120  initialize_hash_entry(aggstate, hashtable, entry);
2121  pergroup[setno] = entry->additional;
2122  }
2123  else
2124  {
2125  HashAggSpill *spill = &aggstate->hash_spills[setno];
2126  TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
2127 
2128  if (spill->partitions == NULL)
2129  hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
2130  perhash->aggnode->numGroups,
2131  aggstate->hashentrysize);
2132 
2133  hashagg_spill_tuple(aggstate, spill, slot, hash);
2134  pergroup[setno] = NULL;
2135  }
2136  }
2137 }
2138 
2139 /*
2140  * ExecAgg -
2141  *
2142  * ExecAgg receives tuples from its outer subplan and aggregates over
2143  * the appropriate attribute for each aggregate function use (Aggref
2144  * node) appearing in the targetlist or qual of the node. The number
2145  * of tuples to aggregate over depends on whether grouped or plain
2146  * aggregation is selected. In grouped aggregation, we produce a result
2147  * row for each group; in plain aggregation there's a single result row
2148  * for the whole query. In either case, the value of each aggregate is
2149  * stored in the expression context to be used when ExecProject evaluates
2150  * the result tuple.
2151  */
2152 static TupleTableSlot *
2154 {
2155  AggState *node = castNode(AggState, pstate);
2156  TupleTableSlot *result = NULL;
2157 
2159 
2160  if (!node->agg_done)
2161  {
2162  /* Dispatch based on strategy */
2163  switch (node->phase->aggstrategy)
2164  {
2165  case AGG_HASHED:
2166  if (!node->table_filled)
2167  agg_fill_hash_table(node);
2168  /* FALLTHROUGH */
2169  case AGG_MIXED:
2170  result = agg_retrieve_hash_table(node);
2171  break;
2172  case AGG_PLAIN:
2173  case AGG_SORTED:
2174  result = agg_retrieve_direct(node);
2175  break;
2176  }
2177 
2178  if (!TupIsNull(result))
2179  return result;
2180  }
2181 
2182  return NULL;
2183 }
2184 
2185 /*
2186  * ExecAgg for non-hashed case
2187  */
2188 static TupleTableSlot *
2190 {
2191  Agg *node = aggstate->phase->aggnode;
2192  ExprContext *econtext;
2193  ExprContext *tmpcontext;
2194  AggStatePerAgg peragg;
2195  AggStatePerGroup *pergroups;
2196  TupleTableSlot *outerslot;
2197  TupleTableSlot *firstSlot;
2198  TupleTableSlot *result;
2199  bool hasGroupingSets = aggstate->phase->numsets > 0;
2200  int numGroupingSets = Max(aggstate->phase->numsets, 1);
2201  int currentSet;
2202  int nextSetSize;
2203  int numReset;
2204  int i;
2205 
2206  /*
2207  * get state info from node
2208  *
2209  * econtext is the per-output-tuple expression context
2210  *
2211  * tmpcontext is the per-input-tuple expression context
2212  */
2213  econtext = aggstate->ss.ps.ps_ExprContext;
2214  tmpcontext = aggstate->tmpcontext;
2215 
2216  peragg = aggstate->peragg;
2217  pergroups = aggstate->pergroups;
2218  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2219 
2220  /*
2221  * We loop retrieving groups until we find one matching
2222  * aggstate->ss.ps.qual
2223  *
2224  * For grouping sets, we have the invariant that aggstate->projected_set
2225  * is either -1 (initial call) or the index (starting from 0) in
2226  * gset_lengths for the group we just completed (either by projecting a
2227  * row or by discarding it in the qual).
2228  */
2229  while (!aggstate->agg_done)
2230  {
2231  /*
2232  * Clear the per-output-tuple context for each group, as well as
2233  * aggcontext (which contains any pass-by-ref transvalues of the old
2234  * group). Some aggregate functions store working state in child
2235  * contexts; those now get reset automatically without us needing to
2236  * do anything special.
2237  *
2238  * We use ReScanExprContext not just ResetExprContext because we want
2239  * any registered shutdown callbacks to be called. That allows
2240  * aggregate functions to ensure they've cleaned up any non-memory
2241  * resources.
2242  */
2243  ReScanExprContext(econtext);
2244 
2245  /*
2246  * Determine how many grouping sets need to be reset at this boundary.
2247  */
2248  if (aggstate->projected_set >= 0 &&
2249  aggstate->projected_set < numGroupingSets)
2250  numReset = aggstate->projected_set + 1;
2251  else
2252  numReset = numGroupingSets;
2253 
2254  /*
2255  * numReset can change on a phase boundary, but that's OK; we want to
2256  * reset the contexts used in _this_ phase, and later, after possibly
2257  * changing phase, initialize the right number of aggregates for the
2258  * _new_ phase.
2259  */
2260 
2261  for (i = 0; i < numReset; i++)
2262  {
2263  ReScanExprContext(aggstate->aggcontexts[i]);
2264  }
2265 
2266  /*
2267  * Check if input is complete and there are no more groups to project
2268  * in this phase; move to next phase or mark as done.
2269  */
2270  if (aggstate->input_done == true &&
2271  aggstate->projected_set >= (numGroupingSets - 1))
2272  {
2273  if (aggstate->current_phase < aggstate->numphases - 1)
2274  {
2275  initialize_phase(aggstate, aggstate->current_phase + 1);
2276  aggstate->input_done = false;
2277  aggstate->projected_set = -1;
2278  numGroupingSets = Max(aggstate->phase->numsets, 1);
2279  node = aggstate->phase->aggnode;
2280  numReset = numGroupingSets;
2281  }
2282  else if (aggstate->aggstrategy == AGG_MIXED)
2283  {
2284  /*
2285  * Mixed mode; we've output all the grouped stuff and have
2286  * full hashtables, so switch to outputting those.
2287  */
2288  initialize_phase(aggstate, 0);
2289  aggstate->table_filled = true;
2291  &aggstate->perhash[0].hashiter);
2292  select_current_set(aggstate, 0, true);
2293  return agg_retrieve_hash_table(aggstate);
2294  }
2295  else
2296  {
2297  aggstate->agg_done = true;
2298  break;
2299  }
2300  }
2301 
2302  /*
2303  * Get the number of columns in the next grouping set after the last
2304  * projected one (if any). This is the number of columns to compare to
2305  * see if we reached the boundary of that set too.
2306  */
2307  if (aggstate->projected_set >= 0 &&
2308  aggstate->projected_set < (numGroupingSets - 1))
2309  nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2310  else
2311  nextSetSize = 0;
2312 
2313  /*----------
2314  * If a subgroup for the current grouping set is present, project it.
2315  *
2316  * We have a new group if:
2317  * - we're out of input but haven't projected all grouping sets
2318  * (checked above)
2319  * OR
2320  * - we already projected a row that wasn't from the last grouping
2321  * set
2322  * AND
2323  * - the next grouping set has at least one grouping column (since
2324  * empty grouping sets project only once input is exhausted)
2325  * AND
2326  * - the previous and pending rows differ on the grouping columns
2327  * of the next grouping set
2328  *----------
2329  */
2330  tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
2331  if (aggstate->input_done ||
2332  (node->aggstrategy != AGG_PLAIN &&
2333  aggstate->projected_set != -1 &&
2334  aggstate->projected_set < (numGroupingSets - 1) &&
2335  nextSetSize > 0 &&
2336  !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
2337  tmpcontext)))
2338  {
2339  aggstate->projected_set += 1;
2340 
2341  Assert(aggstate->projected_set < numGroupingSets);
2342  Assert(nextSetSize > 0 || aggstate->input_done);
2343  }
2344  else
2345  {
2346  /*
2347  * We no longer care what group we just projected, the next
2348  * projection will always be the first (or only) grouping set
2349  * (unless the input proves to be empty).
2350  */
2351  aggstate->projected_set = 0;
2352 
2353  /*
2354  * If we don't already have the first tuple of the new group,
2355  * fetch it from the outer plan.
2356  */
2357  if (aggstate->grp_firstTuple == NULL)
2358  {
2359  outerslot = fetch_input_tuple(aggstate);
2360  if (!TupIsNull(outerslot))
2361  {
2362  /*
2363  * Make a copy of the first input tuple; we will use this
2364  * for comparisons (in group mode) and for projection.
2365  */
2366  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2367  }
2368  else
2369  {
2370  /* outer plan produced no tuples at all */
2371  if (hasGroupingSets)
2372  {
2373  /*
2374  * If there was no input at all, we need to project
2375  * rows only if there are grouping sets of size 0.
2376  * Note that this implies that there can't be any
2377  * references to ungrouped Vars, which would otherwise
2378  * cause issues with the empty output slot.
2379  *
2380  * XXX: This is no longer true, we currently deal with
2381  * this in finalize_aggregates().
2382  */
2383  aggstate->input_done = true;
2384 
2385  while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2386  {
2387  aggstate->projected_set += 1;
2388  if (aggstate->projected_set >= numGroupingSets)
2389  {
2390  /*
2391  * We can't set agg_done here because we might
2392  * have more phases to do, even though the
2393  * input is empty. So we need to restart the
2394  * whole outer loop.
2395  */
2396  break;
2397  }
2398  }
2399 
2400  if (aggstate->projected_set >= numGroupingSets)
2401  continue;
2402  }
2403  else
2404  {
2405  aggstate->agg_done = true;
2406  /* If we are grouping, we should produce no tuples too */
2407  if (node->aggstrategy != AGG_PLAIN)
2408  return NULL;
2409  }
2410  }
2411  }
2412 
2413  /*
2414  * Initialize working state for a new input tuple group.
2415  */
2416  initialize_aggregates(aggstate, pergroups, numReset);
2417 
2418  if (aggstate->grp_firstTuple != NULL)
2419  {
2420  /*
2421  * Store the copied first input tuple in the tuple table slot
2422  * reserved for it. The tuple will be deleted when it is
2423  * cleared from the slot.
2424  */
2426  firstSlot, true);
2427  aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
2428 
2429  /* set up for first advance_aggregates call */
2430  tmpcontext->ecxt_outertuple = firstSlot;
2431 
2432  /*
2433  * Process each outer-plan tuple, and then fetch the next one,
2434  * until we exhaust the outer plan or cross a group boundary.
2435  */
2436  for (;;)
2437  {
2438  /*
2439  * During phase 1 only of a mixed agg, we need to update
2440  * hashtables as well in advance_aggregates.
2441  */
2442  if (aggstate->aggstrategy == AGG_MIXED &&
2443  aggstate->current_phase == 1)
2444  {
2445  lookup_hash_entries(aggstate);
2446  }
2447 
2448  /* Advance the aggregates (or combine functions) */
2449  advance_aggregates(aggstate);
2450 
2451  /* Reset per-input-tuple context after each tuple */
2452  ResetExprContext(tmpcontext);
2453 
2454  outerslot = fetch_input_tuple(aggstate);
2455  if (TupIsNull(outerslot))
2456  {
2457  /* no more outer-plan tuples available */
2458 
2459  /* if we built hash tables, finalize any spills */
2460  if (aggstate->aggstrategy == AGG_MIXED &&
2461  aggstate->current_phase == 1)
2463 
2464  if (hasGroupingSets)
2465  {
2466  aggstate->input_done = true;
2467  break;
2468  }
2469  else
2470  {
2471  aggstate->agg_done = true;
2472  break;
2473  }
2474  }
2475  /* set up for next advance_aggregates call */
2476  tmpcontext->ecxt_outertuple = outerslot;
2477 
2478  /*
2479  * If we are grouping, check whether we've crossed a group
2480  * boundary.
2481  */
2482  if (node->aggstrategy != AGG_PLAIN)
2483  {
2484  tmpcontext->ecxt_innertuple = firstSlot;
2485  if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2486  tmpcontext))
2487  {
2488  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2489  break;
2490  }
2491  }
2492  }
2493  }
2494 
2495  /*
2496  * Use the representative input tuple for any references to
2497  * non-aggregated input columns in aggregate direct args, the node
2498  * qual, and the tlist. (If we are not grouping, and there are no
2499  * input rows at all, we will come here with an empty firstSlot
2500  * ... but if not grouping, there can't be any references to
2501  * non-aggregated input columns, so no problem.)
2502  */
2503  econtext->ecxt_outertuple = firstSlot;
2504  }
2505 
2506  Assert(aggstate->projected_set >= 0);
2507 
2508  currentSet = aggstate->projected_set;
2509 
2510  prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2511 
2512  select_current_set(aggstate, currentSet, false);
2513 
2514  finalize_aggregates(aggstate,
2515  peragg,
2516  pergroups[currentSet]);
2517 
2518  /*
2519  * If there's no row to project right now, we must continue rather
2520  * than returning a null since there might be more groups.
2521  */
2522  result = project_aggregates(aggstate);
2523  if (result)
2524  return result;
2525  }
2526 
2527  /* No more groups */
2528  return NULL;
2529 }
2530 
2531 /*
2532  * ExecAgg for hashed case: read input and build hash table
2533  */
2534 static void
2536 {
2537  TupleTableSlot *outerslot;
2538  ExprContext *tmpcontext = aggstate->tmpcontext;
2539 
2540  /*
2541  * Process each outer-plan tuple, and then fetch the next one, until we
2542  * exhaust the outer plan.
2543  */
2544  for (;;)
2545  {
2546  outerslot = fetch_input_tuple(aggstate);
2547  if (TupIsNull(outerslot))
2548  break;
2549 
2550  /* set up for lookup_hash_entries and advance_aggregates */
2551  tmpcontext->ecxt_outertuple = outerslot;
2552 
2553  /* Find or build hashtable entries */
2554  lookup_hash_entries(aggstate);
2555 
2556  /* Advance the aggregates (or combine functions) */
2557  advance_aggregates(aggstate);
2558 
2559  /*
2560  * Reset per-input-tuple context after each tuple, but note that the
2561  * hash lookups do this too
2562  */
2563  ResetExprContext(aggstate->tmpcontext);
2564  }
2565 
2566  /* finalize spills, if any */
2568 
2569  aggstate->table_filled = true;
2570  /* Initialize to walk the first hash table */
2571  select_current_set(aggstate, 0, true);
2573  &aggstate->perhash[0].hashiter);
2574 }
2575 
2576 /*
2577  * If any data was spilled during hash aggregation, reset the hash table and
2578  * reprocess one batch of spilled data. After reprocessing a batch, the hash
2579  * table will again contain data, ready to be consumed by
2580  * agg_retrieve_hash_table_in_memory().
2581  *
2582  * Should only be called after all in memory hash table entries have been
2583  * finalized and emitted.
2584  *
2585  * Return false when input is exhausted and there's no more work to be done;
2586  * otherwise return true.
2587  */
2588 static bool
2590 {
2591  HashAggBatch *batch;
2592  AggStatePerHash perhash;
2593  HashAggSpill spill;
2594  LogicalTapeSet *tapeset = aggstate->hash_tapeset;
2595  bool spill_initialized = false;
2596 
2597  if (aggstate->hash_batches == NIL)
2598  return false;
2599 
2600  /* hash_batches is a stack, with the top item at the end of the list */
2601  batch = llast(aggstate->hash_batches);
2602  aggstate->hash_batches = list_delete_last(aggstate->hash_batches);
2603 
2604  hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
2605  batch->used_bits, &aggstate->hash_mem_limit,
2606  &aggstate->hash_ngroups_limit, NULL);
2607 
2608  /*
2609  * Each batch only processes one grouping set; set the rest to NULL so
2610  * that advance_aggregates() knows to ignore them. We don't touch
2611  * pergroups for sorted grouping sets here, because they will be needed if
2612  * we rescan later. The expressions for sorted grouping sets will not be
2613  * evaluated after we recompile anyway.
2614  */
2615  MemSet(aggstate->hash_pergroup, 0,
2616  sizeof(AggStatePerGroup) * aggstate->num_hashes);
2617 
2618  /* free memory and reset hash tables */
2619  ReScanExprContext(aggstate->hashcontext);
2620  for (int setno = 0; setno < aggstate->num_hashes; setno++)
2621  ResetTupleHashTable(aggstate->perhash[setno].hashtable);
2622 
2623  aggstate->hash_ngroups_current = 0;
2624 
2625  /*
2626  * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
2627  * happens in phase 0. So, we switch to phase 1 when processing a batch,
2628  * and back to phase 0 after the batch is done.
2629  */
2630  Assert(aggstate->current_phase == 0);
2631  if (aggstate->phase->aggstrategy == AGG_MIXED)
2632  {
2633  aggstate->current_phase = 1;
2634  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2635  }
2636 
2637  select_current_set(aggstate, batch->setno, true);
2638 
2639  perhash = &aggstate->perhash[aggstate->current_set];
2640 
2641  /*
2642  * Spilled tuples are always read back as MinimalTuples, which may be
2643  * different from the outer plan, so recompile the aggregate expressions.
2644  *
2645  * We still need the NULL check, because we are only processing one
2646  * grouping set at a time and the rest will be NULL.
2647  */
2648  hashagg_recompile_expressions(aggstate, true, true);
2649 
2650  for (;;)
2651  {
2652  TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
2653  TupleTableSlot *hashslot = perhash->hashslot;
2654  TupleHashEntry entry;
2655  MinimalTuple tuple;
2656  uint32 hash;
2657  bool isnew = false;
2658  bool *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2659 
2661 
2662  tuple = hashagg_batch_read(batch, &hash);
2663  if (tuple == NULL)
2664  break;
2665 
2666  ExecStoreMinimalTuple(tuple, spillslot, true);
2667  aggstate->tmpcontext->ecxt_outertuple = spillslot;
2668 
2669  prepare_hash_slot(perhash,
2670  aggstate->tmpcontext->ecxt_outertuple,
2671  hashslot);
2672  entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot,
2673  p_isnew, hash);
2674 
2675  if (entry != NULL)
2676  {
2677  if (isnew)
2678  initialize_hash_entry(aggstate, perhash->hashtable, entry);
2679  aggstate->hash_pergroup[batch->setno] = entry->additional;
2680  advance_aggregates(aggstate);
2681  }
2682  else
2683  {
2684  if (!spill_initialized)
2685  {
2686  /*
2687  * Avoid initializing the spill until we actually need it so
2688  * that we don't assign tapes that will never be used.
2689  */
2690  spill_initialized = true;
2691  hashagg_spill_init(&spill, tapeset, batch->used_bits,
2692  batch->input_card, aggstate->hashentrysize);
2693  }
2694  /* no memory for a new group, spill */
2695  hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
2696 
2697  aggstate->hash_pergroup[batch->setno] = NULL;
2698  }
2699 
2700  /*
2701  * Reset per-input-tuple context after each tuple, but note that the
2702  * hash lookups do this too
2703  */
2704  ResetExprContext(aggstate->tmpcontext);
2705  }
2706 
2707  LogicalTapeClose(batch->input_tape);
2708 
2709  /* change back to phase 0 */
2710  aggstate->current_phase = 0;
2711  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2712 
2713  if (spill_initialized)
2714  {
2715  hashagg_spill_finish(aggstate, &spill, batch->setno);
2716  hash_agg_update_metrics(aggstate, true, spill.npartitions);
2717  }
2718  else
2719  hash_agg_update_metrics(aggstate, true, 0);
2720 
2721  aggstate->hash_spill_mode = false;
2722 
2723  /* prepare to walk the first hash table */
2724  select_current_set(aggstate, batch->setno, true);
2725  ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
2726  &aggstate->perhash[batch->setno].hashiter);
2727 
2728  pfree(batch);
2729 
2730  return true;
2731 }
2732 
2733 /*
2734  * ExecAgg for hashed case: retrieving groups from hash table
2735  *
2736  * After exhausting in-memory tuples, also try refilling the hash table using
2737  * previously-spilled tuples. Only returns NULL after all in-memory and
2738  * spilled tuples are exhausted.
2739  */
2740 static TupleTableSlot *
2742 {
2743  TupleTableSlot *result = NULL;
2744 
2745  while (result == NULL)
2746  {
2747  result = agg_retrieve_hash_table_in_memory(aggstate);
2748  if (result == NULL)
2749  {
2750  if (!agg_refill_hash_table(aggstate))
2751  {
2752  aggstate->agg_done = true;
2753  break;
2754  }
2755  }
2756  }
2757 
2758  return result;
2759 }
2760 
2761 /*
2762  * Retrieve the groups from the in-memory hash tables without considering any
2763  * spilled tuples.
2764  */
2765 static TupleTableSlot *
2767 {
2768  ExprContext *econtext;
2769  AggStatePerAgg peragg;
2770  AggStatePerGroup pergroup;
2771  TupleHashEntryData *entry;
2772  TupleTableSlot *firstSlot;
2773  TupleTableSlot *result;
2774  AggStatePerHash perhash;
2775 
2776  /*
2777  * get state info from node.
2778  *
2779  * econtext is the per-output-tuple expression context.
2780  */
2781  econtext = aggstate->ss.ps.ps_ExprContext;
2782  peragg = aggstate->peragg;
2783  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2784 
2785  /*
2786  * Note that perhash (and therefore anything accessed through it) can
2787  * change inside the loop, as we change between grouping sets.
2788  */
2789  perhash = &aggstate->perhash[aggstate->current_set];
2790 
2791  /*
2792  * We loop retrieving groups until we find one satisfying
2793  * aggstate->ss.ps.qual
2794  */
2795  for (;;)
2796  {
2797  TupleTableSlot *hashslot = perhash->hashslot;
2798  int i;
2799 
2801 
2802  /*
2803  * Find the next entry in the hash table
2804  */
2805  entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2806  if (entry == NULL)
2807  {
2808  int nextset = aggstate->current_set + 1;
2809 
2810  if (nextset < aggstate->num_hashes)
2811  {
2812  /*
2813  * Switch to next grouping set, reinitialize, and restart the
2814  * loop.
2815  */
2816  select_current_set(aggstate, nextset, true);
2817 
2818  perhash = &aggstate->perhash[aggstate->current_set];
2819 
2820  ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2821 
2822  continue;
2823  }
2824  else
2825  {
2826  return NULL;
2827  }
2828  }
2829 
2830  /*
2831  * Clear the per-output-tuple context for each group
2832  *
2833  * We intentionally don't use ReScanExprContext here; if any aggs have
2834  * registered shutdown callbacks, they mustn't be called yet, since we
2835  * might not be done with that agg.
2836  */
2837  ResetExprContext(econtext);
2838 
2839  /*
2840  * Transform representative tuple back into one with the right
2841  * columns.
2842  */
2843  ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2844  slot_getallattrs(hashslot);
2845 
2846  ExecClearTuple(firstSlot);
2847  memset(firstSlot->tts_isnull, true,
2848  firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2849 
2850  for (i = 0; i < perhash->numhashGrpCols; i++)
2851  {
2852  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2853 
2854  firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2855  firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2856  }
2857  ExecStoreVirtualTuple(firstSlot);
2858 
2859  pergroup = (AggStatePerGroup) entry->additional;
2860 
2861  /*
2862  * Use the representative input tuple for any references to
2863  * non-aggregated input columns in the qual and tlist.
2864  */
2865  econtext->ecxt_outertuple = firstSlot;
2866 
2867  prepare_projection_slot(aggstate,
2868  econtext->ecxt_outertuple,
2869  aggstate->current_set);
2870 
2871  finalize_aggregates(aggstate, peragg, pergroup);
2872 
2873  result = project_aggregates(aggstate);
2874  if (result)
2875  return result;
2876  }
2877 
2878  /* No more groups */
2879  return NULL;
2880 }
2881 
2882 /*
2883  * hashagg_spill_init
2884  *
2885  * Called after we determined that spilling is necessary. Chooses the number
2886  * of partitions to create, and initializes them.
2887  */
2888 static void
2889 hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
2890  double input_groups, double hashentrysize)
2891 {
2892  int npartitions;
2893  int partition_bits;
2894 
2895  npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
2896  used_bits, &partition_bits);
2897 
2898  spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
2899  spill->ntuples = palloc0(sizeof(int64) * npartitions);
2900  spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
2901 
2902  for (int i = 0; i < npartitions; i++)
2903  spill->partitions[i] = LogicalTapeCreate(tapeset);
2904 
2905  spill->shift = 32 - used_bits - partition_bits;
2906  spill->mask = (npartitions - 1) << spill->shift;
2907  spill->npartitions = npartitions;
2908 
2909  for (int i = 0; i < npartitions; i++)
2911 }
2912 
2913 /*
2914  * hashagg_spill_tuple
2915  *
2916  * No room for new groups in the hash table. Save for later in the appropriate
2917  * partition.
2918  */
2919 static Size
2921  TupleTableSlot *inputslot, uint32 hash)
2922 {
2923  TupleTableSlot *spillslot;
2924  int partition;
2925  MinimalTuple tuple;
2926  LogicalTape *tape;
2927  int total_written = 0;
2928  bool shouldFree;
2929 
2930  Assert(spill->partitions != NULL);
2931 
2932  /* spill only attributes that we actually need */
2933  if (!aggstate->all_cols_needed)
2934  {
2935  spillslot = aggstate->hash_spill_wslot;
2936  slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
2937  ExecClearTuple(spillslot);
2938  for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
2939  {
2940  if (bms_is_member(i + 1, aggstate->colnos_needed))
2941  {
2942  spillslot->tts_values[i] = inputslot->tts_values[i];
2943  spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
2944  }
2945  else
2946  spillslot->tts_isnull[i] = true;
2947  }
2948  ExecStoreVirtualTuple(spillslot);
2949  }
2950  else
2951  spillslot = inputslot;
2952 
2953  tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
2954 
2955  partition = (hash & spill->mask) >> spill->shift;
2956  spill->ntuples[partition]++;
2957 
2958  /*
2959  * All hash values destined for a given partition have some bits in
2960  * common, which causes bad HLL cardinality estimates. Hash the hash to
2961  * get a more uniform distribution.
2962  */
2963  addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
2964 
2965  tape = spill->partitions[partition];
2966 
2967  LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32));
2968  total_written += sizeof(uint32);
2969 
2970  LogicalTapeWrite(tape, (void *) tuple, tuple->t_len);
2971  total_written += tuple->t_len;
2972 
2973  if (shouldFree)
2974  pfree(tuple);
2975 
2976  return total_written;
2977 }
2978 
2979 /*
2980  * hashagg_batch_new
2981  *
2982  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
2983  * be done.
2984  */
2985 static HashAggBatch *
2986 hashagg_batch_new(LogicalTape *input_tape, int setno,
2987  int64 input_tuples, double input_card, int used_bits)
2988 {
2989  HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
2990 
2991  batch->setno = setno;
2992  batch->used_bits = used_bits;
2993  batch->input_tape = input_tape;
2994  batch->input_tuples = input_tuples;
2995  batch->input_card = input_card;
2996 
2997  return batch;
2998 }
2999 
3000 /*
3001  * read_spilled_tuple
3002  * read the next tuple from a batch's tape. Return NULL if no more.
3003  */
3004 static MinimalTuple
3006 {
3007  LogicalTape *tape = batch->input_tape;
3008  MinimalTuple tuple;
3009  uint32 t_len;
3010  size_t nread;
3011  uint32 hash;
3012 
3013  nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
3014  if (nread == 0)
3015  return NULL;
3016  if (nread != sizeof(uint32))
3017  ereport(ERROR,
3019  errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
3020  tape, sizeof(uint32), nread)));
3021  if (hashp != NULL)
3022  *hashp = hash;
3023 
3024  nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
3025  if (nread != sizeof(uint32))
3026  ereport(ERROR,
3028  errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
3029  tape, sizeof(uint32), nread)));
3030 
3031  tuple = (MinimalTuple) palloc(t_len);
3032  tuple->t_len = t_len;
3033 
3034  nread = LogicalTapeRead(tape,
3035  (void *) ((char *) tuple + sizeof(uint32)),
3036  t_len - sizeof(uint32));
3037  if (nread != t_len - sizeof(uint32))
3038  ereport(ERROR,
3040  errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
3041  tape, t_len - sizeof(uint32), nread)));
3042 
3043  return tuple;
3044 }
3045 
3046 /*
3047  * hashagg_finish_initial_spills
3048  *
3049  * After a HashAggBatch has been processed, it may have spilled tuples to
3050  * disk. If so, turn the spilled partitions into new batches that must later
3051  * be executed.
3052  */
3053 static void
3055 {
3056  int setno;
3057  int total_npartitions = 0;
3058 
3059  if (aggstate->hash_spills != NULL)
3060  {
3061  for (setno = 0; setno < aggstate->num_hashes; setno++)
3062  {
3063  HashAggSpill *spill = &aggstate->hash_spills[setno];
3064 
3065  total_npartitions += spill->npartitions;
3066  hashagg_spill_finish(aggstate, spill, setno);
3067  }
3068 
3069  /*
3070  * We're not processing tuples from outer plan any more; only
3071  * processing batches of spilled tuples. The initial spill structures
3072  * are no longer needed.
3073  */
3074  pfree(aggstate->hash_spills);
3075  aggstate->hash_spills = NULL;
3076  }
3077 
3078  hash_agg_update_metrics(aggstate, false, total_npartitions);
3079  aggstate->hash_spill_mode = false;
3080 }
3081 
3082 /*
3083  * hashagg_spill_finish
3084  *
3085  * Transform spill partitions into new batches.
3086  */
3087 static void
3088 hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3089 {
3090  int i;
3091  int used_bits = 32 - spill->shift;
3092 
3093  if (spill->npartitions == 0)
3094  return; /* didn't spill */
3095 
3096  for (i = 0; i < spill->npartitions; i++)
3097  {
3098  LogicalTape *tape = spill->partitions[i];
3099  HashAggBatch *new_batch;
3100  double cardinality;
3101 
3102  /* if the partition is empty, don't create a new batch of work */
3103  if (spill->ntuples[i] == 0)
3104  continue;
3105 
3106  cardinality = estimateHyperLogLog(&spill->hll_card[i]);
3107  freeHyperLogLog(&spill->hll_card[i]);
3108 
3109  /* rewinding frees the buffer while not in use */
3111 
3112  new_batch = hashagg_batch_new(tape, setno,
3113  spill->ntuples[i], cardinality,
3114  used_bits);
3115  aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch);
3116  aggstate->hash_batches_used++;
3117  }
3118 
3119  pfree(spill->ntuples);
3120  pfree(spill->hll_card);
3121  pfree(spill->partitions);
3122 }
3123 
3124 /*
3125  * Free resources related to a spilled HashAgg.
3126  */
3127 static void
3129 {
3130  /* free spills from initial pass */
3131  if (aggstate->hash_spills != NULL)
3132  {
3133  int setno;
3134 
3135  for (setno = 0; setno < aggstate->num_hashes; setno++)
3136  {
3137  HashAggSpill *spill = &aggstate->hash_spills[setno];
3138 
3139  pfree(spill->ntuples);
3140  pfree(spill->partitions);
3141  }
3142  pfree(aggstate->hash_spills);
3143  aggstate->hash_spills = NULL;
3144  }
3145 
3146  /* free batches */
3147  list_free_deep(aggstate->hash_batches);
3148  aggstate->hash_batches = NIL;
3149 
3150  /* close tape set */
3151  if (aggstate->hash_tapeset != NULL)
3152  {
3153  LogicalTapeSetClose(aggstate->hash_tapeset);
3154  aggstate->hash_tapeset = NULL;
3155  }
3156 }
3157 
3158 
3159 /* -----------------
3160  * ExecInitAgg
3161  *
3162  * Creates the run-time information for the agg node produced by the
3163  * planner and initializes its outer subtree.
3164  *
3165  * -----------------
3166  */
3167 AggState *
3168 ExecInitAgg(Agg *node, EState *estate, int eflags)
3169 {
3170  AggState *aggstate;
3171  AggStatePerAgg peraggs;
3172  AggStatePerTrans pertransstates;
3173  AggStatePerGroup *pergroups;
3174  Plan *outerPlan;
3175  ExprContext *econtext;
3176  TupleDesc scanDesc;
3177  int max_aggno;
3178  int max_transno;
3179  int numaggrefs;
3180  int numaggs;
3181  int numtrans;
3182  int phase;
3183  int phaseidx;
3184  ListCell *l;
3185  Bitmapset *all_grouped_cols = NULL;
3186  int numGroupingSets = 1;
3187  int numPhases;
3188  int numHashes;
3189  int i = 0;
3190  int j = 0;
3191  bool use_hashing = (node->aggstrategy == AGG_HASHED ||
3192  node->aggstrategy == AGG_MIXED);
3193 
3194  /* check for unsupported flags */
3195  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
3196 
3197  /*
3198  * create state structure
3199  */
3200  aggstate = makeNode(AggState);
3201  aggstate->ss.ps.plan = (Plan *) node;
3202  aggstate->ss.ps.state = estate;
3203  aggstate->ss.ps.ExecProcNode = ExecAgg;
3204 
3205  aggstate->aggs = NIL;
3206  aggstate->numaggs = 0;
3207  aggstate->numtrans = 0;
3208  aggstate->aggstrategy = node->aggstrategy;
3209  aggstate->aggsplit = node->aggsplit;
3210  aggstate->maxsets = 0;
3211  aggstate->projected_set = -1;
3212  aggstate->current_set = 0;
3213  aggstate->peragg = NULL;
3214  aggstate->pertrans = NULL;
3215  aggstate->curperagg = NULL;
3216  aggstate->curpertrans = NULL;
3217  aggstate->input_done = false;
3218  aggstate->agg_done = false;
3219  aggstate->pergroups = NULL;
3220  aggstate->grp_firstTuple = NULL;
3221  aggstate->sort_in = NULL;
3222  aggstate->sort_out = NULL;
3223 
3224  /*
3225  * phases[0] always exists, but is dummy in sorted/plain mode
3226  */
3227  numPhases = (use_hashing ? 1 : 2);
3228  numHashes = (use_hashing ? 1 : 0);
3229 
3230  /*
3231  * Calculate the maximum number of grouping sets in any phase; this
3232  * determines the size of some allocations. Also calculate the number of
3233  * phases, since all hashed/mixed nodes contribute to only a single phase.
3234  */
3235  if (node->groupingSets)
3236  {
3237  numGroupingSets = list_length(node->groupingSets);
3238 
3239  foreach(l, node->chain)
3240  {
3241  Agg *agg = lfirst(l);
3242 
3243  numGroupingSets = Max(numGroupingSets,
3244  list_length(agg->groupingSets));
3245 
3246  /*
3247  * additional AGG_HASHED aggs become part of phase 0, but all
3248  * others add an extra phase.
3249  */
3250  if (agg->aggstrategy != AGG_HASHED)
3251  ++numPhases;
3252  else
3253  ++numHashes;
3254  }
3255  }
3256 
3257  aggstate->maxsets = numGroupingSets;
3258  aggstate->numphases = numPhases;
3259 
3260  aggstate->aggcontexts = (ExprContext **)
3261  palloc0(sizeof(ExprContext *) * numGroupingSets);
3262 
3263  /*
3264  * Create expression contexts. We need three or more, one for
3265  * per-input-tuple processing, one for per-output-tuple processing, one
3266  * for all the hashtables, and one for each grouping set. The per-tuple
3267  * memory context of the per-grouping-set ExprContexts (aggcontexts)
3268  * replaces the standalone memory context formerly used to hold transition
3269  * values. We cheat a little by using ExecAssignExprContext() to build
3270  * all of them.
3271  *
3272  * NOTE: the details of what is stored in aggcontexts and what is stored
3273  * in the regular per-query memory context are driven by a simple
3274  * decision: we want to reset the aggcontext at group boundaries (if not
3275  * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
3276  */
3277  ExecAssignExprContext(estate, &aggstate->ss.ps);
3278  aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
3279 
3280  for (i = 0; i < numGroupingSets; ++i)
3281  {
3282  ExecAssignExprContext(estate, &aggstate->ss.ps);
3283  aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
3284  }
3285 
3286  if (use_hashing)
3287  aggstate->hashcontext = CreateWorkExprContext(estate);
3288 
3289  ExecAssignExprContext(estate, &aggstate->ss.ps);
3290 
3291  /*
3292  * Initialize child nodes.
3293  *
3294  * If we are doing a hashed aggregation then the child plan does not need
3295  * to handle REWIND efficiently; see ExecReScanAgg.
3296  */
3297  if (node->aggstrategy == AGG_HASHED)
3298  eflags &= ~EXEC_FLAG_REWIND;
3299  outerPlan = outerPlan(node);
3300  outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
3301 
3302  /*
3303  * initialize source tuple type.
3304  */
3305  aggstate->ss.ps.outerops =
3307  &aggstate->ss.ps.outeropsfixed);
3308  aggstate->ss.ps.outeropsset = true;
3309 
3310  ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
3311  aggstate->ss.ps.outerops);
3312  scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
3313 
3314  /*
3315  * If there are more than two phases (including a potential dummy phase
3316  * 0), input will be resorted using tuplesort. Need a slot for that.
3317  */
3318  if (numPhases > 2)
3319  {
3320  aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
3322 
3323  /*
3324  * The output of the tuplesort, and the output from the outer child
3325  * might not use the same type of slot. In most cases the child will
3326  * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
3327  * input can also be presorted due an index, in which case it could be
3328  * a different type of slot.
3329  *
3330  * XXX: For efficiency it would be good to instead/additionally
3331  * generate expressions with corresponding settings of outerops* for
3332  * the individual phases - deforming is often a bottleneck for
3333  * aggregations with lots of rows per group. If there's multiple
3334  * sorts, we know that all but the first use TTSOpsMinimalTuple (via
3335  * the nodeAgg.c internal tuplesort).
3336  */
3337  if (aggstate->ss.ps.outeropsfixed &&
3338  aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
3339  aggstate->ss.ps.outeropsfixed = false;
3340  }
3341 
3342  /*
3343  * Initialize result type, slot and projection.
3344  */
3346  ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
3347 
3348  /*
3349  * initialize child expressions
3350  *
3351  * We expect the parser to have checked that no aggs contain other agg
3352  * calls in their arguments (and just to be sure, we verify it again while
3353  * initializing the plan node). This would make no sense under SQL
3354  * semantics, and it's forbidden by the spec. Because it is true, we
3355  * don't need to worry about evaluating the aggs in any particular order.
3356  *
3357  * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
3358  * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
3359  * during ExecAssignProjectionInfo, above.
3360  */
3361  aggstate->ss.ps.qual =
3362  ExecInitQual(node->plan.qual, (PlanState *) aggstate);
3363 
3364  /*
3365  * We should now have found all Aggrefs in the targetlist and quals.
3366  */
3367  numaggrefs = list_length(aggstate->aggs);
3368  max_aggno = -1;
3369  max_transno = -1;
3370  foreach(l, aggstate->aggs)
3371  {
3372  Aggref *aggref = (Aggref *) lfirst(l);
3373 
3374  max_aggno = Max(max_aggno, aggref->aggno);
3375  max_transno = Max(max_transno, aggref->aggtransno);
3376  }
3377  numaggs = max_aggno + 1;
3378  numtrans = max_transno + 1;
3379 
3380  /*
3381  * For each phase, prepare grouping set data and fmgr lookup data for
3382  * compare functions. Accumulate all_grouped_cols in passing.
3383  */
3384  aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
3385 
3386  aggstate->num_hashes = numHashes;
3387  if (numHashes)
3388  {
3389  aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
3390  aggstate->phases[0].numsets = 0;
3391  aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
3392  aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
3393  }
3394 
3395  phase = 0;
3396  for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3397  {
3398  Agg *aggnode;
3399  Sort *sortnode;
3400 
3401  if (phaseidx > 0)
3402  {
3403  aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
3404  sortnode = castNode(Sort, outerPlan(aggnode));
3405  }
3406  else
3407  {
3408  aggnode = node;
3409  sortnode = NULL;
3410  }
3411 
3412  Assert(phase <= 1 || sortnode);
3413 
3414  if (aggnode->aggstrategy == AGG_HASHED
3415  || aggnode->aggstrategy == AGG_MIXED)
3416  {
3417  AggStatePerPhase phasedata = &aggstate->phases[0];
3418  AggStatePerHash perhash;
3419  Bitmapset *cols = NULL;
3420 
3421  Assert(phase == 0);
3422  i = phasedata->numsets++;
3423  perhash = &aggstate->perhash[i];
3424 
3425  /* phase 0 always points to the "real" Agg in the hash case */
3426  phasedata->aggnode = node;
3427  phasedata->aggstrategy = node->aggstrategy;
3428 
3429  /* but the actual Agg node representing this hash is saved here */
3430  perhash->aggnode = aggnode;
3431 
3432  phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
3433 
3434  for (j = 0; j < aggnode->numCols; ++j)
3435  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3436 
3437  phasedata->grouped_cols[i] = cols;
3438 
3439  all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3440  continue;
3441  }
3442  else
3443  {
3444  AggStatePerPhase phasedata = &aggstate->phases[++phase];
3445  int num_sets;
3446 
3447  phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
3448 
3449  if (num_sets)
3450  {
3451  phasedata->gset_lengths = palloc(num_sets * sizeof(int));
3452  phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
3453 
3454  i = 0;
3455  foreach(l, aggnode->groupingSets)
3456  {
3457  int current_length = list_length(lfirst(l));
3458  Bitmapset *cols = NULL;
3459 
3460  /* planner forces this to be correct */
3461  for (j = 0; j < current_length; ++j)
3462  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3463 
3464  phasedata->grouped_cols[i] = cols;
3465  phasedata->gset_lengths[i] = current_length;
3466 
3467  ++i;
3468  }
3469 
3470  all_grouped_cols = bms_add_members(all_grouped_cols,
3471  phasedata->grouped_cols[0]);
3472  }
3473  else
3474  {
3475  Assert(phaseidx == 0);
3476 
3477  phasedata->gset_lengths = NULL;
3478  phasedata->grouped_cols = NULL;
3479  }
3480 
3481  /*
3482  * If we are grouping, precompute fmgr lookup data for inner loop.
3483  */
3484  if (aggnode->aggstrategy == AGG_SORTED)
3485  {
3486  int i = 0;
3487 
3488  Assert(aggnode->numCols > 0);
3489 
3490  /*
3491  * Build a separate function for each subset of columns that
3492  * need to be compared.
3493  */
3494  phasedata->eqfunctions =
3495  (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
3496 
3497  /* for each grouping set */
3498  for (i = 0; i < phasedata->numsets; i++)
3499  {
3500  int length = phasedata->gset_lengths[i];
3501 
3502  if (phasedata->eqfunctions[length - 1] != NULL)
3503  continue;
3504 
3505  phasedata->eqfunctions[length - 1] =
3506  execTuplesMatchPrepare(scanDesc,
3507  length,
3508  aggnode->grpColIdx,
3509  aggnode->grpOperators,
3510  aggnode->grpCollations,
3511  (PlanState *) aggstate);
3512  }
3513 
3514  /* and for all grouped columns, unless already computed */
3515  if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
3516  {
3517  phasedata->eqfunctions[aggnode->numCols - 1] =
3518  execTuplesMatchPrepare(scanDesc,
3519  aggnode->numCols,
3520  aggnode->grpColIdx,
3521  aggnode->grpOperators,
3522  aggnode->grpCollations,
3523  (PlanState *) aggstate);
3524  }
3525  }
3526 
3527  phasedata->aggnode = aggnode;
3528  phasedata->aggstrategy = aggnode->aggstrategy;
3529  phasedata->sortnode = sortnode;
3530  }
3531  }
3532 
3533  /*
3534  * Convert all_grouped_cols to a descending-order list.
3535  */
3536  i = -1;
3537  while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3538  aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3539 
3540  /*
3541  * Set up aggregate-result storage in the output expr context, and also
3542  * allocate my private per-agg working storage
3543  */
3544  econtext = aggstate->ss.ps.ps_ExprContext;
3545  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3546  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3547 
3548  peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3549  pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans);
3550 
3551  aggstate->peragg = peraggs;
3552  aggstate->pertrans = pertransstates;
3553 
3554 
3555  aggstate->all_pergroups =
3557  * (numGroupingSets + numHashes));
3558  pergroups = aggstate->all_pergroups;
3559 
3560  if (node->aggstrategy != AGG_HASHED)
3561  {
3562  for (i = 0; i < numGroupingSets; i++)
3563  {
3564  pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3565  * numaggs);
3566  }
3567 
3568  aggstate->pergroups = pergroups;
3569  pergroups += numGroupingSets;
3570  }
3571 
3572  /*
3573  * Hashing can only appear in the initial phase.
3574  */
3575  if (use_hashing)
3576  {
3577  Plan *outerplan = outerPlan(node);
3578  uint64 totalGroups = 0;
3579  int i;
3580 
3581  aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
3582  "HashAgg meta context",
3584  aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
3586  aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
3587  &TTSOpsVirtual);
3588 
3589  /* this is an array of pointers, not structures */
3590  aggstate->hash_pergroup = pergroups;
3591 
3592  aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
3593  outerplan->plan_width,
3594  node->transitionSpace);
3595 
3596  /*
3597  * Consider all of the grouping sets together when setting the limits
3598  * and estimating the number of partitions. This can be inaccurate
3599  * when there is more than one grouping set, but should still be
3600  * reasonable.
3601  */
3602  for (i = 0; i < aggstate->num_hashes; i++)
3603  totalGroups += aggstate->perhash[i].aggnode->numGroups;
3604 
3605  hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
3606  &aggstate->hash_mem_limit,
3607  &aggstate->hash_ngroups_limit,
3608  &aggstate->hash_planned_partitions);
3609  find_hash_columns(aggstate);
3610 
3611  /* Skip massive memory allocation if we are just doing EXPLAIN */
3612  if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
3613  build_hash_tables(aggstate);
3614 
3615  aggstate->table_filled = false;
3616 
3617  /* Initialize this to 1, meaning nothing spilled, yet */
3618  aggstate->hash_batches_used = 1;
3619  }
3620 
3621  /*
3622  * Initialize current phase-dependent values to initial phase. The initial
3623  * phase is 1 (first sort pass) for all strategies that use sorting (if
3624  * hashing is being done too, then phase 0 is processed last); but if only
3625  * hashing is being done, then phase 0 is all there is.
3626  */
3627  if (node->aggstrategy == AGG_HASHED)
3628  {
3629  aggstate->current_phase = 0;
3630  initialize_phase(aggstate, 0);
3631  select_current_set(aggstate, 0, true);
3632  }
3633  else
3634  {
3635  aggstate->current_phase = 1;
3636  initialize_phase(aggstate, 1);
3637  select_current_set(aggstate, 0, false);
3638  }
3639 
3640  /*
3641  * Perform lookups of aggregate function info, and initialize the
3642  * unchanging fields of the per-agg and per-trans data.
3643  */
3644  foreach(l, aggstate->aggs)
3645  {
3646  Aggref *aggref = lfirst(l);
3647  AggStatePerAgg peragg;
3648  AggStatePerTrans pertrans;
3649  Oid aggTransFnInputTypes[FUNC_MAX_ARGS];
3650  int numAggTransFnArgs;
3651  int numDirectArgs;
3652  HeapTuple aggTuple;
3653  Form_pg_aggregate aggform;
3654  AclResult aclresult;
3655  Oid finalfn_oid;
3656  Oid serialfn_oid,
3657  deserialfn_oid;
3658  Oid aggOwner;
3659  Expr *finalfnexpr;
3660  Oid aggtranstype;
3661 
3662  /* Planner should have assigned aggregate to correct level */
3663  Assert(aggref->agglevelsup == 0);
3664  /* ... and the split mode should match */
3665  Assert(aggref->aggsplit == aggstate->aggsplit);
3666 
3667  peragg = &peraggs[aggref->aggno];
3668 
3669  /* Check if we initialized the state for this aggregate already. */
3670  if (peragg->aggref != NULL)
3671  continue;
3672 
3673  peragg->aggref = aggref;
3674  peragg->transno = aggref->aggtransno;
3675 
3676  /* Fetch the pg_aggregate row */
3677  aggTuple = SearchSysCache1(AGGFNOID,
3678  ObjectIdGetDatum(aggref->aggfnoid));
3679  if (!HeapTupleIsValid(aggTuple))
3680  elog(ERROR, "cache lookup failed for aggregate %u",
3681  aggref->aggfnoid);
3682  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3683 
3684  /* Check permission to call aggregate function */
3685  aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3686  ACL_EXECUTE);
3687  if (aclresult != ACLCHECK_OK)
3688  aclcheck_error(aclresult, OBJECT_AGGREGATE,
3689  get_func_name(aggref->aggfnoid));
3691 
3692  /* planner recorded transition state type in the Aggref itself */
3693  aggtranstype = aggref->aggtranstype;
3694  Assert(OidIsValid(aggtranstype));
3695 
3696  /* Final function only required if we're finalizing the aggregates */
3697  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3698  peragg->finalfn_oid = finalfn_oid = InvalidOid;
3699  else
3700  peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3701 
3702  serialfn_oid = InvalidOid;
3703  deserialfn_oid = InvalidOid;
3704 
3705  /*
3706  * Check if serialization/deserialization is required. We only do it
3707  * for aggregates that have transtype INTERNAL.
3708  */
3709  if (aggtranstype == INTERNALOID)
3710  {
3711  /*
3712  * The planner should only have generated a serialize agg node if
3713  * every aggregate with an INTERNAL state has a serialization
3714  * function. Verify that.
3715  */
3716  if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3717  {
3718  /* serialization only valid when not running finalfn */
3720 
3721  if (!OidIsValid(aggform->aggserialfn))
3722  elog(ERROR, "serialfunc not provided for serialization aggregation");
3723  serialfn_oid = aggform->aggserialfn;
3724  }
3725 
3726  /* Likewise for deserialization functions */
3727  if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3728  {
3729  /* deserialization only valid when combining states */
3730  Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3731 
3732  if (!OidIsValid(aggform->aggdeserialfn))
3733  elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3734  deserialfn_oid = aggform->aggdeserialfn;
3735  }
3736  }
3737 
3738  /* Check that aggregate owner has permission to call component fns */
3739  {
3740  HeapTuple procTuple;
3741 
3742  procTuple = SearchSysCache1(PROCOID,
3743  ObjectIdGetDatum(aggref->aggfnoid));
3744  if (!HeapTupleIsValid(procTuple))
3745  elog(ERROR, "cache lookup failed for function %u",
3746  aggref->aggfnoid);
3747  aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3748  ReleaseSysCache(procTuple);
3749 
3750  if (OidIsValid(finalfn_oid))
3751  {
3752  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3753  ACL_EXECUTE);
3754  if (aclresult != ACLCHECK_OK)
3755  aclcheck_error(aclresult, OBJECT_FUNCTION,
3756  get_func_name(finalfn_oid));
3757  InvokeFunctionExecuteHook(finalfn_oid);
3758  }
3759  if (OidIsValid(serialfn_oid))
3760  {
3761  aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3762  ACL_EXECUTE);
3763  if (aclresult != ACLCHECK_OK)
3764  aclcheck_error(aclresult, OBJECT_FUNCTION,
3765  get_func_name(serialfn_oid));
3766  InvokeFunctionExecuteHook(serialfn_oid);
3767  }
3768  if (OidIsValid(deserialfn_oid))
3769  {
3770  aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3771  ACL_EXECUTE);
3772  if (aclresult != ACLCHECK_OK)
3773  aclcheck_error(aclresult, OBJECT_FUNCTION,
3774  get_func_name(deserialfn_oid));
3775  InvokeFunctionExecuteHook(deserialfn_oid);
3776  }
3777  }
3778 
3779  /*
3780  * Get actual datatypes of the (nominal) aggregate inputs. These
3781  * could be different from the agg's declared input types, when the
3782  * agg accepts ANY or a polymorphic type.
3783  */
3784  numAggTransFnArgs = get_aggregate_argtypes(aggref,
3785  aggTransFnInputTypes);
3786 
3787  /* Count the "direct" arguments, if any */
3788  numDirectArgs = list_length(aggref->aggdirectargs);
3789 
3790  /* Detect how many arguments to pass to the finalfn */
3791  if (aggform->aggfinalextra)
3792  peragg->numFinalArgs = numAggTransFnArgs + 1;
3793  else
3794  peragg->numFinalArgs = numDirectArgs + 1;
3795 
3796  /* Initialize any direct-argument expressions */
3797  peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3798  (PlanState *) aggstate);
3799 
3800  /*
3801  * build expression trees using actual argument & result types for the
3802  * finalfn, if it exists and is required.
3803  */
3804  if (OidIsValid(finalfn_oid))
3805  {
3806  build_aggregate_finalfn_expr(aggTransFnInputTypes,
3807  peragg->numFinalArgs,
3808  aggtranstype,
3809  aggref->aggtype,
3810  aggref->inputcollid,
3811  finalfn_oid,
3812  &finalfnexpr);
3813  fmgr_info(finalfn_oid, &peragg->finalfn);
3814  fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3815  }
3816 
3817  /* get info about the output value's datatype */
3818  get_typlenbyval(aggref->aggtype,
3819  &peragg->resulttypeLen,
3820  &peragg->resulttypeByVal);
3821 
3822  /*
3823  * Build working state for invoking the transition function, if we
3824  * haven't done it already.
3825  */
3826  pertrans = &pertransstates[aggref->aggtransno];
3827  if (pertrans->aggref == NULL)
3828  {
3829  Datum textInitVal;
3830  Datum initValue;
3831  bool initValueIsNull;
3832  Oid transfn_oid;
3833 
3834  /*
3835  * If this aggregation is performing state combines, then instead
3836  * of using the transition function, we'll use the combine
3837  * function.
3838  */
3839  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3840  {
3841  transfn_oid = aggform->aggcombinefn;
3842 
3843  /* If not set then the planner messed up */
3844  if (!OidIsValid(transfn_oid))
3845  elog(ERROR, "combinefn not set for aggregate function");
3846  }
3847  else
3848  transfn_oid = aggform->aggtransfn;
3849 
3850  aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, ACL_EXECUTE);
3851  if (aclresult != ACLCHECK_OK)
3852  aclcheck_error(aclresult, OBJECT_FUNCTION,
3853  get_func_name(transfn_oid));
3854  InvokeFunctionExecuteHook(transfn_oid);
3855 
3856  /*
3857  * initval is potentially null, so don't try to access it as a
3858  * struct field. Must do it the hard way with SysCacheGetAttr.
3859  */
3860  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3861  Anum_pg_aggregate_agginitval,
3862  &initValueIsNull);
3863  if (initValueIsNull)
3864  initValue = (Datum) 0;
3865  else
3866  initValue = GetAggInitVal(textInitVal, aggtranstype);
3867 
3868  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3869  {
3870  Oid combineFnInputTypes[] = {aggtranstype,
3871  aggtranstype};
3872 
3873  /*
3874  * When combining there's only one input, the to-be-combined
3875  * transition value. The transition value is not counted
3876  * here.
3877  */
3878  pertrans->numTransInputs = 1;
3879 
3880  /* aggcombinefn always has two arguments of aggtranstype */
3881  build_pertrans_for_aggref(pertrans, aggstate, estate,
3882  aggref, transfn_oid, aggtranstype,
3883  serialfn_oid, deserialfn_oid,
3884  initValue, initValueIsNull,
3885  combineFnInputTypes, 2);
3886 
3887  /*
3888  * Ensure that a combine function to combine INTERNAL states
3889  * is not strict. This should have been checked during CREATE
3890  * AGGREGATE, but the strict property could have been changed
3891  * since then.
3892  */
3893  if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
3894  ereport(ERROR,
3895  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3896  errmsg("combine function with transition type %s must not be declared STRICT",
3897  format_type_be(aggtranstype))));
3898  }
3899  else
3900  {
3901  /* Detect how many arguments to pass to the transfn */
3902  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3903  pertrans->numTransInputs = list_length(aggref->args);
3904  else
3905  pertrans->numTransInputs = numAggTransFnArgs;
3906 
3907  build_pertrans_for_aggref(pertrans, aggstate, estate,
3908  aggref, transfn_oid, aggtranstype,
3909  serialfn_oid, deserialfn_oid,
3910  initValue, initValueIsNull,
3911  aggTransFnInputTypes,
3912  numAggTransFnArgs);
3913 
3914  /*
3915  * If the transfn is strict and the initval is NULL, make sure
3916  * input type and transtype are the same (or at least
3917  * binary-compatible), so that it's OK to use the first
3918  * aggregated input value as the initial transValue. This
3919  * should have been checked at agg definition time, but we
3920  * must check again in case the transfn's strictness property
3921  * has been changed.
3922  */
3923  if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
3924  {
3925  if (numAggTransFnArgs <= numDirectArgs ||
3926  !IsBinaryCoercible(aggTransFnInputTypes[numDirectArgs],
3927  aggtranstype))
3928  ereport(ERROR,
3929  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3930  errmsg("aggregate %u needs to have compatible input type and transition type",
3931  aggref->aggfnoid)));
3932  }
3933  }
3934  }
3935  else
3936  pertrans->aggshared = true;
3937  ReleaseSysCache(aggTuple);
3938  }
3939 
3940  /*
3941  * Update aggstate->numaggs to be the number of unique aggregates found.
3942  * Also set numstates to the number of unique transition states found.
3943  */
3944  aggstate->numaggs = numaggs;
3945  aggstate->numtrans = numtrans;
3946 
3947  /*
3948  * Last, check whether any more aggregates got added onto the node while
3949  * we processed the expressions for the aggregate arguments (including not
3950  * only the regular arguments and FILTER expressions handled immediately
3951  * above, but any direct arguments we might've handled earlier). If so,
3952  * we have nested aggregate functions, which is semantically nonsensical,
3953  * so complain. (This should have been caught by the parser, so we don't
3954  * need to work hard on a helpful error message; but we defend against it
3955  * here anyway, just to be sure.)
3956  */
3957  if (numaggrefs != list_length(aggstate->aggs))
3958  ereport(ERROR,
3959  (errcode(ERRCODE_GROUPING_ERROR),
3960  errmsg("aggregate function calls cannot be nested")));
3961 
3962  /*
3963  * Build expressions doing all the transition work at once. We build a
3964  * different one for each phase, as the number of transition function
3965  * invocation can differ between phases. Note this'll work both for
3966  * transition and combination functions (although there'll only be one
3967  * phase in the latter case).
3968  */
3969  for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
3970  {
3971  AggStatePerPhase phase = &aggstate->phases[phaseidx];
3972  bool dohash = false;
3973  bool dosort = false;
3974 
3975  /* phase 0 doesn't necessarily exist */
3976  if (!phase->aggnode)
3977  continue;
3978 
3979  if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
3980  {
3981  /*
3982  * Phase one, and only phase one, in a mixed agg performs both
3983  * sorting and aggregation.
3984  */
3985  dohash = true;
3986  dosort = true;
3987  }
3988  else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
3989  {
3990  /*
3991  * No need to compute a transition function for an AGG_MIXED phase
3992  * 0 - the contents of the hashtables will have been computed
3993  * during phase 1.
3994  */
3995  continue;
3996  }
3997  else if (phase->aggstrategy == AGG_PLAIN ||
3998  phase->aggstrategy == AGG_SORTED)
3999  {
4000  dohash = false;
4001  dosort = true;
4002  }
4003  else if (phase->aggstrategy == AGG_HASHED)
4004  {
4005  dohash = true;
4006  dosort = false;
4007  }
4008  else
4009  Assert(false);
4010 
4011  phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
4012  false);
4013 
4014  /* cache compiled expression for outer slot without NULL check */
4015  phase->evaltrans_cache[0][0] = phase->evaltrans;
4016  }
4017 
4018  return aggstate;
4019 }
4020 
4021 /*
4022  * Build the state needed to calculate a state value for an aggregate.
4023  *
4024  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
4025  * to initialize the state for. 'transfn_oid', 'aggtranstype', and the rest
4026  * of the arguments could be calculated from 'aggref', but the caller has
4027  * calculated them already, so might as well pass them.
4028  *
4029  * 'transfn_oid' may be either the Oid of the aggtransfn or the aggcombinefn.
4030  */
4031 static void
4033  AggState *aggstate, EState *estate,
4034  Aggref *aggref,
4035  Oid transfn_oid, Oid aggtranstype,
4036  Oid aggserialfn, Oid aggdeserialfn,
4037  Datum initValue, bool initValueIsNull,
4038  Oid *inputTypes, int numArguments)
4039 {
4040  int numGroupingSets = Max(aggstate->maxsets, 1);
4041  Expr *transfnexpr;
4042  int numTransArgs;
4043  Expr *serialfnexpr = NULL;
4044  Expr *deserialfnexpr = NULL;
4045  ListCell *lc;
4046  int numInputs;
4047  int numDirectArgs;
4048  List *sortlist;
4049  int numSortCols;
4050  int numDistinctCols;
4051  int i;
4052 
4053  /* Begin filling in the pertrans data */
4054  pertrans->aggref = aggref;
4055  pertrans->aggshared = false;
4056  pertrans->aggCollation = aggref->inputcollid;
4057  pertrans->transfn_oid = transfn_oid;
4058  pertrans->serialfn_oid = aggserialfn;
4059  pertrans->deserialfn_oid = aggdeserialfn;
4060  pertrans->initValue = initValue;
4061  pertrans->initValueIsNull = initValueIsNull;
4062 
4063  /* Count the "direct" arguments, if any */
4064  numDirectArgs = list_length(aggref->aggdirectargs);
4065 
4066  /* Count the number of aggregated input columns */
4067  pertrans->numInputs = numInputs = list_length(aggref->args);
4068 
4069  pertrans->aggtranstype = aggtranstype;
4070 
4071  /* account for the current transition state */
4072  numTransArgs = pertrans->numTransInputs + 1;
4073 
4074  /*
4075  * Set up infrastructure for calling the transfn. Note that invtrans is
4076  * not needed here.
4077  */
4078  build_aggregate_transfn_expr(inputTypes,
4079  numArguments,
4080  numDirectArgs,
4081  aggref->aggvariadic,
4082  aggtranstype,
4083  aggref->inputcollid,
4084  transfn_oid,
4085  InvalidOid,
4086  &transfnexpr,
4087  NULL);
4088 
4089  fmgr_info(transfn_oid, &pertrans->transfn);
4090  fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
4091 
4092  pertrans->transfn_fcinfo =
4095  &pertrans->transfn,
4096  numTransArgs,
4097  pertrans->aggCollation,
4098  (void *) aggstate, NULL);
4099 
4100  /* get info about the state value's datatype */
4101  get_typlenbyval(aggtranstype,
4102  &pertrans->transtypeLen,
4103  &pertrans->transtypeByVal);
4104 
4105  if (OidIsValid(aggserialfn))
4106  {
4107  build_aggregate_serialfn_expr(aggserialfn,
4108  &serialfnexpr);
4109  fmgr_info(aggserialfn, &pertrans->serialfn);
4110  fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
4111 
4112  pertrans->serialfn_fcinfo =
4115  &pertrans->serialfn,
4116  1,
4117  InvalidOid,
4118  (void *) aggstate, NULL);
4119  }
4120 
4121  if (OidIsValid(aggdeserialfn))
4122  {
4123  build_aggregate_deserialfn_expr(aggdeserialfn,
4124  &deserialfnexpr);
4125  fmgr_info(aggdeserialfn, &pertrans->deserialfn);
4126  fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
4127 
4128  pertrans->deserialfn_fcinfo =
4131  &pertrans->deserialfn,
4132  2,
4133  InvalidOid,
4134  (void *) aggstate, NULL);
4135  }
4136 
4137  /*
4138  * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
4139  * have a list of SortGroupClause nodes; fish out the data in them and
4140  * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
4141  * however; the agg's transfn and finalfn are responsible for that.
4142  *
4143  * When the planner has set the aggpresorted flag, the input to the
4144  * aggregate is already correctly sorted. For ORDER BY aggregates we can
4145  * simply treat these as normal aggregates. For presorted DISTINCT
4146  * aggregates an extra step must be added to remove duplicate consecutive
4147  * inputs.
4148  *
4149  * Note that by construction, if there is a DISTINCT clause then the ORDER
4150  * BY clause is a prefix of it (see transformDistinctClause).
4151  */
4152  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4153  {
4154  sortlist = NIL;
4155  numSortCols = numDistinctCols = 0;
4156  pertrans->aggsortrequired = false;
4157  }
4158  else if (aggref->aggpresorted && aggref->aggdistinct == NIL)
4159  {
4160  sortlist = NIL;
4161  numSortCols = numDistinctCols = 0;
4162  pertrans->aggsortrequired = false;
4163  }
4164  else if (aggref->aggdistinct)
4165  {
4166  sortlist = aggref->aggdistinct;
4167  numSortCols = numDistinctCols = list_length(sortlist);
4168  Assert(numSortCols >= list_length(aggref->aggorder));
4169  pertrans->aggsortrequired = !aggref->aggpresorted;
4170  }
4171  else
4172  {
4173  sortlist = aggref->aggorder;
4174  numSortCols = list_length(sortlist);
4175  numDistinctCols = 0;
4176  pertrans->aggsortrequired = (numSortCols > 0);
4177  }
4178 
4179  pertrans->numSortCols = numSortCols;
4180  pertrans->numDistinctCols = numDistinctCols;
4181 
4182  /*
4183  * If we have either sorting or filtering to do, create a tupledesc and
4184  * slot corresponding to the aggregated inputs (including sort
4185  * expressions) of the agg.
4186  */
4187  if (numSortCols > 0 || aggref->aggfilter)
4188  {
4189  pertrans->sortdesc = ExecTypeFromTL(aggref->args);
4190  pertrans->sortslot =
4191  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4193  }
4194 
4195  if (numSortCols > 0)
4196  {
4197  /*
4198  * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
4199  * (yet)
4200  */
4201  Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
4202 
4203  /* ORDER BY aggregates are not supported with partial aggregation */
4204  Assert(!DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
4205 
4206  /* If we have only one input, we need its len/byval info. */
4207  if (numInputs == 1)
4208  {
4209  get_typlenbyval(inputTypes[numDirectArgs],
4210  &pertrans->inputtypeLen,
4211  &pertrans->inputtypeByVal);
4212  }
4213  else if (numDistinctCols > 0)
4214  {
4215  /* we will need an extra slot to store prior values */
4216  pertrans->uniqslot =
4217  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4219  }
4220 
4221  /* Extract the sort information for use later */
4222  pertrans->sortColIdx =
4223  (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
4224  pertrans->sortOperators =
4225  (Oid *) palloc(numSortCols * sizeof(Oid));
4226  pertrans->sortCollations =
4227  (Oid *) palloc(numSortCols * sizeof(Oid));
4228  pertrans->sortNullsFirst =
4229  (bool *) palloc(numSortCols * sizeof(bool));
4230 
4231  i = 0;
4232  foreach(lc, sortlist)
4233  {
4234  SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
4235  TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
4236 
4237  /* the parser should have made sure of this */
4238  Assert(OidIsValid(sortcl->sortop));
4239 
4240  pertrans->sortColIdx[i] = tle->resno;
4241  pertrans->sortOperators[i] = sortcl->sortop;
4242  pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
4243  pertrans->sortNullsFirst[i] = sortcl->nulls_first;
4244  i++;
4245  }
4246  Assert(i == numSortCols);
4247  }
4248 
4249  if (aggref->aggdistinct)
4250  {
4251  Oid *ops;
4252 
4253  Assert(numArguments > 0);
4254  Assert(list_length(aggref->aggdistinct) == numDistinctCols);
4255 
4256  ops = palloc(numDistinctCols * sizeof(Oid));
4257 
4258  i = 0;
4259  foreach(lc, aggref->aggdistinct)
4260  ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
4261 
4262  /* lookup / build the necessary comparators */
4263  if (numDistinctCols == 1)
4264  fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
4265  else
4266  pertrans->equalfnMulti =
4267  execTuplesMatchPrepare(pertrans->sortdesc,
4268  numDistinctCols,
4269  pertrans->sortColIdx,
4270  ops,
4271  pertrans->sortCollations,
4272  &aggstate->ss.ps);
4273  pfree(ops);
4274  }
4275 
4276  pertrans->sortstates = (Tuplesortstate **)
4277  palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
4278 }
4279 
4280 
4281 static Datum
4282 GetAggInitVal(Datum textInitVal, Oid transtype)
4283 {
4284  Oid typinput,
4285  typioparam;
4286  char *strInitVal;
4287  Datum initVal;
4288 
4289  getTypeInputInfo(transtype, &typinput, &typioparam);
4290  strInitVal = TextDatumGetCString(textInitVal);
4291  initVal = OidInputFunctionCall(typinput, strInitVal,
4292  typioparam, -1);
4293  pfree(strInitVal);
4294  return initVal;
4295 }
4296 
4297 void
4299 {
4301  int transno;
4302  int numGroupingSets = Max(node->maxsets, 1);
4303  int setno;
4304 
4305  /*
4306  * When ending a parallel worker, copy the statistics gathered by the
4307  * worker back into shared memory so that it can be picked up by the main
4308  * process to report in EXPLAIN ANALYZE.
4309  */
4310  if (node->shared_info && IsParallelWorker())
4311  {
4313 
4314  Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
4317  si->hash_disk_used = node->hash_disk_used;
4318  si->hash_mem_peak = node->hash_mem_peak;
4319  }
4320 
4321  /* Make sure we have closed any open tuplesorts */
4322 
4323  if (node->sort_in)
4324  tuplesort_end(node->sort_in);
4325  if (node->sort_out)
4326  tuplesort_end(node->sort_out);
4327 
4329 
4330  if (node->hash_metacxt != NULL)
4331  {
4333  node->hash_metacxt = NULL;
4334  }
4335 
4336  for (transno = 0; transno < node->numtrans; transno++)
4337  {
4338  AggStatePerTrans pertrans = &node->pertrans[transno];
4339 
4340  for (setno = 0; setno < numGroupingSets; setno++)
4341  {
4342  if (pertrans->sortstates[setno])
4343  tuplesort_end(pertrans->sortstates[setno]);
4344  }
4345  }
4346 
4347  /* And ensure any agg shutdown callbacks have been called */
4348  for (setno = 0; setno < numGroupingSets; setno++)
4349  ReScanExprContext(node->aggcontexts[setno]);
4350  if (node->hashcontext)
4352 
4353  /*
4354  * We don't actually free any ExprContexts here (see comment in
4355  * ExecFreeExprContext), just unlinking the output one from the plan node
4356  * suffices.
4357  */
4358  ExecFreeExprContext(&node->ss.ps);
4359 
4360  /* clean up tuple table */
4362 
4363  outerPlan = outerPlanState(node);
4365 }
4366 
4367 void
4369 {
4370  ExprContext *econtext = node->ss.ps.ps_ExprContext;
4372  Agg *aggnode = (Agg *) node->ss.ps.plan;
4373  int transno;
4374  int numGroupingSets = Max(node->maxsets, 1);
4375  int setno;
4376 
4377  node->agg_done = false;
4378 
4379  if (node->aggstrategy == AGG_HASHED)
4380  {
4381  /*
4382  * In the hashed case, if we haven't yet built the hash table then we
4383  * can just return; nothing done yet, so nothing to undo. If subnode's
4384  * chgParam is not NULL then it will be re-scanned by ExecProcNode,
4385  * else no reason to re-scan it at all.
4386  */
4387  if (!node->table_filled)
4388  return;
4389 
4390  /*
4391  * If we do have the hash table, and it never spilled, and the subplan
4392  * does not have any parameter changes, and none of our own parameter
4393  * changes affect input expressions of the aggregated functions, then
4394  * we can just rescan the existing hash table; no need to build it
4395  * again.
4396  */
4397  if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
4398  !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
4399  {
4401  &node->perhash[0].hashiter);
4402  select_current_set(node, 0, true);
4403  return;
4404  }
4405  }
4406 
4407  /* Make sure we have closed any open tuplesorts */
4408  for (transno = 0; transno < node->numtrans; transno++)
4409  {
4410  for (setno = 0; setno < numGroupingSets; setno++)
4411  {
4412  AggStatePerTrans pertrans = &node->pertrans[transno];
4413 
4414  if (pertrans->sortstates[setno])
4415  {
4416  tuplesort_end(pertrans->sortstates[setno]);
4417  pertrans->sortstates[setno] = NULL;
4418  }
4419  }
4420  }
4421 
4422  /*
4423  * We don't need to ReScanExprContext the output tuple context here;
4424  * ExecReScan already did it. But we do need to reset our per-grouping-set
4425  * contexts, which may have transvalues stored in them. (We use rescan
4426  * rather than just reset because transfns may have registered callbacks
4427  * that need to be run now.) For the AGG_HASHED case, see below.
4428  */
4429 
4430  for (setno = 0; setno < numGroupingSets; setno++)
4431  {
4432  ReScanExprContext(node->aggcontexts[setno]);
4433  }
4434 
4435  /* Release first tuple of group, if we have made a copy */
4436  if (node->grp_firstTuple != NULL)
4437  {
4439  node->grp_firstTuple = NULL;
4440  }
4442 
4443  /* Forget current agg values */
4444  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
4445  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
4446 
4447  /*
4448  * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
4449  * the hashcontext. This used to be an issue, but now, resetting a context
4450  * automatically deletes sub-contexts too.
4451  */
4452  if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
4453  {
4455 
4456  node->hash_ever_spilled = false;
4457  node->hash_spill_mode = false;
4458  node->hash_ngroups_current = 0;
4459 
4461  /* Rebuild an empty hash table */
4462  build_hash_tables(node);
4463  node->table_filled = false;
4464  /* iterator will be reset when the table is filled */
4465 
4466  hashagg_recompile_expressions(node, false, false);
4467  }
4468 
4469  if (node->aggstrategy != AGG_HASHED)
4470  {
4471  /*
4472  * Reset the per-group state (in particular, mark transvalues null)
4473  */
4474  for (setno = 0; setno < numGroupingSets; setno++)
4475  {
4476  MemSet(node->pergroups[setno], 0,
4477  sizeof(AggStatePerGroupData) * node->numaggs);
4478  }
4479 
4480  /* reset to phase 1 */
4481  initialize_phase(node, 1);
4482 
4483  node->input_done = false;
4484  node->projected_set = -1;
4485  }
4486 
4487  if (outerPlan->chgParam == NULL)
4489 }
4490 
4491 
4492 /***********************************************************************
4493  * API exposed to aggregate functions
4494  ***********************************************************************/
4495 
4496 
4497 /*
4498  * AggCheckCallContext - test if a SQL function is being called as an aggregate
4499  *
4500  * The transition and/or final functions of an aggregate may want to verify
4501  * that they are being called as aggregates, rather than as plain SQL
4502  * functions. They should use this function to do so. The return value
4503  * is nonzero if being called as an aggregate, or zero if not. (Specific
4504  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4505  * values could conceivably appear in future.)
4506  *
4507  * If aggcontext isn't NULL, the function also stores at *aggcontext the
4508  * identity of the memory context that aggregate transition values are being
4509  * stored in. Note that the same aggregate call site (flinfo) may be called
4510  * interleaved on different transition values in different contexts, so it's
4511  * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4512  * cache it in the transvalue itself (for internal-type transvalues).
4513  */
4514 int
4516 {
4517  if (fcinfo->context && IsA(fcinfo->context, AggState))
4518  {
4519  if (aggcontext)
4520  {
4521  AggState *aggstate = ((AggState *) fcinfo->context);
4522  ExprContext *cxt = aggstate->curaggcontext;
4523 
4524  *aggcontext = cxt->ecxt_per_tuple_memory;
4525  }
4526  return AGG_CONTEXT_AGGREGATE;
4527  }
4528  if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4529  {
4530  if (aggcontext)
4531  *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4532  return AGG_CONTEXT_WINDOW;
4533  }
4534 
4535  /* this is just to prevent "uninitialized variable" warnings */
4536  if (aggcontext)
4537  *aggcontext = NULL;
4538  return 0;
4539 }
4540 
4541 /*
4542  * AggGetAggref - allow an aggregate support function to get its Aggref
4543  *
4544  * If the function is being called as an aggregate support function,
4545  * return the Aggref node for the aggregate call. Otherwise, return NULL.
4546  *
4547  * Aggregates sharing the same inputs and transition functions can get
4548  * merged into a single transition calculation. If the transition function
4549  * calls AggGetAggref, it will get some one of the Aggrefs for which it is
4550  * executing. It must therefore not pay attention to the Aggref fields that
4551  * relate to the final function, as those are indeterminate. But if a final
4552  * function calls AggGetAggref, it will get a precise result.
4553  *
4554  * Note that if an aggregate is being used as a window function, this will
4555  * return NULL. We could provide a similar function to return the relevant
4556  * WindowFunc node in such cases, but it's not needed yet.
4557  */
4558 Aggref *
4560 {
4561  if (fcinfo->context && IsA(fcinfo->context, AggState))
4562  {
4563  AggState *aggstate = (AggState *) fcinfo->context;
4564  AggStatePerAgg curperagg;
4565  AggStatePerTrans curpertrans;
4566 
4567  /* check curperagg (valid when in a final function) */
4568  curperagg = aggstate->curperagg;
4569 
4570  if (curperagg)
4571  return curperagg->aggref;
4572 
4573  /* check curpertrans (valid when in a transition function) */
4574  curpertrans = aggstate->curpertrans;
4575 
4576  if (curpertrans)
4577  return curpertrans->aggref;
4578  }
4579  return NULL;
4580 }
4581 
4582 /*
4583  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4584  *
4585  * This is useful in agg final functions; the context returned is one that
4586  * the final function can safely reset as desired. This isn't useful for
4587  * transition functions, since the context returned MAY (we don't promise)
4588  * be the same as the context those are called in.
4589  *
4590  * As above, this is currently not useful for aggs called as window functions.
4591  */
4594 {
4595  if (fcinfo->context && IsA(fcinfo->context, AggState))
4596  {
4597  AggState *aggstate = (AggState *) fcinfo->context;
4598 
4599  return aggstate->tmpcontext->ecxt_per_tuple_memory;
4600  }
4601  return NULL;
4602 }
4603 
4604 /*
4605  * AggStateIsShared - find out whether transition state is shared
4606  *
4607  * If the function is being called as an aggregate support function,
4608  * return true if the aggregate's transition state is shared across
4609  * multiple aggregates, false if it is not.
4610  *
4611  * Returns true if not called as an aggregate support function.
4612  * This is intended as a conservative answer, ie "no you'd better not
4613  * scribble on your input". In particular, will return true if the
4614  * aggregate is being used as a window function, which is a scenario
4615  * in which changing the transition state is a bad idea. We might
4616  * want to refine the behavior for the window case in future.
4617  */
4618 bool
4620 {
4621  if (fcinfo->context && IsA(fcinfo->context, AggState))
4622  {
4623  AggState *aggstate = (AggState *) fcinfo->context;
4624  AggStatePerAgg curperagg;
4625  AggStatePerTrans curpertrans;
4626 
4627  /* check curperagg (valid when in a final function) */
4628  curperagg = aggstate->curperagg;
4629 
4630  if (curperagg)
4631  return aggstate->pertrans[curperagg->transno].aggshared;
4632 
4633  /* check curpertrans (valid when in a transition function) */
4634  curpertrans = aggstate->curpertrans;
4635 
4636  if (curpertrans)
4637  return curpertrans->aggshared;
4638  }
4639  return true;
4640 }
4641 
4642 /*
4643  * AggRegisterCallback - register a cleanup callback for an aggregate
4644  *
4645  * This is useful for aggs to register shutdown callbacks, which will ensure
4646  * that non-memory resources are freed. The callback will occur just before
4647  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4648  * either between groups or as a result of rescanning the query. The callback
4649  * will NOT be called on error paths. The typical use-case is for freeing of
4650  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4651  * created by the agg functions. (The callback will not be called until after
4652  * the result of the finalfn is no longer needed, so it's safe for the finalfn
4653  * to return data that will be freed by the callback.)
4654  *
4655  * As above, this is currently not useful for aggs called as window functions.
4656  */
4657 void
4660  Datum arg)
4661 {
4662  if (fcinfo->context && IsA(fcinfo->context, AggState))
4663  {
4664  AggState *aggstate = (AggState *) fcinfo->context;
4665  ExprContext *cxt = aggstate->curaggcontext;
4666 
4667  RegisterExprContextCallback(cxt, func, arg);
4668 
4669  return;
4670  }
4671  elog(ERROR, "aggregate function cannot register a callback in this context");
4672 }
4673 
4674 
4675 /* ----------------------------------------------------------------
4676  * Parallel Query Support
4677  * ----------------------------------------------------------------
4678  */
4679 
4680  /* ----------------------------------------------------------------
4681  * ExecAggEstimate
4682  *
4683  * Estimate space required to propagate aggregate statistics.
4684  * ----------------------------------------------------------------
4685  */
4686 void
4688 {
4689  Size size;
4690 
4691  /* don't need this if not instrumenting or no workers */
4692  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4693  return;
4694 
4695  size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
4696  size = add_size(size, offsetof(SharedAggInfo, sinstrument));
4697  shm_toc_estimate_chunk(&pcxt->estimator, size);
4698  shm_toc_estimate_keys(&pcxt->estimator, 1);
4699 }
4700 
4701 /* ----------------------------------------------------------------
4702  * ExecAggInitializeDSM
4703  *
4704  * Initialize DSM space for aggregate statistics.
4705  * ----------------------------------------------------------------
4706  */
4707 void
4709 {
4710  Size size;
4711 
4712  /* don't need this if not instrumenting or no workers */
4713  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4714  return;
4715 
4716  size = offsetof(SharedAggInfo, sinstrument)
4717  + pcxt->nworkers * sizeof(AggregateInstrumentation);
4718  node->shared_info = shm_toc_allocate(pcxt->toc, size);
4719  /* ensure any unfilled slots will contain zeroes */
4720  memset(node->shared_info, 0, size);
4721  node->shared_info->num_workers = pcxt->nworkers;
4722  shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
4723  node->shared_info);
4724 }
4725 
4726 /* ----------------------------------------------------------------
4727  * ExecAggInitializeWorker
4728  *
4729  * Attach worker to DSM space for aggregate statistics.
4730  * ----------------------------------------------------------------
4731  */
4732 void
4734 {
4735  node->shared_info =
4736  shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
4737 }
4738 
4739 /* ----------------------------------------------------------------
4740  * ExecAggRetrieveInstrumentation
4741  *
4742  * Transfer aggregate statistics from DSM to private memory.
4743  * ----------------------------------------------------------------
4744  */
4745 void
4747 {
4748  Size size;
4749  SharedAggInfo *si;
4750 
4751  if (node->shared_info == NULL)
4752  return;
4753 
4754  size = offsetof(SharedAggInfo, sinstrument)
4756  si = palloc(size);
4757  memcpy(si, node->shared_info, size);
4758  node->shared_info = si;
4759 }
AclResult
Definition: acl.h:181
@ ACLCHECK_OK
Definition: acl.h:182
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3458
AclResult pg_proc_aclcheck(Oid proc_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:5017
int16 AttrNumber
Definition: attnum.h:21
int ParallelWorkerNumber
Definition: parallel.c:113
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1045
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:648
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:738
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:225
Bitmapset * bms_add_members(Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:795
Bitmapset * bms_del_member(Bitmapset *a, int x)
Definition: bitmapset.c:775
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:494
Bitmapset * bms_copy(const Bitmapset *a)
Definition: bitmapset.c:74
int bms_first_member(Bitmapset *a)
Definition: bitmapset.c:998
#define TextDatumGetCString(d)
Definition: builtins.h:86
unsigned int uint32
Definition: c.h:442
#define MAXALIGN(LEN)
Definition: c.h:747
#define Max(x, y)
Definition: c.h:970
#define MemSet(start, val, len)
Definition: c.h:998
#define OidIsValid(objectId)
Definition: c.h:711
size_t Size
Definition: c.h:541
Datum datumCopy(Datum value, bool typByVal, int typLen)
Definition: datum.c:132
int my_log2(long num)
Definition: dynahash.c:1765
int errcode_for_file_access(void)
Definition: elog.c:718
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define ERROR
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:145
void ExecReScan(PlanState *node)
Definition: execAmi.c:78
Datum ExecAggTransReparent(AggState *aggstate, AggStatePerTrans pertrans, Datum newValue, bool newValueIsNull, Datum oldValue, bool oldValueIsNull)
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:318
ExprState * ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase, bool doSort, bool doHash, bool nullcheck)
Definition: execExpr.c:3273
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:209
void execTuplesHashPrepare(int numCols, const Oid *eqOperators, Oid **eqFuncOids, FmgrInfo **hashFunctions)
Definition: execGrouping.c:96
TupleHashEntry LookupTupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew, uint32 hash)
Definition: execGrouping.c:361
TupleHashEntry LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew, uint32 *hash)
Definition: execGrouping.c:306
TupleHashTable BuildTupleHashTableExt(PlanState *parent, TupleDesc inputDesc, int numCols, AttrNumber *keyColIdx, const Oid *eqfuncoids, FmgrInfo *hashfunctions, Oid *collations, long nbuckets, Size additionalsize, MemoryContext metacxt, MemoryContext tablecxt, MemoryContext tempcxt, bool use_variable_hash_iv)
Definition: execGrouping.c:154
void ResetTupleHashTable(TupleHashTable hashtable)
Definition: execGrouping.c:285
ExprState * execTuplesMatchPrepare(TupleDesc desc, int numCols, const AttrNumber *keyColIdx, const Oid *eqOperators, const Oid *collations, PlanState *parent)
Definition: execGrouping.c:59
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:557
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:142
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1692
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:1576
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1446
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1831
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1799
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
TupleDesc ExecTypeFromTL(List *targetList)
Definition: execTuples.c:1938
TupleTableSlot * ExecAllocTableSlot(List **tupleTable, TupleDesc desc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1171
void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1469
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:490
void ReScanExprContext(ExprContext *econtext)
Definition: execUtils.c:438
ExprContext * CreateWorkExprContext(EState *estate)
Definition: execUtils.c:316
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
Definition: execUtils.c:499
void ExecCreateScanSlotFromOuterPlan(EState *estate, ScanState *scanstate, const TupleTableSlotOps *tts_ops)
Definition: execUtils.c:682
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:480
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:535
void RegisterExprContextCallback(ExprContext *econtext, ExprContextCallbackFunction function, Datum arg)
Definition: execUtils.c:925
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:650
void(* ExprContextCallbackFunction)(Datum arg)
Definition: execnodes.h:209
#define InstrCountFiltered1(node, delta)
Definition: execnodes.h:1119
#define outerPlanState(node)
Definition: execnodes.h:1111
#define ScanTupleHashTable(htable, iter)
Definition: execnodes.h:818
#define ResetTupleHashIterator(htable, iter)
Definition: execnodes.h:816
struct AggStatePerGroupData * AggStatePerGroup
Definition: execnodes.h:2337
struct AggStatePerTransData * AggStatePerTrans
Definition: execnodes.h:2336
struct TupleHashEntryData TupleHashEntryData
struct AggregateInstrumentation AggregateInstrumentation
struct AggStatePerAggData * AggStatePerAgg
Definition: execnodes.h:2335
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define EXEC_FLAG_REWIND
Definition: executor.h:57
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition: executor.h:361
#define ResetExprContext(econtext)
Definition: executor.h:529
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:398
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:425
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:318
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:333
#define EXEC_FLAG_EXPLAIN_ONLY
Definition: executor.h:56
#define EXEC_FLAG_MARK
Definition: executor.h:59
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:254
#define MakeExpandedObjectReadOnly(d, isnull, typlen)
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1134
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1630
#define SizeForFunctionCallInfo(nargs)
Definition: fmgr.h:102
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
Definition: fmgr.h:150
#define AGG_CONTEXT_WINDOW
Definition: fmgr.h:754
#define LOCAL_FCINFO(name, nargs)
Definition: fmgr.h:110
#define AGG_CONTEXT_AGGREGATE
Definition: fmgr.h:753
struct FunctionCallInfoBaseData * FunctionCallInfo
Definition: fmgr.h:38
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:172
#define fmgr_info_set_expr(expr, finfo)
Definition: fmgr.h:135
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
int work_mem
Definition: globals.c:125
uint32 hash_bytes_uint32(uint32 k)
Definition: hashfn.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
MinimalTupleData * MinimalTuple
Definition: htup.h:27
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SizeofMinimalTupleHeader
Definition: htup_details.h:643
#define GETSTRUCT(TUP)
Definition: htup_details.h:649
void initHyperLogLog(hyperLogLogState *cState, uint8 bwidth)
Definition: hyperloglog.c:66
double estimateHyperLogLog(hyperLogLogState *cState)
Definition: hyperloglog.c:186
void addHyperLogLog(hyperLogLogState *cState, uint32 hash)
Definition: hyperloglog.c:167
void freeHyperLogLog(hyperLogLogState *cState)
Definition: hyperloglog.c:151
#define IsParallelWorker()
Definition: parallel.h:61
static int initValue(long lng_val)
Definition: informix.c:677
int j
Definition: isn.c:74
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lcons_int(int datum, List *list)
Definition: list.c:512
List * lappend(List *list, void *datum)
Definition: list.c:338
void list_free(List *list)
Definition: list.c:1545
void list_free_deep(List *list)
Definition: list.c:1559
List * list_delete_last(List *list)
Definition: list.c:956
LogicalTape * LogicalTapeCreate(LogicalTapeSet *lts)
Definition: logtape.c:681
void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
Definition: logtape.c:847
size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
Definition: logtape.c:929
void LogicalTapeClose(LogicalTape *lt)
Definition: logtape.c:734
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:668
void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
Definition: logtape.c:762
LogicalTapeSet * LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
Definition: logtape.c:557
long LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1184
void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)
Definition: lsyscache.c:2209
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1267
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2832
char * get_func_name(Oid funcid)
Definition: lsyscache.c:1590
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:195
void pfree(void *pointer)
Definition: mcxt.c:1252
void * palloc0(Size size)
Definition: mcxt.c:1176
MemoryContext CurrentMemoryContext
Definition: mcxt.c:87
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:940
Size MemoryContextMemAllocated(MemoryContext context, bool recurse)
Definition: mcxt.c:547
bool MemoryContextContains(MemoryContext context, void *pointer)
Definition: mcxt.c:827
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:270
void * palloc(Size size)
Definition: mcxt.c:1145
#define AllocSetContextCreate
Definition: memutils.h:130
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:154
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
Oid GetUserId(void)
Definition: miscinit.c:497
static void hashagg_finish_initial_spills(AggState *aggstate)
Definition: nodeAgg.c:3054
static long hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
Definition: nodeAgg.c:1961
static void hash_agg_check_limits(AggState *aggstate)
Definition: nodeAgg.c:1851
static void initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable, TupleHashEntry entry)
Definition: nodeAgg.c:2040
static void find_hash_columns(AggState *aggstate)
Definition: nodeAgg.c:1559
static bool agg_refill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2589
static void build_hash_table(AggState *aggstate, int setno, long nbuckets)
Definition: nodeAgg.c:1499
void ExecAggEstimate(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4687
struct FindColsContext FindColsContext
static void hash_agg_enter_spill_mode(AggState *aggstate)
Definition: nodeAgg.c:1877
struct HashAggBatch HashAggBatch
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
Definition: nodeAgg.c:4282
static void find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
Definition: nodeAgg.c:1393
void AggRegisterCallback(FunctionCallInfo fcinfo, ExprContextCallbackFunction func, Datum arg)
Definition: nodeAgg.c:4658
#define HASHAGG_HLL_BIT_WIDTH
Definition: nodeAgg.c:315
static void agg_fill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2535
static void initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:579
static TupleTableSlot * fetch_input_tuple(AggState *aggstate)
Definition: nodeAgg.c:548
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
Definition: nodeAgg.c:3088
static bool find_cols_walker(Node *node, FindColsContext *context)
Definition: nodeAgg.c:1416
void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
Definition: nodeAgg.c:4733
AggState * ExecInitAgg(Agg *node, EState *estate, int eflags)
Definition: nodeAgg.c:3168
void ExecAggRetrieveInstrumentation(AggState *node)
Definition: nodeAgg.c:4746
static TupleTableSlot * ExecAgg(PlanState *pstate)
Definition: nodeAgg.c:2153
static TupleTableSlot * project_aggregates(AggState *aggstate)
Definition: nodeAgg.c:1367
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
Definition: nodeAgg.c:3005
struct HashAggSpill HashAggSpill
static void process_ordered_aggregate_multi(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:943
void ExecReScanAgg(AggState *node)
Definition: nodeAgg.c:4368
int AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
Definition: nodeAgg.c:4515
static void advance_transition_function(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:707
static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
Definition: nodeAgg.c:1912
static void finalize_aggregates(AggState *aggstate, AggStatePerAgg peraggs, AggStatePerGroup pergroup)
Definition: nodeAgg.c:1290
static void initialize_phase(AggState *aggstate, int newphase)
Definition: nodeAgg.c:478
Size hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
Definition: nodeAgg.c:1689
static void initialize_aggregates(AggState *aggstate, AggStatePerGroup *pergroups, int numReset)
Definition: nodeAgg.c:666
static TupleTableSlot * agg_retrieve_hash_table_in_memory(AggState *aggstate)
Definition: nodeAgg.c:2766
void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
Definition: nodeAgg.c:4708
static void finalize_aggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1037
#define HASHAGG_MAX_PARTITIONS
Definition: nodeAgg.c:298
static void lookup_hash_entries(AggState *aggstate)
Definition: nodeAgg.c:2090
static TupleTableSlot * agg_retrieve_direct(AggState *aggstate)
Definition: nodeAgg.c:2189
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
Definition: nodeAgg.c:1736
static void prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
Definition: nodeAgg.c:1245
bool AggStateIsShared(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4619
static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, Aggref *aggref, Oid transfn_oid, Oid aggtranstype, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, Oid *inputTypes, int numArguments)
Definition: nodeAgg.c:4032
Aggref * AggGetAggref(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4559
#define CHUNKHDRSZ
Definition: nodeAgg.c:321
static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2741
static void process_ordered_aggregate_single(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:851
static void advance_aggregates(AggState *aggstate)
Definition: nodeAgg.c:819
static void prepare_hash_slot(AggStatePerHash perhash, TupleTableSlot *inputslot, TupleTableSlot *hashslot)
Definition: nodeAgg.c:1200
static void build_hash_tables(AggState *aggstate)
Definition: nodeAgg.c:1464
void ExecEndAgg(AggState *node)
Definition: nodeAgg.c:4298
#define HASHAGG_READ_BUFFER_SIZE
Definition: nodeAgg.c:306
static void hashagg_reset_spill_state(AggState *aggstate)
Definition: nodeAgg.c:3128
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *inputslot, uint32 hash)
Definition: nodeAgg.c:2920
static void select_current_set(AggState *aggstate, int setno, bool is_hash)
Definition: nodeAgg.c:456
static void finalize_partialaggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1140
static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits, double input_groups, double hashentrysize)
Definition: nodeAgg.c:2889
#define HASHAGG_MIN_PARTITIONS
Definition: nodeAgg.c:297
void hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, Size *mem_limit, uint64 *ngroups_limit, int *num_partitions)
Definition: nodeAgg.c:1793
MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4593
#define HASHAGG_PARTITION_FACTOR
Definition: nodeAgg.c:296
static HashAggBatch * hashagg_batch_new(LogicalTape *input_tape, int setno, int64 input_tuples, double input_card, int used_bits)
Definition: nodeAgg.c:2986
#define HASHAGG_WRITE_BUFFER_SIZE
Definition: nodeAgg.c:307
static int hash_choose_num_partitions(double input_groups, double hashentrysize, int used_bits, int *log2_npartitions)
Definition: nodeAgg.c:1986
struct AggStatePerGroupData AggStatePerGroupData
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:769
#define expression_tree_walker(n, w, c)
Definition: nodeFuncs.h:151
size_t get_hash_memory_limit(void)
Definition: nodeHash.c:3394
#define DO_AGGSPLIT_SKIPFINAL(as)
Definition: nodes.h:377
#define IsA(nodeptr, _type_)
Definition: nodes.h:162
#define DO_AGGSPLIT_DESERIALIZE(as)
Definition: nodes.h:379
#define DO_AGGSPLIT_COMBINE(as)
Definition: nodes.h:376
@ AGG_SORTED
Definition: nodes.h:346
@ AGG_HASHED
Definition: nodes.h:347
@ AGG_MIXED
Definition: nodes.h:348
@ AGG_PLAIN
Definition: nodes.h:345
#define DO_AGGSPLIT_SERIALIZE(as)
Definition: nodes.h:378
#define makeNode(_type_)
Definition: nodes.h:159
#define castNode(_type_, nodeptr)
Definition: nodes.h:180
#define InvokeFunctionExecuteHook(objectId)
Definition: objectaccess.h:213
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:131
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition: parse_agg.c:2084
void build_aggregate_deserialfn_expr(Oid deserialfn_oid, Expr **deserialfnexpr)
Definition: parse_agg.c:2060
void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, Oid transfn_oid, Oid invtransfn_oid, Expr **transfnexpr, Expr **invtransfnexpr)
Definition: parse_agg.c:1976
int get_aggregate_argtypes(Aggref *aggref, Oid *inputTypes)
Definition: parse_agg.c:1890
void build_aggregate_serialfn_expr(Oid serialfn_oid, Expr **serialfnexpr)
Definition: parse_agg.c:2037
bool IsBinaryCoercible(Oid srctype, Oid targettype)
@ OBJECT_AGGREGATE
Definition: parsenodes.h:1860
@ OBJECT_FUNCTION
Definition: parsenodes.h:1878
#define ACL_EXECUTE
Definition: parsenodes.h:90
FormData_pg_aggregate * Form_pg_aggregate
Definition: pg_aggregate.h:109
int16 attnum
Definition: pg_attribute.h:83
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
void * arg
#define FUNC_MAX_ARGS
#define lfirst(lc)
Definition: pg_list.h:170
#define llast(l)
Definition: pg_list.h:196
static int list_length(const List *l)
Definition: pg_list.h:150
#define NIL
Definition: pg_list.h:66
#define lfirst_int(lc)
Definition: pg_list.h:171
#define linitial_int(l)
Definition: pg_list.h:177
static void * list_nth(const List *list, int n)
Definition: pg_list.h:297
#define list_nth_node(type, list, n)
Definition: pg_list.h:325
FormData_pg_proc * Form_pg_proc
Definition: pg_proc.h:136
#define outerPlan(node)
Definition: plannodes.h:180
static bool DatumGetBool(Datum X)
Definition: postgres.h:438
uintptr_t Datum
Definition: postgres.h:412
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:600
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:660
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define OUTER_VAR
Definition: primnodes.h:194
static unsigned hash(unsigned *uv, int n)
Definition: rege_dfa.c:715
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
FmgrInfo finalfn
Definition: nodeAgg.h:207
bool resulttypeByVal
Definition: nodeAgg.h:225
List * aggdirectargs
Definition: nodeAgg.h:218
Aggref * aggref
Definition: nodeAgg.h:195
int16 resulttypeLen
Definition: nodeAgg.h:224
FmgrInfo * hashfunctions
Definition: nodeAgg.h:314
TupleHashTable hashtable
Definition: nodeAgg.h:311
TupleTableSlot * hashslot
Definition: nodeAgg.h:313
TupleHashIterator hashiter
Definition: nodeAgg.h:312
AttrNumber * hashGrpColIdxHash
Definition: nodeAgg.h:320
AttrNumber * hashGrpColIdxInput
Definition: nodeAgg.h:319
Bitmapset ** grouped_cols
Definition: nodeAgg.h:285
ExprState * evaltrans
Definition: nodeAgg.h:291
ExprState * evaltrans_cache[2][2]
Definition: nodeAgg.h:299
ExprState ** eqfunctions
Definition: nodeAgg.h:286
AggStrategy aggstrategy
Definition: nodeAgg.h:282
bool * sortNullsFirst
Definition: nodeAgg.h:108
FmgrInfo serialfn
Definition: nodeAgg.h:89
FmgrInfo equalfnOne
Definition: nodeAgg.h:115
TupleDesc sortdesc
Definition: nodeAgg.h:143
TupleTableSlot * sortslot
Definition: nodeAgg.h:141
FmgrInfo transfn
Definition: nodeAgg.h:86
Aggref * aggref
Definition: nodeAgg.h:44
ExprState * equalfnMulti
Definition: nodeAgg.h:116
Tuplesortstate ** sortstates
Definition: nodeAgg.h:162
TupleTableSlot * uniqslot
Definition: nodeAgg.h:142
FmgrInfo deserialfn
Definition: nodeAgg.h:92
FunctionCallInfo deserialfn_fcinfo
Definition: nodeAgg.h:175
AttrNumber * sortColIdx
Definition: nodeAgg.h:105
FunctionCallInfo serialfn_fcinfo
Definition: nodeAgg.h:173
FunctionCallInfo transfn_fcinfo
Definition: nodeAgg.h:170
MemoryContext hash_metacxt
Definition: execnodes.h:2385
ScanState ss
Definition: execnodes.h:2343
Tuplesortstate * sort_out
Definition: execnodes.h:2376
uint64 hash_disk_used
Definition: execnodes.h:2403
AggStatePerGroup * all_pergroups
Definition: execnodes.h:2412
AggStatePerGroup * hash_pergroup
Definition: execnodes.h:2407
AggStatePerPhase phase
Definition: execnodes.h:2349
List * aggs
Definition: execnodes.h:2344
ExprContext * tmpcontext
Definition: execnodes.h:2356
int max_colno_needed
Definition: execnodes.h:2370
int hash_planned_partitions
Definition: execnodes.h:2397
HeapTuple grp_firstTuple
Definition: execnodes.h:2381
Size hash_mem_limit
Definition: execnodes.h:2395
ExprContext * curaggcontext
Definition: execnodes.h:2358
AggStatePerTrans curpertrans
Definition: execnodes.h:2361
bool table_filled
Definition: execnodes.h:2383
AggStatePerTrans pertrans
Definition: execnodes.h:2353
int current_set
Definition: execnodes.h:2366
struct LogicalTapeSet * hash_tapeset
Definition: execnodes.h:2386
AggStrategy aggstrategy
Definition: execnodes.h:2347
int numtrans
Definition: execnodes.h:2346
ExprContext * hashcontext
Definition: execnodes.h:2354
AggSplit aggsplit
Definition: execnodes.h:2348
int projected_set
Definition: execnodes.h:2364
SharedAggInfo * shared_info
Definition: execnodes.h:2415
uint64 hash_ngroups_limit
Definition: execnodes.h:2396
bool input_done
Definition: execnodes.h:2362
AggStatePerPhase phases
Definition: execnodes.h:2374
List * all_grouped_cols
Definition: execnodes.h:2368
bool hash_spill_mode
Definition: execnodes.h:2393
AggStatePerGroup * pergroups
Definition: execnodes.h:2379
AggStatePerHash perhash
Definition: execnodes.h:2406
Size hash_mem_peak
Definition: execnodes.h:2400
double hashentrysize
Definition: execnodes.h:2399
int numphases
Definition: execnodes.h:2350
uint64 hash_ngroups_current
Definition: execnodes.h:2401
int hash_batches_used
Definition: execnodes.h:2404
Tuplesortstate * sort_in
Definition: execnodes.h:2375
TupleTableSlot * hash_spill_wslot
Definition: execnodes.h:2390
AggStatePerAgg curperagg
Definition: execnodes.h:2359
struct HashAggSpill * hash_spills
Definition: execnodes.h:2387
TupleTableSlot * sort_slot
Definition: execnodes.h:2377
bool hash_ever_spilled
Definition: execnodes.h:2392
int numaggs
Definition: execnodes.h:2345
int num_hashes
Definition: execnodes.h:2384
AggStatePerAgg peragg
Definition: execnodes.h:2352
List * hash_batches
Definition: execnodes.h:2391
TupleTableSlot * hash_spill_rslot
Definition: execnodes.h:2389
int maxsets
Definition: execnodes.h:2373
ExprContext ** aggcontexts
Definition: execnodes.h:2355
Bitmapset * colnos_needed
Definition: execnodes.h:2369
int current_phase
Definition: execnodes.h:2351
bool all_cols_needed
Definition: execnodes.h:2371
bool agg_done
Definition: execnodes.h:2363
Bitmapset * grouped_cols
Definition: execnodes.h:2367
Definition: plannodes.h:988
AggSplit aggsplit
Definition: plannodes.h:995
List * chain
Definition: plannodes.h:1022
long numGroups
Definition: plannodes.h:1008
List * groupingSets
Definition: plannodes.h:1019
Bitmapset * aggParams
Definition: plannodes.h:1014
Plan plan
Definition: plannodes.h:989
int numCols
Definition: plannodes.h:998
uint64 transitionSpace
Definition: plannodes.h:1011
AggStrategy aggstrategy
Definition: plannodes.h:992
Oid aggfnoid
Definition: primnodes.h:373
List * aggdistinct
Definition: primnodes.h:403
List * aggdirectargs
Definition: primnodes.h:394
bool aggvariadic
Definition: primnodes.h:415
char aggkind
Definition: primnodes.h:418
int aggtransno
Definition: primnodes.h:433
Index agglevelsup
Definition: primnodes.h:424
List * args
Definition: primnodes.h:397
Expr * aggfilter
Definition: primnodes.h:406
Oid inputcollid
Definition: primnodes.h:382
Oid aggtype
Definition: primnodes.h:376
List * aggorder
Definition: primnodes.h:400
AggSplit aggsplit
Definition: primnodes.h:427
int aggno
Definition: primnodes.h:430
MemoryContext es_query_cxt
Definition: execnodes.h:647
List * es_tupleTable
Definition: execnodes.h:649
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:255
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:249
Datum * ecxt_aggvalues
Definition: execnodes.h:266
bool * ecxt_aggnulls
Definition: execnodes.h:268
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:251
Bitmapset * aggregated
Definition: nodeAgg.c:364
Bitmapset * unaggregated
Definition: nodeAgg.c:365
bool is_aggref
Definition: nodeAgg.c:363
bool fn_strict
Definition: fmgr.h:61
fmNodePtr context
Definition: fmgr.h:88
NullableDatum args[FLEXIBLE_ARRAY_MEMBER]
Definition: fmgr.h:95
int used_bits
Definition: nodeAgg.c:354
int64 input_tuples
Definition: nodeAgg.c:356
double input_card
Definition: nodeAgg.c:357
int setno
Definition: