/*-------------------------------------------------------------------------
2  *
3  * nodeAgg.c
4  * Routines to handle aggregate nodes.
5  *
6  * ExecAgg normally evaluates each aggregate in the following steps:
7  *
8  * transvalue = initcond
9  * foreach input_tuple do
10  * transvalue = transfunc(transvalue, input_value(s))
11  * result = finalfunc(transvalue, direct_argument(s))
12  *
13  * If a finalfunc is not supplied then the result is just the ending
14  * value of transvalue.
15  *
16  * Other behaviors can be selected by the "aggsplit" mode, which exists
17  * to support partial aggregation. It is possible to:
18  * * Skip running the finalfunc, so that the output is always the
19  * final transvalue state.
20  * * Substitute the combinefunc for the transfunc, so that transvalue
21  * states (propagated up from a child partial-aggregation step) are merged
22  * rather than processing raw input rows. (The statements below about
23  * the transfunc apply equally to the combinefunc, when it's selected.)
24  * * Apply the serializefunc to the output values (this only makes sense
25  * when skipping the finalfunc, since the serializefunc works on the
26  * transvalue data type).
27  * * Apply the deserializefunc to the input values (this only makes sense
28  * when using the combinefunc, for similar reasons).
29  * It is the planner's responsibility to connect up Agg nodes using these
30  * alternate behaviors in a way that makes sense, with partial aggregation
31  * results being fed to nodes that expect them.
32  *
33  * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34  * input tuples and eliminate duplicates (if required) before performing
35  * the above-depicted process. (However, we don't do that for ordered-set
36  * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37  * so far as this module is concerned.) Note that partial aggregation
38  * is not supported in these cases, since we couldn't ensure global
39  * ordering or distinctness of the inputs.
40  *
41  * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42  * then the first non-NULL input_value is assigned directly to transvalue,
43  * and transfunc isn't applied until the second non-NULL input_value.
44  * The agg's first input type and transtype must be the same in this case!
45  *
46  * If transfunc is marked "strict" then NULL input_values are skipped,
47  * keeping the previous transvalue. If transfunc is not strict then it
48  * is called for every input tuple and must deal with NULL initcond
49  * or NULL input_values for itself.
50  *
51  * If finalfunc is marked "strict" then it is not called when the
52  * ending transvalue is NULL, instead a NULL result is created
53  * automatically (this is just the usual handling of strict functions,
54  * of course). A non-strict finalfunc can make its own choice of
55  * what to return for a NULL ending transvalue.
56  *
57  * Ordered-set aggregates are treated specially in one other way: we
58  * evaluate any "direct" arguments and pass them to the finalfunc along
59  * with the transition value.
60  *
61  * A finalfunc can have additional arguments beyond the transvalue and
62  * any "direct" arguments, corresponding to the input arguments of the
63  * aggregate. These are always just passed as NULL. Such arguments may be
64  * needed to allow resolution of a polymorphic aggregate's result type.
65  *
66  * We compute aggregate input expressions and run the transition functions
67  * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68  * once per input tuple, so when the transvalue datatype is
69  * pass-by-reference, we have to be careful to copy it into a longer-lived
70  * memory context, and free the prior value to avoid memory leakage. We
71  * store transvalues in another set of econtexts, aggstate->aggcontexts
72  * (one per grouping set, see below), which are also used for the hashtable
73  * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74  * reset, at group boundaries so that aggregate transition functions can
75  * register shutdown callbacks via AggRegisterCallback.
76  *
77  * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78  * run finalize functions and compute the output tuple; this context can be
79  * reset once per output tuple.
80  *
81  * The executor's AggState node is passed as the fmgr "context" value in
82  * all transfunc and finalfunc calls. It is not recommended that the
83  * transition functions look at the AggState node directly, but they can
84  * use AggCheckCallContext() to verify that they are being called by
85  * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86  * transition function might want to know this is so that it can avoid
87  * palloc'ing a fixed-size pass-by-ref transition value on every call:
88  * it can instead just scribble on and return its left input. Ordinarily
89  * it is completely forbidden for functions to modify pass-by-ref inputs,
90  * but in the aggregate case we know the left input is either the initial
91  * transition value or a previous function result, and in either case its
92  * value need not be preserved. See int8inc() for an example. Notice that
93  * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94  * the previous transition value pointer is returned. It is also possible
95  * to avoid repeated data copying when the transition value is an expanded
96  * object: to do that, the transition function must take care to return
97  * an expanded object that is in a child context of the memory context
98  * returned by AggCheckCallContext(). Also, some transition functions want
99  * to store working state in addition to the nominal transition value; they
100  * can use the memory context returned by AggCheckCallContext() to do that.
101  *
102  * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103  * AggState is available as context in earlier releases (back to 8.1),
104  * but direct examination of the node is needed to use it before 9.0.
105  *
106  * As of 9.4, aggregate transition functions can also use AggGetAggref()
107  * to get hold of the Aggref expression node for their aggregate call.
108  * This is mainly intended for ordered-set aggregates, which are not
109  * supported as window functions. (A regular aggregate function would
110  * need some fallback logic to use this, since there's no Aggref node
111  * for a window function.)
112  *
113  * Grouping sets:
114  *
115  * A list of grouping sets which is structurally equivalent to a ROLLUP
116  * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117  * ordered data. We do this by keeping a separate set of transition values
118  * for each grouping set being concurrently processed; for each input tuple
119  * we update them all, and on group boundaries we reset those states
120  * (starting at the front of the list) whose grouping values have changed
121  * (the list of grouping sets is ordered from most specific to least
122  * specific).
123  *
124  * Where more complex grouping sets are used, we break them down into
125  * "phases", where each phase has a different sort order (except phase 0
126  * which is reserved for hashing). During each phase but the last, the
127  * input tuples are additionally stored in a tuplesort which is keyed to the
128  * next phase's sort order; during each phase but the first, the input
129  * tuples are drawn from the previously sorted data. (The sorting of the
130  * data for the first phase is handled by the planner, as it might be
131  * satisfied by underlying nodes.)
132  *
133  * Hashing can be mixed with sorted grouping. To do this, we have an
134  * AGG_MIXED strategy that populates the hashtables during the first sorted
135  * phase, and switches to reading them out after completing all sort phases.
136  * We can also support AGG_HASHED with multiple hash tables and no sorting
137  * at all.
138  *
139  * From the perspective of aggregate transition and final functions, the
140  * only issue regarding grouping sets is this: a single call site (flinfo)
141  * of an aggregate function may be used for updating several different
142  * transition values in turn. So the function must not cache in the flinfo
143  * anything which logically belongs as part of the transition value (most
144  * importantly, the memory context in which the transition value exists).
145  * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146  * sensitive to the grouping set for which the aggregate function is
147  * currently being called.
148  *
149  * Plan structure:
150  *
151  * What we get from the planner is actually one "real" Agg node which is
152  * part of the plan tree proper, but which optionally has an additional list
153  * of Agg nodes hung off the side via the "chain" field. This is because an
154  * Agg node happens to be a convenient representation of all the data we
155  * need for grouping sets.
156  *
157  * For many purposes, we treat the "real" node as if it were just the first
158  * node in the chain. The chain must be ordered such that hashed entries
159  * come before sorted/plain entries; the real node is marked AGG_MIXED if
160  * there are both types present (in which case the real node describes one
161  * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162  * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163  * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164  * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165  * chained nodes.
166  *
167  * We collect all hashed nodes into a single "phase", numbered 0, and create
168  * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169  * Phase 0 is allocated even if there are no hashes, but remains unused in
170  * that case.
171  *
172  * AGG_HASHED nodes actually refer to only a single grouping set each,
173  * because for each hashed grouping we need a separate grpColIdx and
174  * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175  * grouping sets that share a sort order. Each AGG_SORTED node other than
176  * the first one has an associated Sort node which describes the sort order
177  * to be used; the first sorted node takes its input from the outer subtree,
178  * which the planner has already arranged to provide ordered data.
179  *
180  * Memory and ExprContext usage:
181  *
182  * Because we're accumulating aggregate values across input rows, we need to
183  * use more memory contexts than just simple input/output tuple contexts.
184  * In fact, for a rollup, we need a separate context for each grouping set
185  * so that we can reset the inner (finer-grained) aggregates on their group
186  * boundaries while continuing to accumulate values for outer
187  * (coarser-grained) groupings. On top of this, we might be simultaneously
188  * populating hashtables; however, we only need one context for all the
189  * hashtables.
190  *
191  * So we create an array, aggcontexts, with an ExprContext for each grouping
192  * set in the largest rollup that we're going to process, and use the
193  * per-tuple memory context of those ExprContexts to store the aggregate
194  * transition values. hashcontext is the single context created to support
195  * all hash tables.
196  *
197  * Spilling To Disk
198  *
199  * When performing hash aggregation, if the hash table memory exceeds the
200  * limit (see hash_agg_check_limits()), we enter "spill mode". In spill
201  * mode, we advance the transition states only for groups already in the
202  * hash table. For tuples that would need to create a new hash table
203  * entries (and initialize new transition states), we instead spill them to
204  * disk to be processed later. The tuples are spilled in a partitioned
205  * manner, so that subsequent batches are smaller and less likely to exceed
206  * work_mem (if a batch does exceed work_mem, it must be spilled
207  * recursively).
208  *
209  * Spilled data is written to logical tapes. These provide better control
210  * over memory usage, disk space, and the number of files than if we were
211  * to use a BufFile for each spill.
212  *
213  * Note that it's possible for transition states to start small but then
214  * grow very large; for instance in the case of ARRAY_AGG. In such cases,
215  * it's still possible to significantly exceed work_mem. We try to avoid
216  * this situation by estimating what will fit in the available memory, and
217  * imposing a limit on the number of groups separately from the amount of
218  * memory consumed.
219  *
220  * Transition / Combine function invocation:
221  *
222  * For performance reasons transition functions, including combine
223  * functions, aren't invoked one-by-one from nodeAgg.c after computing
224  * arguments using the expression evaluation engine. Instead
225  * ExecBuildAggTrans() builds one large expression that does both argument
226  * evaluation and transition function invocation. That avoids performance
227  * issues due to repeated uses of expression evaluation, complications due
228  * to filter expressions having to be evaluated early, and allows to JIT
229  * the entire expression into one native function.
230  *
231  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
232  * Portions Copyright (c) 1994, Regents of the University of California
233  *
234  * IDENTIFICATION
235  * src/backend/executor/nodeAgg.c
236  *
237  *-------------------------------------------------------------------------
238  */
239 
240 #include "postgres.h"
241 
242 #include "access/htup_details.h"
243 #include "catalog/objectaccess.h"
244 #include "catalog/pg_aggregate.h"
245 #include "catalog/pg_proc.h"
246 #include "catalog/pg_type.h"
247 #include "executor/execExpr.h"
248 #include "executor/executor.h"
249 #include "executor/nodeAgg.h"
250 #include "miscadmin.h"
251 #include "nodes/makefuncs.h"
252 #include "nodes/nodeFuncs.h"
253 #include "optimizer/optimizer.h"
254 #include "parser/parse_agg.h"
255 #include "parser/parse_coerce.h"
256 #include "utils/acl.h"
257 #include "utils/builtins.h"
258 #include "utils/datum.h"
259 #include "utils/dynahash.h"
260 #include "utils/expandeddatum.h"
261 #include "utils/logtape.h"
262 #include "utils/lsyscache.h"
263 #include "utils/memutils.h"
264 #include "utils/syscache.h"
265 #include "utils/tuplesort.h"
266 
/*
 * Control how many partitions are created when spilling HashAgg to
 * disk.
 *
 * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
 * partitions needed such that each partition will fit in memory. The factor
 * is set higher than one because there's not a high cost to having a few too
 * many partitions, and it makes it less likely that a partition will need to
 * be spilled recursively. Another benefit of having more, smaller partitions
 * is that small hash tables may perform better than large ones due to memory
 * caching effects.
 *
 * We also specify a min and max number of partitions per spill. Too few might
 * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
 * many will result in lots of memory wasted buffering the spill files (which
 * could instead be spent on a larger hash table).
 */
#define HASHAGG_PARTITION_FACTOR 1.50
#define HASHAGG_MIN_PARTITIONS 4
#define HASHAGG_MAX_PARTITIONS 1024

/*
 * For reading from tapes, the buffer size must be a multiple of
 * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
 * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
 * tape always uses a buffer of size BLCKSZ.
 */
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ

/* minimum number of initial hash table buckets */
#define HASHAGG_MIN_BUCKETS 256
299 
300 /*
301  * Track all tapes needed for a HashAgg that spills. We don't know the maximum
302  * number of tapes needed at the start of the algorithm (because it can
303  * recurse), so one tape set is allocated and extended as needed for new
304  * tapes. When a particular tape is already read, rewind it for write mode and
305  * put it in the free list.
306  *
307  * Tapes' buffers can take up substantial memory when many tapes are open at
308  * once. We only need one tape open at a time in read mode (using a buffer
309  * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
310  * requiring a buffer of size BLCKSZ) for each partition.
311  */
312 typedef struct HashTapeInfo
313 {
315  int ntapes;
316  int *freetapes;
319 } HashTapeInfo;
320 
321 /*
322  * Represents partitioned spill data for a single hashtable. Contains the
323  * necessary information to route tuples to the correct partition, and to
324  * transform the spilled data into new batches.
325  *
326  * The high bits are used for partition selection (when recursing, we ignore
327  * the bits that have already been used for partition selection at an earlier
328  * level).
329  */
330 typedef struct HashAggSpill
331 {
332  LogicalTapeSet *tapeset; /* borrowed reference to tape set */
333  int npartitions; /* number of partitions */
334  int *partitions; /* spill partition tape numbers */
335  int64 *ntuples; /* number of tuples in each partition */
336  uint32 mask; /* mask to find partition from hash value */
337  int shift; /* after masking, shift by this amount */
338 } HashAggSpill;
339 
340 /*
341  * Represents work to be done for one pass of hash aggregation (with only one
342  * grouping set).
343  *
344  * Also tracks the bits of the hash already used for partition selection by
345  * earlier iterations, so that this batch can use new bits. If all bits have
346  * already been used, no partitioning will be done (any spilled data will go
347  * to a single output tape).
348  */
349 typedef struct HashAggBatch
350 {
351  int setno; /* grouping set */
352  int used_bits; /* number of bits of hash already used */
353  LogicalTapeSet *tapeset; /* borrowed reference to tape set */
354  int input_tapenum; /* input partition tape */
355  int64 input_tuples; /* number of tuples in this batch */
356 } HashAggBatch;
357 
358 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
359 static void initialize_phase(AggState *aggstate, int newphase);
360 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
361 static void initialize_aggregates(AggState *aggstate,
362  AggStatePerGroup *pergroups,
363  int numReset);
364 static void advance_transition_function(AggState *aggstate,
365  AggStatePerTrans pertrans,
366  AggStatePerGroup pergroupstate);
367 static void advance_aggregates(AggState *aggstate);
368 static void process_ordered_aggregate_single(AggState *aggstate,
369  AggStatePerTrans pertrans,
370  AggStatePerGroup pergroupstate);
371 static void process_ordered_aggregate_multi(AggState *aggstate,
372  AggStatePerTrans pertrans,
373  AggStatePerGroup pergroupstate);
374 static void finalize_aggregate(AggState *aggstate,
375  AggStatePerAgg peragg,
376  AggStatePerGroup pergroupstate,
377  Datum *resultVal, bool *resultIsNull);
378 static void finalize_partialaggregate(AggState *aggstate,
379  AggStatePerAgg peragg,
380  AggStatePerGroup pergroupstate,
381  Datum *resultVal, bool *resultIsNull);
382 static void prepare_hash_slot(AggState *aggstate);
383 static void prepare_projection_slot(AggState *aggstate,
384  TupleTableSlot *slot,
385  int currentSet);
386 static void finalize_aggregates(AggState *aggstate,
387  AggStatePerAgg peragg,
388  AggStatePerGroup pergroup);
389 static TupleTableSlot *project_aggregates(AggState *aggstate);
390 static Bitmapset *find_unaggregated_cols(AggState *aggstate);
391 static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
392 static void build_hash_tables(AggState *aggstate);
393 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
394 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
395  bool nullcheck);
396 static long hash_choose_num_buckets(double hashentrysize,
397  long estimated_nbuckets,
398  Size memory);
399 static int hash_choose_num_partitions(uint64 input_groups,
400  double hashentrysize,
401  int used_bits,
402  int *log2_npartittions);
404  bool *in_hash_table);
405 static void lookup_hash_entries(AggState *aggstate);
406 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
407 static void agg_fill_hash_table(AggState *aggstate);
408 static bool agg_refill_hash_table(AggState *aggstate);
411 static void hash_agg_check_limits(AggState *aggstate);
412 static void hash_agg_enter_spill_mode(AggState *aggstate);
413 static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
414  int npartitions);
415 static void hashagg_finish_initial_spills(AggState *aggstate);
416 static void hashagg_reset_spill_state(AggState *aggstate);
418  int input_tapenum, int setno,
419  int64 input_tuples, int used_bits);
420 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
421 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
422  int used_bits, uint64 input_tuples,
423  double hashentrysize);
425  uint32 hash);
426 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
427  int setno);
428 static void hashagg_tapeinfo_init(AggState *aggstate);
429 static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
430  int ndest);
431 static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
432 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
433 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
434  AggState *aggstate, EState *estate,
435  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
436  Oid aggserialfn, Oid aggdeserialfn,
437  Datum initValue, bool initValueIsNull,
438  Oid *inputTypes, int numArguments);
439 static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
440  int lastaggno, List **same_input_transnos);
441 static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
442  bool shareable,
443  Oid aggtransfn, Oid aggtranstype,
444  Oid aggserialfn, Oid aggdeserialfn,
445  Datum initValue, bool initValueIsNull,
446  List *transnos);
447 
448 
449 /*
450  * Select the current grouping set; affects current_set and
451  * curaggcontext.
452  */
453 static void
454 select_current_set(AggState *aggstate, int setno, bool is_hash)
455 {
456  /*
457  * When changing this, also adapt ExecAggPlainTransByVal() and
458  * ExecAggPlainTransByRef().
459  */
460  if (is_hash)
461  aggstate->curaggcontext = aggstate->hashcontext;
462  else
463  aggstate->curaggcontext = aggstate->aggcontexts[setno];
464 
465  aggstate->current_set = setno;
466 }
467 
468 /*
469  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
470  * current_phase + 1. Juggle the tuplesorts accordingly.
471  *
472  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
473  * case, so when entering phase 0, all we need to do is drop open sorts.
474  */
475 static void
476 initialize_phase(AggState *aggstate, int newphase)
477 {
478  Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
479 
480  /*
481  * Whatever the previous state, we're now done with whatever input
482  * tuplesort was in use.
483  */
484  if (aggstate->sort_in)
485  {
486  tuplesort_end(aggstate->sort_in);
487  aggstate->sort_in = NULL;
488  }
489 
490  if (newphase <= 1)
491  {
492  /*
493  * Discard any existing output tuplesort.
494  */
495  if (aggstate->sort_out)
496  {
497  tuplesort_end(aggstate->sort_out);
498  aggstate->sort_out = NULL;
499  }
500  }
501  else
502  {
503  /*
504  * The old output tuplesort becomes the new input one, and this is the
505  * right time to actually sort it.
506  */
507  aggstate->sort_in = aggstate->sort_out;
508  aggstate->sort_out = NULL;
509  Assert(aggstate->sort_in);
510  tuplesort_performsort(aggstate->sort_in);
511  }
512 
513  /*
514  * If this isn't the last phase, we need to sort appropriately for the
515  * next phase in sequence.
516  */
517  if (newphase > 0 && newphase < aggstate->numphases - 1)
518  {
519  Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
520  PlanState *outerNode = outerPlanState(aggstate);
521  TupleDesc tupDesc = ExecGetResultType(outerNode);
522 
523  aggstate->sort_out = tuplesort_begin_heap(tupDesc,
524  sortnode->numCols,
525  sortnode->sortColIdx,
526  sortnode->sortOperators,
527  sortnode->collations,
528  sortnode->nullsFirst,
529  work_mem,
530  NULL, false);
531  }
532 
533  aggstate->current_phase = newphase;
534  aggstate->phase = &aggstate->phases[newphase];
535 }
536 
537 /*
538  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
539  * populated by the previous phase. Copy it to the sorter for the next phase
540  * if any.
541  *
542  * Callers cannot rely on memory for tuple in returned slot remaining valid
543  * past any subsequently fetched tuple.
544  */
545 static TupleTableSlot *
547 {
548  TupleTableSlot *slot;
549 
550  if (aggstate->sort_in)
551  {
552  /* make sure we check for interrupts in either path through here */
554  if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
555  aggstate->sort_slot, NULL))
556  return NULL;
557  slot = aggstate->sort_slot;
558  }
559  else
560  slot = ExecProcNode(outerPlanState(aggstate));
561 
562  if (!TupIsNull(slot) && aggstate->sort_out)
563  tuplesort_puttupleslot(aggstate->sort_out, slot);
564 
565  return slot;
566 }
567 
568 /*
569  * (Re)Initialize an individual aggregate.
570  *
571  * This function handles only one grouping set, already set in
572  * aggstate->current_set.
573  *
574  * When called, CurrentMemoryContext should be the per-query context.
575  */
576 static void
578  AggStatePerGroup pergroupstate)
579 {
580  /*
581  * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
582  */
583  if (pertrans->numSortCols > 0)
584  {
585  /*
586  * In case of rescan, maybe there could be an uncompleted sort
587  * operation? Clean it up if so.
588  */
589  if (pertrans->sortstates[aggstate->current_set])
590  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
591 
592 
593  /*
594  * We use a plain Datum sorter when there's a single input column;
595  * otherwise sort the full tuple. (See comments for
596  * process_ordered_aggregate_single.)
597  */
598  if (pertrans->numInputs == 1)
599  {
600  Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
601 
602  pertrans->sortstates[aggstate->current_set] =
603  tuplesort_begin_datum(attr->atttypid,
604  pertrans->sortOperators[0],
605  pertrans->sortCollations[0],
606  pertrans->sortNullsFirst[0],
607  work_mem, NULL, false);
608  }
609  else
610  pertrans->sortstates[aggstate->current_set] =
611  tuplesort_begin_heap(pertrans->sortdesc,
612  pertrans->numSortCols,
613  pertrans->sortColIdx,
614  pertrans->sortOperators,
615  pertrans->sortCollations,
616  pertrans->sortNullsFirst,
617  work_mem, NULL, false);
618  }
619 
620  /*
621  * (Re)set transValue to the initial value.
622  *
623  * Note that when the initial value is pass-by-ref, we must copy it (into
624  * the aggcontext) since we will pfree the transValue later.
625  */
626  if (pertrans->initValueIsNull)
627  pergroupstate->transValue = pertrans->initValue;
628  else
629  {
630  MemoryContext oldContext;
631 
633  pergroupstate->transValue = datumCopy(pertrans->initValue,
634  pertrans->transtypeByVal,
635  pertrans->transtypeLen);
636  MemoryContextSwitchTo(oldContext);
637  }
638  pergroupstate->transValueIsNull = pertrans->initValueIsNull;
639 
640  /*
641  * If the initial value for the transition state doesn't exist in the
642  * pg_aggregate table then we will let the first non-NULL value returned
643  * from the outer procNode become the initial value. (This is useful for
644  * aggregates like max() and min().) The noTransValue flag signals that we
645  * still need to do this.
646  */
647  pergroupstate->noTransValue = pertrans->initValueIsNull;
648 }
649 
650 /*
651  * Initialize all aggregate transition states for a new group of input values.
652  *
653  * If there are multiple grouping sets, we initialize only the first numReset
654  * of them (the grouping sets are ordered so that the most specific one, which
655  * is reset most often, is first). As a convenience, if numReset is 0, we
656  * reinitialize all sets.
657  *
658  * NB: This cannot be used for hash aggregates, as for those the grouping set
659  * number has to be specified from further up.
660  *
661  * When called, CurrentMemoryContext should be the per-query context.
662  */
663 static void
665  AggStatePerGroup *pergroups,
666  int numReset)
667 {
668  int transno;
669  int numGroupingSets = Max(aggstate->phase->numsets, 1);
670  int setno = 0;
671  int numTrans = aggstate->numtrans;
672  AggStatePerTrans transstates = aggstate->pertrans;
673 
674  if (numReset == 0)
675  numReset = numGroupingSets;
676 
677  for (setno = 0; setno < numReset; setno++)
678  {
679  AggStatePerGroup pergroup = pergroups[setno];
680 
681  select_current_set(aggstate, setno, false);
682 
683  for (transno = 0; transno < numTrans; transno++)
684  {
685  AggStatePerTrans pertrans = &transstates[transno];
686  AggStatePerGroup pergroupstate = &pergroup[transno];
687 
688  initialize_aggregate(aggstate, pertrans, pergroupstate);
689  }
690  }
691 }
692 
693 /*
694  * Given new input value(s), advance the transition function of one aggregate
695  * state within one grouping set only (already set in aggstate->current_set)
696  *
697  * The new values (and null flags) have been preloaded into argument positions
698  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
699  * pass to the transition function. We also expect that the static fields of
700  * the fcinfo are already initialized; that was done by ExecInitAgg().
701  *
702  * It doesn't matter which memory context this is called in.
703  */
704 static void
706  AggStatePerTrans pertrans,
707  AggStatePerGroup pergroupstate)
708 {
709  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
710  MemoryContext oldContext;
711  Datum newVal;
712 
713  if (pertrans->transfn.fn_strict)
714  {
715  /*
716  * For a strict transfn, nothing happens when there's a NULL input; we
717  * just keep the prior transValue.
718  */
719  int numTransInputs = pertrans->numTransInputs;
720  int i;
721 
722  for (i = 1; i <= numTransInputs; i++)
723  {
724  if (fcinfo->args[i].isnull)
725  return;
726  }
727  if (pergroupstate->noTransValue)
728  {
729  /*
730  * transValue has not been initialized. This is the first non-NULL
731  * input value. We use it as the initial value for transValue. (We
732  * already checked that the agg's input type is binary-compatible
733  * with its transtype, so straight copy here is OK.)
734  *
735  * We must copy the datum into aggcontext if it is pass-by-ref. We
736  * do not need to pfree the old transValue, since it's NULL.
737  */
739  pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
740  pertrans->transtypeByVal,
741  pertrans->transtypeLen);
742  pergroupstate->transValueIsNull = false;
743  pergroupstate->noTransValue = false;
744  MemoryContextSwitchTo(oldContext);
745  return;
746  }
747  if (pergroupstate->transValueIsNull)
748  {
749  /*
750  * Don't call a strict function with NULL inputs. Note it is
751  * possible to get here despite the above tests, if the transfn is
752  * strict *and* returned a NULL on a prior cycle. If that happens
753  * we will propagate the NULL all the way to the end.
754  */
755  return;
756  }
757  }
758 
759  /* We run the transition functions in per-input-tuple memory context */
760  oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
761 
762  /* set up aggstate->curpertrans for AggGetAggref() */
763  aggstate->curpertrans = pertrans;
764 
765  /*
766  * OK to call the transition function
767  */
768  fcinfo->args[0].value = pergroupstate->transValue;
769  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
770  fcinfo->isnull = false; /* just in case transfn doesn't set it */
771 
772  newVal = FunctionCallInvoke(fcinfo);
773 
774  aggstate->curpertrans = NULL;
775 
776  /*
777  * If pass-by-ref datatype, must copy the new value into aggcontext and
778  * free the prior transValue. But if transfn returned a pointer to its
779  * first input, we don't need to do anything. Also, if transfn returned a
780  * pointer to a R/W expanded object that is already a child of the
781  * aggcontext, assume we can adopt that value without copying it.
782  *
783  * It's safe to compare newVal with pergroup->transValue without
784  * regard for either being NULL, because ExecAggTransReparent()
785  * takes care to set transValue to 0 when NULL. Otherwise we could
786  * end up accidentally not reparenting, when the transValue has
787  * the same numerical value as newValue, despite being NULL. This
788  * is a somewhat hot path, making it undesirable to instead solve
789  * this with another branch for the common case of the transition
790  * function returning its (modified) input argument.
791  */
792  if (!pertrans->transtypeByVal &&
793  DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
794  newVal = ExecAggTransReparent(aggstate, pertrans,
795  newVal, fcinfo->isnull,
796  pergroupstate->transValue,
797  pergroupstate->transValueIsNull);
798 
799  pergroupstate->transValue = newVal;
800  pergroupstate->transValueIsNull = fcinfo->isnull;
801 
802  MemoryContextSwitchTo(oldContext);
803 }
804 
805 /*
806  * Advance each aggregate transition state for one input tuple. The input
807  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
808  * accessible to ExecEvalExpr.
809  *
810  * We have two sets of transition states to handle: one for sorted aggregation
811  * and one for hashed; we do them both here, to avoid multiple evaluation of
812  * the inputs.
813  *
814  * When called, CurrentMemoryContext should be the per-query context.
815  */
816 static void
818 {
819  bool dummynull;
820 
822  aggstate->tmpcontext,
823  &dummynull);
824 }
825 
826 /*
827  * Run the transition function for a DISTINCT or ORDER BY aggregate
828  * with only one input. This is called after we have completed
829  * entering all the input values into the sort object. We complete the
830  * sort, read out the values in sorted order, and run the transition
831  * function on each value (applying DISTINCT if appropriate).
832  *
833  * Note that the strictness of the transition function was checked when
834  * entering the values into the sort, so we don't check it again here;
835  * we just apply standard SQL DISTINCT logic.
836  *
837  * The one-input case is handled separately from the multi-input case
838  * for performance reasons: for single by-value inputs, such as the
839  * common case of count(distinct id), the tuplesort_getdatum code path
840  * is around 300% faster. (The speedup for by-reference types is less
841  * but still noticeable.)
842  *
843  * This function handles only one grouping set (already set in
844  * aggstate->current_set).
845  *
846  * When called, CurrentMemoryContext should be the per-query context.
847  */
848 static void
850  AggStatePerTrans pertrans,
851  AggStatePerGroup pergroupstate)
852 {
853  Datum oldVal = (Datum) 0;
854  bool oldIsNull = true;
855  bool haveOldVal = false;
856  MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
857  MemoryContext oldContext;
858  bool isDistinct = (pertrans->numDistinctCols > 0);
859  Datum newAbbrevVal = (Datum) 0;
860  Datum oldAbbrevVal = (Datum) 0;
861  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
862  Datum *newVal;
863  bool *isNull;
864 
865  Assert(pertrans->numDistinctCols < 2);
866 
867  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
868 
869  /* Load the column into argument 1 (arg 0 will be transition value) */
870  newVal = &fcinfo->args[1].value;
871  isNull = &fcinfo->args[1].isnull;
872 
873  /*
874  * Note: if input type is pass-by-ref, the datums returned by the sort are
875  * freshly palloc'd in the per-query context, so we must be careful to
876  * pfree them when they are no longer needed.
877  */
878 
879  while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
880  true, newVal, isNull, &newAbbrevVal))
881  {
882  /*
883  * Clear and select the working context for evaluation of the equality
884  * function and transition function.
885  */
886  MemoryContextReset(workcontext);
887  oldContext = MemoryContextSwitchTo(workcontext);
888 
889  /*
890  * If DISTINCT mode, and not distinct from prior, skip it.
891  */
892  if (isDistinct &&
893  haveOldVal &&
894  ((oldIsNull && *isNull) ||
895  (!oldIsNull && !*isNull &&
896  oldAbbrevVal == newAbbrevVal &&
898  pertrans->aggCollation,
899  oldVal, *newVal)))))
900  {
901  /* equal to prior, so forget this one */
902  if (!pertrans->inputtypeByVal && !*isNull)
903  pfree(DatumGetPointer(*newVal));
904  }
905  else
906  {
907  advance_transition_function(aggstate, pertrans, pergroupstate);
908  /* forget the old value, if any */
909  if (!oldIsNull && !pertrans->inputtypeByVal)
910  pfree(DatumGetPointer(oldVal));
911  /* and remember the new one for subsequent equality checks */
912  oldVal = *newVal;
913  oldAbbrevVal = newAbbrevVal;
914  oldIsNull = *isNull;
915  haveOldVal = true;
916  }
917 
918  MemoryContextSwitchTo(oldContext);
919  }
920 
921  if (!oldIsNull && !pertrans->inputtypeByVal)
922  pfree(DatumGetPointer(oldVal));
923 
924  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
925  pertrans->sortstates[aggstate->current_set] = NULL;
926 }
927 
928 /*
929  * Run the transition function for a DISTINCT or ORDER BY aggregate
930  * with more than one input. This is called after we have completed
931  * entering all the input values into the sort object. We complete the
932  * sort, read out the values in sorted order, and run the transition
933  * function on each value (applying DISTINCT if appropriate).
934  *
935  * This function handles only one grouping set (already set in
936  * aggstate->current_set).
937  *
938  * When called, CurrentMemoryContext should be the per-query context.
939  */
940 static void
942  AggStatePerTrans pertrans,
943  AggStatePerGroup pergroupstate)
944 {
945  ExprContext *tmpcontext = aggstate->tmpcontext;
946  FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
947  TupleTableSlot *slot1 = pertrans->sortslot;
948  TupleTableSlot *slot2 = pertrans->uniqslot;
949  int numTransInputs = pertrans->numTransInputs;
950  int numDistinctCols = pertrans->numDistinctCols;
951  Datum newAbbrevVal = (Datum) 0;
952  Datum oldAbbrevVal = (Datum) 0;
953  bool haveOldValue = false;
954  TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
955  int i;
956 
957  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
958 
959  ExecClearTuple(slot1);
960  if (slot2)
961  ExecClearTuple(slot2);
962 
963  while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
964  true, true, slot1, &newAbbrevVal))
965  {
967 
968  tmpcontext->ecxt_outertuple = slot1;
969  tmpcontext->ecxt_innertuple = slot2;
970 
971  if (numDistinctCols == 0 ||
972  !haveOldValue ||
973  newAbbrevVal != oldAbbrevVal ||
974  !ExecQual(pertrans->equalfnMulti, tmpcontext))
975  {
976  /*
977  * Extract the first numTransInputs columns as datums to pass to
978  * the transfn.
979  */
980  slot_getsomeattrs(slot1, numTransInputs);
981 
982  /* Load values into fcinfo */
983  /* Start from 1, since the 0th arg will be the transition value */
984  for (i = 0; i < numTransInputs; i++)
985  {
986  fcinfo->args[i + 1].value = slot1->tts_values[i];
987  fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
988  }
989 
990  advance_transition_function(aggstate, pertrans, pergroupstate);
991 
992  if (numDistinctCols > 0)
993  {
994  /* swap the slot pointers to retain the current tuple */
995  TupleTableSlot *tmpslot = slot2;
996 
997  slot2 = slot1;
998  slot1 = tmpslot;
999  /* avoid ExecQual() calls by reusing abbreviated keys */
1000  oldAbbrevVal = newAbbrevVal;
1001  haveOldValue = true;
1002  }
1003  }
1004 
1005  /* Reset context each time */
1006  ResetExprContext(tmpcontext);
1007 
1008  ExecClearTuple(slot1);
1009  }
1010 
1011  if (slot2)
1012  ExecClearTuple(slot2);
1013 
1014  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1015  pertrans->sortstates[aggstate->current_set] = NULL;
1016 
1017  /* restore previous slot, potentially in use for grouping sets */
1018  tmpcontext->ecxt_outertuple = save;
1019 }
1020 
1021 /*
1022  * Compute the final value of one aggregate.
1023  *
1024  * This function handles only one grouping set (already set in
1025  * aggstate->current_set).
1026  *
1027  * The finalfn will be run, and the result delivered, in the
1028  * output-tuple context; caller's CurrentMemoryContext does not matter.
1029  *
1030  * The finalfn uses the state as set in the transno. This also might be
1031  * being used by another aggregate function, so it's important that we do
1032  * nothing destructive here.
1033  */
1034 static void
1036  AggStatePerAgg peragg,
1037  AggStatePerGroup pergroupstate,
1038  Datum *resultVal, bool *resultIsNull)
1039 {
1040  LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1041  bool anynull = false;
1042  MemoryContext oldContext;
1043  int i;
1044  ListCell *lc;
1045  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1046 
1048 
1049  /*
1050  * Evaluate any direct arguments. We do this even if there's no finalfn
1051  * (which is unlikely anyway), so that side-effects happen as expected.
1052  * The direct arguments go into arg positions 1 and up, leaving position 0
1053  * for the transition state value.
1054  */
1055  i = 1;
1056  foreach(lc, peragg->aggdirectargs)
1057  {
1058  ExprState *expr = (ExprState *) lfirst(lc);
1059 
1060  fcinfo->args[i].value = ExecEvalExpr(expr,
1061  aggstate->ss.ps.ps_ExprContext,
1062  &fcinfo->args[i].isnull);
1063  anynull |= fcinfo->args[i].isnull;
1064  i++;
1065  }
1066 
1067  /*
1068  * Apply the agg's finalfn if one is provided, else return transValue.
1069  */
1070  if (OidIsValid(peragg->finalfn_oid))
1071  {
1072  int numFinalArgs = peragg->numFinalArgs;
1073 
1074  /* set up aggstate->curperagg for AggGetAggref() */
1075  aggstate->curperagg = peragg;
1076 
1077  InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
1078  numFinalArgs,
1079  pertrans->aggCollation,
1080  (void *) aggstate, NULL);
1081 
1082  /* Fill in the transition state value */
1083  fcinfo->args[0].value =
1084  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1085  pergroupstate->transValueIsNull,
1086  pertrans->transtypeLen);
1087  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1088  anynull |= pergroupstate->transValueIsNull;
1089 
1090  /* Fill any remaining argument positions with nulls */
1091  for (; i < numFinalArgs; i++)
1092  {
1093  fcinfo->args[i].value = (Datum) 0;
1094  fcinfo->args[i].isnull = true;
1095  anynull = true;
1096  }
1097 
1098  if (fcinfo->flinfo->fn_strict && anynull)
1099  {
1100  /* don't call a strict function with NULL inputs */
1101  *resultVal = (Datum) 0;
1102  *resultIsNull = true;
1103  }
1104  else
1105  {
1106  *resultVal = FunctionCallInvoke(fcinfo);
1107  *resultIsNull = fcinfo->isnull;
1108  }
1109  aggstate->curperagg = NULL;
1110  }
1111  else
1112  {
1113  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1114  *resultVal = pergroupstate->transValue;
1115  *resultIsNull = pergroupstate->transValueIsNull;
1116  }
1117 
1118  /*
1119  * If result is pass-by-ref, make sure it is in the right context.
1120  */
1121  if (!peragg->resulttypeByVal && !*resultIsNull &&
1123  DatumGetPointer(*resultVal)))
1124  *resultVal = datumCopy(*resultVal,
1125  peragg->resulttypeByVal,
1126  peragg->resulttypeLen);
1127 
1128  MemoryContextSwitchTo(oldContext);
1129 }
1130 
1131 /*
1132  * Compute the output value of one partial aggregate.
1133  *
1134  * The serialization function will be run, and the result delivered, in the
1135  * output-tuple context; caller's CurrentMemoryContext does not matter.
1136  */
1137 static void
1139  AggStatePerAgg peragg,
1140  AggStatePerGroup pergroupstate,
1141  Datum *resultVal, bool *resultIsNull)
1142 {
1143  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1144  MemoryContext oldContext;
1145 
1147 
1148  /*
1149  * serialfn_oid will be set if we must serialize the transvalue before
1150  * returning it
1151  */
1152  if (OidIsValid(pertrans->serialfn_oid))
1153  {
1154  /* Don't call a strict serialization function with NULL input. */
1155  if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1156  {
1157  *resultVal = (Datum) 0;
1158  *resultIsNull = true;
1159  }
1160  else
1161  {
1162  FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1163 
1164  fcinfo->args[0].value =
1165  MakeExpandedObjectReadOnly(pergroupstate->transValue,
1166  pergroupstate->transValueIsNull,
1167  pertrans->transtypeLen);
1168  fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1169 
1170  *resultVal = FunctionCallInvoke(fcinfo);
1171  *resultIsNull = fcinfo->isnull;
1172  }
1173  }
1174  else
1175  {
1176  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1177  *resultVal = pergroupstate->transValue;
1178  *resultIsNull = pergroupstate->transValueIsNull;
1179  }
1180 
1181  /* If result is pass-by-ref, make sure it is in the right context. */
1182  if (!peragg->resulttypeByVal && !*resultIsNull &&
1184  DatumGetPointer(*resultVal)))
1185  *resultVal = datumCopy(*resultVal,
1186  peragg->resulttypeByVal,
1187  peragg->resulttypeLen);
1188 
1189  MemoryContextSwitchTo(oldContext);
1190 }
1191 
1192 /*
1193  * Extract the attributes that make up the grouping key into the
1194  * hashslot. This is necessary to compute the hash or perform a lookup.
1195  */
1196 static void
1198 {
1199  TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
1200  AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
1201  TupleTableSlot *hashslot = perhash->hashslot;
1202  int i;
1203 
1204  /* transfer just the needed columns into hashslot */
1205  slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1206  ExecClearTuple(hashslot);
1207 
1208  for (i = 0; i < perhash->numhashGrpCols; i++)
1209  {
1210  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1211 
1212  hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1213  hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1214  }
1215  ExecStoreVirtualTuple(hashslot);
1216 }
1217 
1218 /*
1219  * Prepare to finalize and project based on the specified representative tuple
1220  * slot and grouping set.
1221  *
1222  * In the specified tuple slot, force to null all attributes that should be
1223  * read as null in the context of the current grouping set. Also stash the
1224  * current group bitmap where GroupingExpr can get at it.
1225  *
1226  * This relies on three conditions:
1227  *
1228  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1229  * only reference it in evaluations, which will only access individual
1230  * attributes.
1231  *
1232  * 2) No system columns are going to need to be nulled. (If a system column is
1233  * referenced in a group clause, it is actually projected in the outer plan
1234  * tlist.)
1235  *
1236  * 3) Within a given phase, we never need to recover the value of an attribute
1237  * once it has been set to null.
1238  *
1239  * Poking into the slot this way is a bit ugly, but the consensus is that the
1240  * alternative was worse.
1241  */
1242 static void
1243 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1244 {
1245  if (aggstate->phase->grouped_cols)
1246  {
1247  Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1248 
1249  aggstate->grouped_cols = grouped_cols;
1250 
1251  if (TTS_EMPTY(slot))
1252  {
1253  /*
1254  * Force all values to be NULL if working on an empty input tuple
1255  * (i.e. an empty grouping set for which no input rows were
1256  * supplied).
1257  */
1258  ExecStoreAllNullTuple(slot);
1259  }
1260  else if (aggstate->all_grouped_cols)
1261  {
1262  ListCell *lc;
1263 
1264  /* all_grouped_cols is arranged in desc order */
1266 
1267  foreach(lc, aggstate->all_grouped_cols)
1268  {
1269  int attnum = lfirst_int(lc);
1270 
1271  if (!bms_is_member(attnum, grouped_cols))
1272  slot->tts_isnull[attnum - 1] = true;
1273  }
1274  }
1275  }
1276 }
1277 
1278 /*
1279  * Compute the final value of all aggregates for one group.
1280  *
1281  * This function handles only one grouping set at a time, which the caller must
1282  * have selected. It's also the caller's responsibility to adjust the supplied
1283  * pergroup parameter to point to the current set's transvalues.
1284  *
1285  * Results are stored in the output econtext aggvalues/aggnulls.
1286  */
1287 static void
1289  AggStatePerAgg peraggs,
1290  AggStatePerGroup pergroup)
1291 {
1292  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1293  Datum *aggvalues = econtext->ecxt_aggvalues;
1294  bool *aggnulls = econtext->ecxt_aggnulls;
1295  int aggno;
1296  int transno;
1297 
1298  /*
1299  * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1300  * inputs and run the transition functions.
1301  */
1302  for (transno = 0; transno < aggstate->numtrans; transno++)
1303  {
1304  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1305  AggStatePerGroup pergroupstate;
1306 
1307  pergroupstate = &pergroup[transno];
1308 
1309  if (pertrans->numSortCols > 0)
1310  {
1311  Assert(aggstate->aggstrategy != AGG_HASHED &&
1312  aggstate->aggstrategy != AGG_MIXED);
1313 
1314  if (pertrans->numInputs == 1)
1316  pertrans,
1317  pergroupstate);
1318  else
1320  pertrans,
1321  pergroupstate);
1322  }
1323  }
1324 
1325  /*
1326  * Run the final functions.
1327  */
1328  for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1329  {
1330  AggStatePerAgg peragg = &peraggs[aggno];
1331  int transno = peragg->transno;
1332  AggStatePerGroup pergroupstate;
1333 
1334  pergroupstate = &pergroup[transno];
1335 
1336  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1337  finalize_partialaggregate(aggstate, peragg, pergroupstate,
1338  &aggvalues[aggno], &aggnulls[aggno]);
1339  else
1340  finalize_aggregate(aggstate, peragg, pergroupstate,
1341  &aggvalues[aggno], &aggnulls[aggno]);
1342  }
1343 }
1344 
1345 /*
1346  * Project the result of a group (whose aggs have already been calculated by
1347  * finalize_aggregates). Returns the result slot, or NULL if no row is
1348  * projected (suppressed by qual).
1349  */
1350 static TupleTableSlot *
1352 {
1353  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1354 
1355  /*
1356  * Check the qual (HAVING clause); if the group does not match, ignore it.
1357  */
1358  if (ExecQual(aggstate->ss.ps.qual, econtext))
1359  {
1360  /*
1361  * Form and return projection tuple using the aggregate results and
1362  * the representative input tuple.
1363  */
1364  return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1365  }
1366  else
1367  InstrCountFiltered1(aggstate, 1);
1368 
1369  return NULL;
1370 }
1371 
1372 /*
1373  * find_unaggregated_cols
1374  * Construct a bitmapset of the column numbers of un-aggregated Vars
1375  * appearing in our targetlist and qual (HAVING clause)
1376  */
1377 static Bitmapset *
1379 {
1380  Agg *node = (Agg *) aggstate->ss.ps.plan;
1381  Bitmapset *colnos;
1382 
1383  colnos = NULL;
1385  &colnos);
1386  (void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1387  &colnos);
1388  return colnos;
1389 }
1390 
1391 static bool
1393 {
1394  if (node == NULL)
1395  return false;
1396  if (IsA(node, Var))
1397  {
1398  Var *var = (Var *) node;
1399 
1400  /* setrefs.c should have set the varno to OUTER_VAR */
1401  Assert(var->varno == OUTER_VAR);
1402  Assert(var->varlevelsup == 0);
1403  *colnos = bms_add_member(*colnos, var->varattno);
1404  return false;
1405  }
1406  if (IsA(node, Aggref) ||IsA(node, GroupingFunc))
1407  {
1408  /* do not descend into aggregate exprs */
1409  return false;
1410  }
1412  (void *) colnos);
1413 }
1414 
1415 /*
1416  * (Re-)initialize the hash table(s) to empty.
1417  *
1418  * To implement hashed aggregation, we need a hashtable that stores a
1419  * representative tuple and an array of AggStatePerGroup structs for each
1420  * distinct set of GROUP BY column values. We compute the hash key from the
1421  * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1422  * for each entry.
1423  *
1424  * We have a separate hashtable and associated perhash data structure for each
1425  * grouping set for which we're doing hashing.
1426  *
1427  * The contents of the hash tables always live in the hashcontext's per-tuple
1428  * memory context (there is only one of these for all tables together, since
1429  * they are all reset at the same time).
1430  */
1431 static void
1433 {
1434  int setno;
1435 
1436  for (setno = 0; setno < aggstate->num_hashes; ++setno)
1437  {
1438  AggStatePerHash perhash = &aggstate->perhash[setno];
1439  long nbuckets;
1440  Size memory;
1441 
1442  if (perhash->hashtable != NULL)
1443  {
1444  ResetTupleHashTable(perhash->hashtable);
1445  continue;
1446  }
1447 
1448  Assert(perhash->aggnode->numGroups > 0);
1449 
1450  memory = aggstate->hash_mem_limit / aggstate->num_hashes;
1451 
1452  /* choose reasonable number of buckets per hashtable */
1453  nbuckets = hash_choose_num_buckets(
1454  aggstate->hashentrysize, perhash->aggnode->numGroups, memory);
1455 
1456  build_hash_table(aggstate, setno, nbuckets);
1457  }
1458 
1459  aggstate->hash_ngroups_current = 0;
1460 }
1461 
1462 /*
1463  * Build a single hashtable for this grouping set.
1464  */
1465 static void
1466 build_hash_table(AggState *aggstate, int setno, long nbuckets)
1467 {
1468  AggStatePerHash perhash = &aggstate->perhash[setno];
1469  MemoryContext metacxt = aggstate->hash_metacxt;
1470  MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
1471  MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
1472  Size additionalsize;
1473 
1474  Assert(aggstate->aggstrategy == AGG_HASHED ||
1475  aggstate->aggstrategy == AGG_MIXED);
1476 
1477  /*
1478  * Used to make sure initial hash table allocation does not exceed
1479  * work_mem. Note that the estimate does not include space for
1480  * pass-by-reference transition data values, nor for the representative
1481  * tuple of each group.
1482  */
1483  additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1484 
1485  perhash->hashtable = BuildTupleHashTableExt(
1486  &aggstate->ss.ps,
1487  perhash->hashslot->tts_tupleDescriptor,
1488  perhash->numCols,
1489  perhash->hashGrpColIdxHash,
1490  perhash->eqfuncoids,
1491  perhash->hashfunctions,
1492  perhash->aggnode->grpCollations,
1493  nbuckets,
1494  additionalsize,
1495  metacxt,
1496  hashcxt,
1497  tmpcxt,
1498  DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1499 }
1500 
1501 /*
1502  * Compute columns that actually need to be stored in hashtable entries. The
1503  * incoming tuples from the child plan node will contain grouping columns,
1504  * other columns referenced in our targetlist and qual, columns used to
1505  * compute the aggregate functions, and perhaps just junk columns we don't use
1506  * at all. Only columns of the first two types need to be stored in the
1507  * hashtable, and getting rid of the others can make the table entries
1508  * significantly smaller. The hashtable only contains the relevant columns,
1509  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1510  * into the format of the normal input descriptor.
1511  *
1512  * Additional columns, in addition to the columns grouped by, come from two
1513  * sources: Firstly functionally dependent columns that we don't need to group
1514  * by themselves, and secondly ctids for row-marks.
1515  *
1516  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1517  * then build an array of the columns included in the hashtable. We might
1518  * still have duplicates if the passed-in grpColIdx has them, which can happen
1519  * in edge cases from semijoins/distinct; these can't always be removed,
1520  * because it's not certain that the duplicate cols will be using the same
1521  * hash function.
1522  *
1523  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1524  * the per-query context (unlike the hash table itself).
1525  */
1526 static void
1528 {
1529  Bitmapset *base_colnos;
1530  List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1531  int numHashes = aggstate->num_hashes;
1532  EState *estate = aggstate->ss.ps.state;
1533  int j;
1534 
1535  /* Find Vars that will be needed in tlist and qual */
1536  base_colnos = find_unaggregated_cols(aggstate);
1537 
1538  for (j = 0; j < numHashes; ++j)
1539  {
1540  AggStatePerHash perhash = &aggstate->perhash[j];
1541  Bitmapset *colnos = bms_copy(base_colnos);
1542  AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1543  List *hashTlist = NIL;
1544  TupleDesc hashDesc;
1545  int maxCols;
1546  int i;
1547 
1548  perhash->largestGrpColIdx = 0;
1549 
1550  /*
1551  * If we're doing grouping sets, then some Vars might be referenced in
1552  * tlist/qual for the benefit of other grouping sets, but not needed
1553  * when hashing; i.e. prepare_projection_slot will null them out, so
1554  * there'd be no point storing them. Use prepare_projection_slot's
1555  * logic to determine which.
1556  */
1557  if (aggstate->phases[0].grouped_cols)
1558  {
1559  Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1560  ListCell *lc;
1561 
1562  foreach(lc, aggstate->all_grouped_cols)
1563  {
1564  int attnum = lfirst_int(lc);
1565 
1566  if (!bms_is_member(attnum, grouped_cols))
1567  colnos = bms_del_member(colnos, attnum);
1568  }
1569  }
1570 
1571  /*
1572  * Compute maximum number of input columns accounting for possible
1573  * duplications in the grpColIdx array, which can happen in some edge
1574  * cases where HashAggregate was generated as part of a semijoin or a
1575  * DISTINCT.
1576  */
1577  maxCols = bms_num_members(colnos) + perhash->numCols;
1578 
1579  perhash->hashGrpColIdxInput =
1580  palloc(maxCols * sizeof(AttrNumber));
1581  perhash->hashGrpColIdxHash =
1582  palloc(perhash->numCols * sizeof(AttrNumber));
1583 
1584  /* Add all the grouping columns to colnos */
1585  for (i = 0; i < perhash->numCols; i++)
1586  colnos = bms_add_member(colnos, grpColIdx[i]);
1587 
1588  /*
1589  * First build mapping for columns directly hashed. These are the
1590  * first, because they'll be accessed when computing hash values and
1591  * comparing tuples for exact matches. We also build simple mapping
1592  * for execGrouping, so it knows where to find the to-be-hashed /
1593  * compared columns in the input.
1594  */
1595  for (i = 0; i < perhash->numCols; i++)
1596  {
1597  perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1598  perhash->hashGrpColIdxHash[i] = i + 1;
1599  perhash->numhashGrpCols++;
1600  /* delete already mapped columns */
1601  bms_del_member(colnos, grpColIdx[i]);
1602  }
1603 
1604  /* and add the remaining columns */
1605  while ((i = bms_first_member(colnos)) >= 0)
1606  {
1607  perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1608  perhash->numhashGrpCols++;
1609  }
1610 
1611  /* and build a tuple descriptor for the hashtable */
1612  for (i = 0; i < perhash->numhashGrpCols; i++)
1613  {
1614  int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1615 
1616  hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1617  perhash->largestGrpColIdx =
1618  Max(varNumber + 1, perhash->largestGrpColIdx);
1619  }
1620 
1621  hashDesc = ExecTypeFromTL(hashTlist);
1622 
1623  execTuplesHashPrepare(perhash->numCols,
1624  perhash->aggnode->grpOperators,
1625  &perhash->eqfuncoids,
1626  &perhash->hashfunctions);
1627  perhash->hashslot =
1628  ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1630 
1631  list_free(hashTlist);
1632  bms_free(colnos);
1633  }
1634 
1635  bms_free(base_colnos);
1636 }
1637 
1638 /*
1639  * Estimate per-hash-table-entry overhead.
1640  */
1641 Size
1642 hash_agg_entry_size(int numAggs, Size tupleWidth, Size transitionSpace)
1643 {
1644  return
1646  MAXALIGN(tupleWidth) +
1647  MAXALIGN(sizeof(TupleHashEntryData) +
1648  numAggs * sizeof(AggStatePerGroupData)) +
1649  transitionSpace;
1650 }
1651 
1652 /*
1653  * hashagg_recompile_expressions()
1654  *
1655  * Identifies the right phase, compiles the right expression given the
1656  * arguments, and then sets phase->evalfunc to that expression.
1657  *
1658  * Different versions of the compiled expression are needed depending on
1659  * whether hash aggregation has spilled or not, and whether it's reading from
1660  * the outer plan or a tape. Before spilling to disk, the expression reads
1661  * from the outer plan and does not need to perform a NULL check. After
1662  * HashAgg begins to spill, new groups will not be created in the hash table,
1663  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
1664  * pointer check to the expression. Then, when reading spilled data from a
1665  * tape, we change the outer slot type to be a fixed minimal tuple slot.
1666  *
1667  * It would be wasteful to recompile every time, so cache the compiled
1668  * expressions in the AggStatePerPhase, and reuse when appropriate.
1669  */
1670 static void
1671 hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1672 {
1673  AggStatePerPhase phase;
1674  int i = minslot ? 1 : 0;
1675  int j = nullcheck ? 1 : 0;
1676 
1677  Assert(aggstate->aggstrategy == AGG_HASHED ||
1678  aggstate->aggstrategy == AGG_MIXED);
1679 
1680  if (aggstate->aggstrategy == AGG_HASHED)
1681  phase = &aggstate->phases[0];
1682  else /* AGG_MIXED */
1683  phase = &aggstate->phases[1];
1684 
1685  if (phase->evaltrans_cache[i][j] == NULL)
1686  {
1687  const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
1688  bool outerfixed = aggstate->ss.ps.outeropsfixed;
1689  bool dohash = true;
1690  bool dosort;
1691 
1692  dosort = aggstate->aggstrategy == AGG_MIXED ? true : false;
1693 
1694  /* temporarily change the outerops while compiling the expression */
1695  if (minslot)
1696  {
1697  aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
1698  aggstate->ss.ps.outeropsfixed = true;
1699  }
1700 
1701  phase->evaltrans_cache[i][j] = ExecBuildAggTrans(
1702  aggstate, phase, dosort, dohash, nullcheck);
1703 
1704  /* change back */
1705  aggstate->ss.ps.outerops = outerops;
1706  aggstate->ss.ps.outeropsfixed = outerfixed;
1707  }
1708 
1709  phase->evaltrans = phase->evaltrans_cache[i][j];
1710 }
1711 
1712 /*
1713  * Set limits that trigger spilling to avoid exceeding work_mem. Consider the
1714  * number of partitions we expect to create (if we do spill).
1715  *
1716  * There are two limits: a memory limit, and also an ngroups limit. The
1717  * ngroups limit becomes important when we expect transition values to grow
1718  * substantially larger than the initial value.
1719  */
1720 void
1721 hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
1722  Size *mem_limit, uint64 *ngroups_limit,
1723  int *num_partitions)
1724 {
1725  int npartitions;
1726  Size partition_mem;
1727 
1728  /* if not expected to spill, use all of work_mem */
1729  if (input_groups * hashentrysize < work_mem * 1024L)
1730  {
1731  if (num_partitions != NULL)
1732  *num_partitions = 0;
1733  *mem_limit = work_mem * 1024L;
1734  *ngroups_limit = *mem_limit / hashentrysize;
1735  return;
1736  }
1737 
1738  /*
1739  * Calculate expected memory requirements for spilling, which is the size
1740  * of the buffers needed for all the tapes that need to be open at
1741  * once. Then, subtract that from the memory available for holding hash
1742  * tables.
1743  */
1744  npartitions = hash_choose_num_partitions(input_groups,
1745  hashentrysize,
1746  used_bits,
1747  NULL);
1748  if (num_partitions != NULL)
1749  *num_partitions = npartitions;
1750 
1751  partition_mem =
1753  HASHAGG_WRITE_BUFFER_SIZE * npartitions;
1754 
1755  /*
1756  * Don't set the limit below 3/4 of work_mem. In that case, we are at the
1757  * minimum number of partitions, so we aren't going to dramatically exceed
1758  * work mem anyway.
1759  */
1760  if (work_mem * 1024L > 4 * partition_mem)
1761  *mem_limit = work_mem * 1024L - partition_mem;
1762  else
1763  *mem_limit = work_mem * 1024L * 0.75;
1764 
1765  if (*mem_limit > hashentrysize)
1766  *ngroups_limit = *mem_limit / hashentrysize;
1767  else
1768  *ngroups_limit = 1;
1769 }
1770 
1771 /*
1772  * hash_agg_check_limits
1773  *
1774  * After adding a new group to the hash table, check whether we need to enter
1775  * spill mode. Allocations may happen without adding new groups (for instance,
1776  * if the transition state size grows), so this check is imperfect.
1777  */
1778 static void
1780 {
1781  uint64 ngroups = aggstate->hash_ngroups_current;
1782  Size meta_mem = MemoryContextMemAllocated(
1783  aggstate->hash_metacxt, true);
1784  Size hash_mem = MemoryContextMemAllocated(
1785  aggstate->hashcontext->ecxt_per_tuple_memory, true);
1786 
1787  /*
1788  * Don't spill unless there's at least one group in the hash table so we
1789  * can be sure to make progress even in edge cases.
1790  */
1791  if (aggstate->hash_ngroups_current > 0 &&
1792  (meta_mem + hash_mem > aggstate->hash_mem_limit ||
1793  ngroups > aggstate->hash_ngroups_limit))
1794  {
1795  hash_agg_enter_spill_mode(aggstate);
1796  }
1797 }
1798 
1799 /*
1800  * Enter "spill mode", meaning that no new groups are added to any of the hash
1801  * tables. Tuples that would create a new group are instead spilled, and
1802  * processed later.
1803  */
1804 static void
1806 {
1807  aggstate->hash_spill_mode = true;
1808  hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
1809 
1810  if (!aggstate->hash_ever_spilled)
1811  {
1812  Assert(aggstate->hash_tapeinfo == NULL);
1813  Assert(aggstate->hash_spills == NULL);
1814 
1815  aggstate->hash_ever_spilled = true;
1816 
1817  hashagg_tapeinfo_init(aggstate);
1818 
1819  aggstate->hash_spills = palloc(
1820  sizeof(HashAggSpill) * aggstate->num_hashes);
1821 
1822  for (int setno = 0; setno < aggstate->num_hashes; setno++)
1823  {
1824  AggStatePerHash perhash = &aggstate->perhash[setno];
1825  HashAggSpill *spill = &aggstate->hash_spills[setno];
1826 
1827  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
1828  perhash->aggnode->numGroups,
1829  aggstate->hashentrysize);
1830  }
1831  }
1832 }
1833 
1834 /*
1835  * Update metrics after filling the hash table.
1836  *
1837  * If reading from the outer plan, from_tape should be false; if reading from
1838  * another tape, from_tape should be true.
1839  */
1840 static void
1841 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
1842 {
1843  Size meta_mem;
1844  Size hash_mem;
1845  Size buffer_mem;
1846  Size total_mem;
1847 
1848  if (aggstate->aggstrategy != AGG_MIXED &&
1849  aggstate->aggstrategy != AGG_HASHED)
1850  return;
1851 
1852  /* memory for the hash table itself */
1853  meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
1854 
1855  /* memory for the group keys and transition states */
1856  hash_mem = MemoryContextMemAllocated(
1857  aggstate->hashcontext->ecxt_per_tuple_memory, true);
1858 
1859  /* memory for read/write tape buffers, if spilled */
1860  buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
1861  if (from_tape)
1862  buffer_mem += HASHAGG_READ_BUFFER_SIZE;
1863 
1864  /* update peak mem */
1865  total_mem = meta_mem + hash_mem + buffer_mem;
1866  if (total_mem > aggstate->hash_mem_peak)
1867  aggstate->hash_mem_peak = total_mem;
1868 
1869  /* update disk usage */
1870  if (aggstate->hash_tapeinfo != NULL)
1871  {
1872  uint64 disk_used = LogicalTapeSetBlocks(
1873  aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
1874 
1875  if (aggstate->hash_disk_used < disk_used)
1876  aggstate->hash_disk_used = disk_used;
1877  }
1878 
1879  /* update hashentrysize estimate based on contents */
1880  if (aggstate->hash_ngroups_current > 0)
1881  {
1882  aggstate->hashentrysize =
1883  sizeof(TupleHashEntryData) +
1884  (hash_mem / (double)aggstate->hash_ngroups_current);
1885  }
1886 }
1887 
1888 /*
1889  * Choose a reasonable number of buckets for the initial hash table size.
1890  */
1891 static long
1892 hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
1893 {
1894  long max_nbuckets;
1895  long nbuckets = ngroups;
1896 
1897  max_nbuckets = memory / hashentrysize;
1898 
1899  /*
1900  * Underestimating is better than overestimating. Too many buckets crowd
1901  * out space for group keys and transition state values.
1902  */
1903  max_nbuckets >>= 1;
1904 
1905  if (nbuckets > max_nbuckets)
1906  nbuckets = max_nbuckets;
1907  if (nbuckets < HASHAGG_MIN_BUCKETS)
1908  nbuckets = HASHAGG_MIN_BUCKETS;
1909  return nbuckets;
1910 }
1911 
1912 /*
1913  * Determine the number of partitions to create when spilling, which will
1914  * always be a power of two. If log2_npartitions is non-NULL, set
1915  * *log2_npartitions to the log2() of the number of partitions.
1916  */
1917 static int
1918 hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
1919  int used_bits, int *log2_npartitions)
1920 {
1921  Size mem_wanted;
1922  int partition_limit;
1923  int npartitions;
1924  int partition_bits;
1925 
1926  /*
1927  * Avoid creating so many partitions that the memory requirements of the
1928  * open partition files are greater than 1/4 of work_mem.
1929  */
1930  partition_limit =
1931  (work_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
1933 
1934  mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
1935 
1936  /* make enough partitions so that each one is likely to fit in memory */
1937  npartitions = 1 + (mem_wanted / (work_mem * 1024L));
1938 
1939  if (npartitions > partition_limit)
1940  npartitions = partition_limit;
1941 
1942  if (npartitions < HASHAGG_MIN_PARTITIONS)
1943  npartitions = HASHAGG_MIN_PARTITIONS;
1944  if (npartitions > HASHAGG_MAX_PARTITIONS)
1945  npartitions = HASHAGG_MAX_PARTITIONS;
1946 
1947  /* ceil(log2(npartitions)) */
1948  partition_bits = my_log2(npartitions);
1949 
1950  /* make sure that we don't exhaust the hash bits */
1951  if (partition_bits + used_bits >= 32)
1952  partition_bits = 32 - used_bits;
1953 
1954  if (log2_npartitions != NULL)
1955  *log2_npartitions = partition_bits;
1956 
1957  /* number of partitions will be a power of two */
1958  npartitions = 1L << partition_bits;
1959 
1960  return npartitions;
1961 }
1962 
1963 /*
1964  * Find or create a hashtable entry for the tuple group containing the current
1965  * tuple (already set in tmpcontext's outertuple slot), in the current grouping
1966  * set (which the caller must have selected - note that initialize_aggregate
1967  * depends on this).
1968  *
1969  * When called, CurrentMemoryContext should be the per-query context. The
1970  * already-calculated hash value for the tuple must be specified.
1971  *
1972  * If in "spill mode", then only find existing hashtable entries; don't create
1973  * new ones. If a tuple's group is not already present in the hash table for
1974  * the current grouping set, assign *in_hash_table=false and the caller will
1975  * spill it to disk.
1976  */
1977 static AggStatePerGroup
1978 lookup_hash_entry(AggState *aggstate, uint32 hash, bool *in_hash_table)
1979 {
1980  AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
1981  TupleTableSlot *hashslot = perhash->hashslot;
1982  TupleHashEntryData *entry;
1983  bool isnew = false;
1984  bool *p_isnew;
1985 
1986  /* if hash table already spilled, don't create new entries */
1987  p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
1988 
1989  /* find or create the hashtable entry using the filtered tuple */
1990  entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, p_isnew,
1991  hash);
1992 
1993  if (entry == NULL)
1994  {
1995  *in_hash_table = false;
1996  return NULL;
1997  }
1998  else
1999  *in_hash_table = true;
2000 
2001  if (isnew)
2002  {
2003  AggStatePerGroup pergroup;
2004  int transno;
2005 
2006  aggstate->hash_ngroups_current++;
2007  hash_agg_check_limits(aggstate);
2008 
2009  /* no need to allocate or initialize per-group state */
2010  if (aggstate->numtrans == 0)
2011  return NULL;
2012 
2013  pergroup = (AggStatePerGroup)
2015  sizeof(AggStatePerGroupData) * aggstate->numtrans);
2016 
2017  entry->additional = pergroup;
2018 
2019  /*
2020  * Initialize aggregates for new tuple group, lookup_hash_entries()
2021  * already has selected the relevant grouping set.
2022  */
2023  for (transno = 0; transno < aggstate->numtrans; transno++)
2024  {
2025  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
2026  AggStatePerGroup pergroupstate = &pergroup[transno];
2027 
2028  initialize_aggregate(aggstate, pertrans, pergroupstate);
2029  }
2030  }
2031 
2032  return entry->additional;
2033 }
2034 
2035 /*
2036  * Look up hash entries for the current tuple in all hashed grouping sets,
2037  * returning an array of pergroup pointers suitable for advance_aggregates.
2038  *
2039  * Be aware that lookup_hash_entry can reset the tmpcontext.
2040  *
2041  * Some entries may be left NULL if we are in "spill mode". The same tuple
2042  * will belong to different groups for each grouping set, so may match a group
2043  * already in memory for one set and match a group not in memory for another
2044  * set. When in "spill mode", the tuple will be spilled for each grouping set
2045  * where it doesn't match a group in memory.
2046  *
2047  * NB: It's possible to spill the same tuple for several different grouping
2048  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
2049  * the tuple multiple times for multiple grouping sets, it can be partitioned
2050  * for each grouping set, making the refilling of the hash table very
2051  * efficient.
2052  */
2053 static void
2055 {
2056  AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2057  int setno;
2058 
2059  for (setno = 0; setno < aggstate->num_hashes; setno++)
2060  {
2061  AggStatePerHash perhash = &aggstate->perhash[setno];
2062  uint32 hash;
2063  bool in_hash_table;
2064 
2065  select_current_set(aggstate, setno, true);
2066  prepare_hash_slot(aggstate);
2067  hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot);
2068  pergroup[setno] = lookup_hash_entry(aggstate, hash, &in_hash_table);
2069 
2070  /* check to see if we need to spill the tuple for this grouping set */
2071  if (!in_hash_table)
2072  {
2073  HashAggSpill *spill = &aggstate->hash_spills[setno];
2074  TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
2075 
2076  if (spill->partitions == NULL)
2077  hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
2078  perhash->aggnode->numGroups,
2079  aggstate->hashentrysize);
2080 
2081  hashagg_spill_tuple(spill, slot, hash);
2082  }
2083  }
2084 }
2085 
2086 /*
2087  * ExecAgg -
2088  *
2089  * ExecAgg receives tuples from its outer subplan and aggregates over
2090  * the appropriate attribute for each aggregate function use (Aggref
2091  * node) appearing in the targetlist or qual of the node. The number
2092  * of tuples to aggregate over depends on whether grouped or plain
2093  * aggregation is selected. In grouped aggregation, we produce a result
2094  * row for each group; in plain aggregation there's a single result row
2095  * for the whole query. In either case, the value of each aggregate is
2096  * stored in the expression context to be used when ExecProject evaluates
2097  * the result tuple.
2098  */
2099 static TupleTableSlot *
2101 {
2102  AggState *node = castNode(AggState, pstate);
2103  TupleTableSlot *result = NULL;
2104 
2106 
2107  if (!node->agg_done)
2108  {
2109  /* Dispatch based on strategy */
2110  switch (node->phase->aggstrategy)
2111  {
2112  case AGG_HASHED:
2113  if (!node->table_filled)
2114  agg_fill_hash_table(node);
2115  /* FALLTHROUGH */
2116  case AGG_MIXED:
2117  result = agg_retrieve_hash_table(node);
2118  break;
2119  case AGG_PLAIN:
2120  case AGG_SORTED:
2121  result = agg_retrieve_direct(node);
2122  break;
2123  }
2124 
2125  if (!TupIsNull(result))
2126  return result;
2127  }
2128 
2129  return NULL;
2130 }
2131 
2132 /*
2133  * ExecAgg for non-hashed case
2134  */
2135 static TupleTableSlot *
2137 {
2138  Agg *node = aggstate->phase->aggnode;
2139  ExprContext *econtext;
2140  ExprContext *tmpcontext;
2141  AggStatePerAgg peragg;
2142  AggStatePerGroup *pergroups;
2143  TupleTableSlot *outerslot;
2144  TupleTableSlot *firstSlot;
2145  TupleTableSlot *result;
2146  bool hasGroupingSets = aggstate->phase->numsets > 0;
2147  int numGroupingSets = Max(aggstate->phase->numsets, 1);
2148  int currentSet;
2149  int nextSetSize;
2150  int numReset;
2151  int i;
2152 
2153  /*
2154  * get state info from node
2155  *
2156  * econtext is the per-output-tuple expression context
2157  *
2158  * tmpcontext is the per-input-tuple expression context
2159  */
2160  econtext = aggstate->ss.ps.ps_ExprContext;
2161  tmpcontext = aggstate->tmpcontext;
2162 
2163  peragg = aggstate->peragg;
2164  pergroups = aggstate->pergroups;
2165  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2166 
2167  /*
2168  * We loop retrieving groups until we find one matching
2169  * aggstate->ss.ps.qual
2170  *
2171  * For grouping sets, we have the invariant that aggstate->projected_set
2172  * is either -1 (initial call) or the index (starting from 0) in
2173  * gset_lengths for the group we just completed (either by projecting a
2174  * row or by discarding it in the qual).
2175  */
2176  while (!aggstate->agg_done)
2177  {
2178  /*
2179  * Clear the per-output-tuple context for each group, as well as
2180  * aggcontext (which contains any pass-by-ref transvalues of the old
2181  * group). Some aggregate functions store working state in child
2182  * contexts; those now get reset automatically without us needing to
2183  * do anything special.
2184  *
2185  * We use ReScanExprContext not just ResetExprContext because we want
2186  * any registered shutdown callbacks to be called. That allows
2187  * aggregate functions to ensure they've cleaned up any non-memory
2188  * resources.
2189  */
2190  ReScanExprContext(econtext);
2191 
2192  /*
2193  * Determine how many grouping sets need to be reset at this boundary.
2194  */
2195  if (aggstate->projected_set >= 0 &&
2196  aggstate->projected_set < numGroupingSets)
2197  numReset = aggstate->projected_set + 1;
2198  else
2199  numReset = numGroupingSets;
2200 
2201  /*
2202  * numReset can change on a phase boundary, but that's OK; we want to
2203  * reset the contexts used in _this_ phase, and later, after possibly
2204  * changing phase, initialize the right number of aggregates for the
2205  * _new_ phase.
2206  */
2207 
2208  for (i = 0; i < numReset; i++)
2209  {
2210  ReScanExprContext(aggstate->aggcontexts[i]);
2211  }
2212 
2213  /*
2214  * Check if input is complete and there are no more groups to project
2215  * in this phase; move to next phase or mark as done.
2216  */
2217  if (aggstate->input_done == true &&
2218  aggstate->projected_set >= (numGroupingSets - 1))
2219  {
2220  if (aggstate->current_phase < aggstate->numphases - 1)
2221  {
2222  initialize_phase(aggstate, aggstate->current_phase + 1);
2223  aggstate->input_done = false;
2224  aggstate->projected_set = -1;
2225  numGroupingSets = Max(aggstate->phase->numsets, 1);
2226  node = aggstate->phase->aggnode;
2227  numReset = numGroupingSets;
2228  }
2229  else if (aggstate->aggstrategy == AGG_MIXED)
2230  {
2231  /*
2232  * Mixed mode; we've output all the grouped stuff and have
2233  * full hashtables, so switch to outputting those.
2234  */
2235  initialize_phase(aggstate, 0);
2236  aggstate->table_filled = true;
2238  &aggstate->perhash[0].hashiter);
2239  select_current_set(aggstate, 0, true);
2240  return agg_retrieve_hash_table(aggstate);
2241  }
2242  else
2243  {
2244  aggstate->agg_done = true;
2245  break;
2246  }
2247  }
2248 
2249  /*
2250  * Get the number of columns in the next grouping set after the last
2251  * projected one (if any). This is the number of columns to compare to
2252  * see if we reached the boundary of that set too.
2253  */
2254  if (aggstate->projected_set >= 0 &&
2255  aggstate->projected_set < (numGroupingSets - 1))
2256  nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2257  else
2258  nextSetSize = 0;
2259 
2260  /*----------
2261  * If a subgroup for the current grouping set is present, project it.
2262  *
2263  * We have a new group if:
2264  * - we're out of input but haven't projected all grouping sets
2265  * (checked above)
2266  * OR
2267  * - we already projected a row that wasn't from the last grouping
2268  * set
2269  * AND
2270  * - the next grouping set has at least one grouping column (since
2271  * empty grouping sets project only once input is exhausted)
2272  * AND
2273  * - the previous and pending rows differ on the grouping columns
2274  * of the next grouping set
2275  *----------
2276  */
2277  tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
2278  if (aggstate->input_done ||
2279  (node->aggstrategy != AGG_PLAIN &&
2280  aggstate->projected_set != -1 &&
2281  aggstate->projected_set < (numGroupingSets - 1) &&
2282  nextSetSize > 0 &&
2283  !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
2284  tmpcontext)))
2285  {
2286  aggstate->projected_set += 1;
2287 
2288  Assert(aggstate->projected_set < numGroupingSets);
2289  Assert(nextSetSize > 0 || aggstate->input_done);
2290  }
2291  else
2292  {
2293  /*
2294  * We no longer care what group we just projected, the next
2295  * projection will always be the first (or only) grouping set
2296  * (unless the input proves to be empty).
2297  */
2298  aggstate->projected_set = 0;
2299 
2300  /*
2301  * If we don't already have the first tuple of the new group,
2302  * fetch it from the outer plan.
2303  */
2304  if (aggstate->grp_firstTuple == NULL)
2305  {
2306  outerslot = fetch_input_tuple(aggstate);
2307  if (!TupIsNull(outerslot))
2308  {
2309  /*
2310  * Make a copy of the first input tuple; we will use this
2311  * for comparisons (in group mode) and for projection.
2312  */
2313  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2314  }
2315  else
2316  {
2317  /* outer plan produced no tuples at all */
2318  if (hasGroupingSets)
2319  {
2320  /*
2321  * If there was no input at all, we need to project
2322  * rows only if there are grouping sets of size 0.
2323  * Note that this implies that there can't be any
2324  * references to ungrouped Vars, which would otherwise
2325  * cause issues with the empty output slot.
2326  *
2327  * XXX: This is no longer true, we currently deal with
2328  * this in finalize_aggregates().
2329  */
2330  aggstate->input_done = true;
2331 
2332  while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2333  {
2334  aggstate->projected_set += 1;
2335  if (aggstate->projected_set >= numGroupingSets)
2336  {
2337  /*
2338  * We can't set agg_done here because we might
2339  * have more phases to do, even though the
2340  * input is empty. So we need to restart the
2341  * whole outer loop.
2342  */
2343  break;
2344  }
2345  }
2346 
2347  if (aggstate->projected_set >= numGroupingSets)
2348  continue;
2349  }
2350  else
2351  {
2352  aggstate->agg_done = true;
2353  /* If we are grouping, we should produce no tuples too */
2354  if (node->aggstrategy != AGG_PLAIN)
2355  return NULL;
2356  }
2357  }
2358  }
2359 
2360  /*
2361  * Initialize working state for a new input tuple group.
2362  */
2363  initialize_aggregates(aggstate, pergroups, numReset);
2364 
2365  if (aggstate->grp_firstTuple != NULL)
2366  {
2367  /*
2368  * Store the copied first input tuple in the tuple table slot
2369  * reserved for it. The tuple will be deleted when it is
2370  * cleared from the slot.
2371  */
2373  firstSlot, true);
2374  aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
2375 
2376  /* set up for first advance_aggregates call */
2377  tmpcontext->ecxt_outertuple = firstSlot;
2378 
2379  /*
2380  * Process each outer-plan tuple, and then fetch the next one,
2381  * until we exhaust the outer plan or cross a group boundary.
2382  */
2383  for (;;)
2384  {
2385  /*
2386  * During phase 1 only of a mixed agg, we need to update
2387  * hashtables as well in advance_aggregates.
2388  */
2389  if (aggstate->aggstrategy == AGG_MIXED &&
2390  aggstate->current_phase == 1)
2391  {
2392  lookup_hash_entries(aggstate);
2393  }
2394 
2395  /* Advance the aggregates (or combine functions) */
2396  advance_aggregates(aggstate);
2397 
2398  /* Reset per-input-tuple context after each tuple */
2399  ResetExprContext(tmpcontext);
2400 
2401  outerslot = fetch_input_tuple(aggstate);
2402  if (TupIsNull(outerslot))
2403  {
2404  /* no more outer-plan tuples available */
2405 
2406  /* if we built hash tables, finalize any spills */
2407  if (aggstate->aggstrategy == AGG_MIXED &&
2408  aggstate->current_phase == 1)
2410 
2411  if (hasGroupingSets)
2412  {
2413  aggstate->input_done = true;
2414  break;
2415  }
2416  else
2417  {
2418  aggstate->agg_done = true;
2419  break;
2420  }
2421  }
2422  /* set up for next advance_aggregates call */
2423  tmpcontext->ecxt_outertuple = outerslot;
2424 
2425  /*
2426  * If we are grouping, check whether we've crossed a group
2427  * boundary.
2428  */
2429  if (node->aggstrategy != AGG_PLAIN)
2430  {
2431  tmpcontext->ecxt_innertuple = firstSlot;
2432  if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2433  tmpcontext))
2434  {
2435  aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2436  break;
2437  }
2438  }
2439  }
2440  }
2441 
2442  /*
2443  * Use the representative input tuple for any references to
2444  * non-aggregated input columns in aggregate direct args, the node
2445  * qual, and the tlist. (If we are not grouping, and there are no
2446  * input rows at all, we will come here with an empty firstSlot
2447  * ... but if not grouping, there can't be any references to
2448  * non-aggregated input columns, so no problem.)
2449  */
2450  econtext->ecxt_outertuple = firstSlot;
2451  }
2452 
2453  Assert(aggstate->projected_set >= 0);
2454 
2455  currentSet = aggstate->projected_set;
2456 
2457  prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2458 
2459  select_current_set(aggstate, currentSet, false);
2460 
2461  finalize_aggregates(aggstate,
2462  peragg,
2463  pergroups[currentSet]);
2464 
2465  /*
2466  * If there's no row to project right now, we must continue rather
2467  * than returning a null since there might be more groups.
2468  */
2469  result = project_aggregates(aggstate);
2470  if (result)
2471  return result;
2472  }
2473 
2474  /* No more groups */
2475  return NULL;
2476 }
2477 
2478 /*
2479  * ExecAgg for hashed case: read input and build hash table
2480  */
2481 static void
2483 {
2484  TupleTableSlot *outerslot;
2485  ExprContext *tmpcontext = aggstate->tmpcontext;
2486 
2487  /*
2488  * Process each outer-plan tuple, and then fetch the next one, until we
2489  * exhaust the outer plan.
2490  */
2491  for (;;)
2492  {
2493  outerslot = fetch_input_tuple(aggstate);
2494  if (TupIsNull(outerslot))
2495  break;
2496 
2497  /* set up for lookup_hash_entries and advance_aggregates */
2498  tmpcontext->ecxt_outertuple = outerslot;
2499 
2500  /* Find or build hashtable entries */
2501  lookup_hash_entries(aggstate);
2502 
2503  /* Advance the aggregates (or combine functions) */
2504  advance_aggregates(aggstate);
2505 
2506  /*
2507  * Reset per-input-tuple context after each tuple, but note that the
2508  * hash lookups do this too
2509  */
2510  ResetExprContext(aggstate->tmpcontext);
2511  }
2512 
2513  /* finalize spills, if any */
2515 
2516  aggstate->table_filled = true;
2517  /* Initialize to walk the first hash table */
2518  select_current_set(aggstate, 0, true);
2520  &aggstate->perhash[0].hashiter);
2521 }
2522 
2523 /*
2524  * If any data was spilled during hash aggregation, reset the hash table and
2525  * reprocess one batch of spilled data. After reprocessing a batch, the hash
2526  * table will again contain data, ready to be consumed by
2527  * agg_retrieve_hash_table_in_memory().
2528  *
2529  * Should only be called after all in memory hash table entries have been
2530  * finalized and emitted.
2531  *
2532  * Return false when input is exhausted and there's no more work to be done;
2533  * otherwise return true.
2534  */
2535 static bool
2537 {
2538  HashAggBatch *batch;
2539  HashAggSpill spill;
2540  HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
2541  uint64 ngroups_estimate;
2542  bool spill_initialized = false;
2543 
2544  if (aggstate->hash_batches == NIL)
2545  return false;
2546 
2547  batch = linitial(aggstate->hash_batches);
2548  aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
2549 
2550  /*
2551  * Estimate the number of groups for this batch as the total number of
2552  * tuples in its input file. Although that's a worst case, it's not bad
2553  * here for two reasons: (1) overestimating is better than
2554  * underestimating; and (2) we've already scanned the relation once, so
2555  * it's likely that we've already finalized many of the common values.
2556  */
2557  ngroups_estimate = batch->input_tuples;
2558 
2559  hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
2560  batch->used_bits, &aggstate->hash_mem_limit,
2561  &aggstate->hash_ngroups_limit, NULL);
2562 
2563  /* there could be residual pergroup pointers; clear them */
2564  for (int setoff = 0;
2565  setoff < aggstate->maxsets + aggstate->num_hashes;
2566  setoff++)
2567  aggstate->all_pergroups[setoff] = NULL;
2568 
2569  /* free memory and reset hash tables */
2570  ReScanExprContext(aggstate->hashcontext);
2571  for (int setno = 0; setno < aggstate->num_hashes; setno++)
2572  ResetTupleHashTable(aggstate->perhash[setno].hashtable);
2573 
2574  aggstate->hash_ngroups_current = 0;
2575 
2576  /*
2577  * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
2578  * happens in phase 0. So, we switch to phase 1 when processing a batch,
2579  * and back to phase 0 after the batch is done.
2580  */
2581  Assert(aggstate->current_phase == 0);
2582  if (aggstate->phase->aggstrategy == AGG_MIXED)
2583  {
2584  aggstate->current_phase = 1;
2585  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2586  }
2587 
2588  select_current_set(aggstate, batch->setno, true);
2589 
2590  /*
2591  * Spilled tuples are always read back as MinimalTuples, which may be
2592  * different from the outer plan, so recompile the aggregate expressions.
2593  *
2594  * We still need the NULL check, because we are only processing one
2595  * grouping set at a time and the rest will be NULL.
2596  */
2597  hashagg_recompile_expressions(aggstate, true, true);
2598 
2601  for (;;) {
2602  TupleTableSlot *slot = aggstate->hash_spill_slot;
2603  MinimalTuple tuple;
2604  uint32 hash;
2605  bool in_hash_table;
2606 
2608 
2609  tuple = hashagg_batch_read(batch, &hash);
2610  if (tuple == NULL)
2611  break;
2612 
2613  ExecStoreMinimalTuple(tuple, slot, true);
2614  aggstate->tmpcontext->ecxt_outertuple = slot;
2615 
2616  prepare_hash_slot(aggstate);
2617  aggstate->hash_pergroup[batch->setno] = lookup_hash_entry(
2618  aggstate, hash, &in_hash_table);
2619 
2620  if (in_hash_table)
2621  {
2622  /* Advance the aggregates (or combine functions) */
2623  advance_aggregates(aggstate);
2624  }
2625  else
2626  {
2627  if (!spill_initialized)
2628  {
2629  /*
2630  * Avoid initializing the spill until we actually need it so
2631  * that we don't assign tapes that will never be used.
2632  */
2633  spill_initialized = true;
2634  hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
2635  ngroups_estimate, aggstate->hashentrysize);
2636  }
2637  /* no memory for a new group, spill */
2638  hashagg_spill_tuple(&spill, slot, hash);
2639  }
2640 
2641  /*
2642  * Reset per-input-tuple context after each tuple, but note that the
2643  * hash lookups do this too
2644  */
2645  ResetExprContext(aggstate->tmpcontext);
2646  }
2647 
2648  hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
2649 
2650  /* change back to phase 0 */
2651  aggstate->current_phase = 0;
2652  aggstate->phase = &aggstate->phases[aggstate->current_phase];
2653 
2654  if (spill_initialized)
2655  {
2656  hash_agg_update_metrics(aggstate, true, spill.npartitions);
2657  hashagg_spill_finish(aggstate, &spill, batch->setno);
2658  }
2659  else
2660  hash_agg_update_metrics(aggstate, true, 0);
2661 
2662  aggstate->hash_spill_mode = false;
2663 
2664  /* prepare to walk the first hash table */
2665  select_current_set(aggstate, batch->setno, true);
2666  ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
2667  &aggstate->perhash[batch->setno].hashiter);
2668 
2669  pfree(batch);
2670 
2671  return true;
2672 }
2673 
2674 /*
2675  * ExecAgg for hashed case: retrieving groups from hash table
2676  *
2677  * After exhausting in-memory tuples, also try refilling the hash table using
2678  * previously-spilled tuples. Only returns NULL after all in-memory and
2679  * spilled tuples are exhausted.
2680  */
2681 static TupleTableSlot *
2683 {
2684  TupleTableSlot *result = NULL;
2685 
2686  while (result == NULL)
2687  {
2688  result = agg_retrieve_hash_table_in_memory(aggstate);
2689  if (result == NULL)
2690  {
2691  if (!agg_refill_hash_table(aggstate))
2692  {
2693  aggstate->agg_done = true;
2694  break;
2695  }
2696  }
2697  }
2698 
2699  return result;
2700 }
2701 
/*
 * Retrieve the groups from the in-memory hash tables without considering any
 * spilled tuples.
 *
 * Returns the next output tuple passing aggstate->ss.ps.qual, or NULL when
 * every grouping set's hash table has been fully scanned.
 */
static TupleTableSlot *
agg_retrieve_hash_table_in_memory(AggState *aggstate)
{
	ExprContext *econtext;
	AggStatePerAgg peragg;
	AggStatePerGroup pergroup;
	TupleHashEntryData *entry;
	TupleTableSlot *firstSlot;
	TupleTableSlot *result;
	AggStatePerHash perhash;

	/*
	 * get state info from node.
	 *
	 * econtext is the per-output-tuple expression context.
	 */
	econtext = aggstate->ss.ps.ps_ExprContext;
	peragg = aggstate->peragg;
	firstSlot = aggstate->ss.ss_ScanTupleSlot;

	/*
	 * Note that perhash (and therefore anything accessed through it) can
	 * change inside the loop, as we change between grouping sets.
	 */
	perhash = &aggstate->perhash[aggstate->current_set];

	/*
	 * We loop retrieving groups until we find one satisfying
	 * aggstate->ss.ps.qual
	 */
	for (;;)
	{
		TupleTableSlot *hashslot = perhash->hashslot;
		int			i;

		/*
		 * Find the next entry in the hash table
		 */
		entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
		if (entry == NULL)
		{
			int			nextset = aggstate->current_set + 1;

			if (nextset < aggstate->num_hashes)
			{
				/*
				 * Switch to next grouping set, reinitialize, and restart the
				 * loop.
				 */
				select_current_set(aggstate, nextset, true);

				perhash = &aggstate->perhash[aggstate->current_set];

				ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);

				continue;
			}
			else
			{
				/* all grouping sets exhausted */
				return NULL;
			}
		}

		/*
		 * Clear the per-output-tuple context for each group
		 *
		 * We intentionally don't use ReScanExprContext here; if any aggs have
		 * registered shutdown callbacks, they mustn't be called yet, since we
		 * might not be done with that agg.
		 */
		ResetExprContext(econtext);

		/*
		 * Transform representative tuple back into one with the right
		 * columns.
		 */
		ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
		slot_getallattrs(hashslot);

		/* columns not present in the hash key start out NULL */
		ExecClearTuple(firstSlot);
		memset(firstSlot->tts_isnull, true,
			   firstSlot->tts_tupleDescriptor->natts * sizeof(bool));

		/* scatter the hashed (grouping) columns back to input positions */
		for (i = 0; i < perhash->numhashGrpCols; i++)
		{
			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;

			firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
			firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
		}
		ExecStoreVirtualTuple(firstSlot);

		/* per-group transition states were stashed in entry->additional */
		pergroup = (AggStatePerGroup) entry->additional;

		/*
		 * Use the representative input tuple for any references to
		 * non-aggregated input columns in the qual and tlist.
		 */
		econtext->ecxt_outertuple = firstSlot;

		prepare_projection_slot(aggstate,
								econtext->ecxt_outertuple,
								aggstate->current_set);

		finalize_aggregates(aggstate, peragg, pergroup);

		/* project_aggregates returns NULL if the qual rejects this group */
		result = project_aggregates(aggstate);
		if (result)
			return result;
	}

	/* No more groups */
	return NULL;
}
2822 
2823 /*
2824  * Initialize HashTapeInfo
2825  */
2826 static void
2828 {
2829  HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
2830  int init_tapes = 16; /* expanded dynamically */
2831 
2832  tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, NULL, NULL, -1);
2833  tapeinfo->ntapes = init_tapes;
2834  tapeinfo->nfreetapes = init_tapes;
2835  tapeinfo->freetapes_alloc = init_tapes;
2836  tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
2837  for (int i = 0; i < init_tapes; i++)
2838  tapeinfo->freetapes[i] = i;
2839 
2840  aggstate->hash_tapeinfo = tapeinfo;
2841 }
2842 
2843 /*
2844  * Assign unused tapes to spill partitions, extending the tape set if
2845  * necessary.
2846  */
2847 static void
2849  int npartitions)
2850 {
2851  int partidx = 0;
2852 
2853  /* use free tapes if available */
2854  while (partidx < npartitions && tapeinfo->nfreetapes > 0)
2855  partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
2856 
2857  if (partidx < npartitions)
2858  {
2859  LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
2860 
2861  while (partidx < npartitions)
2862  partitions[partidx++] = tapeinfo->ntapes++;
2863  }
2864 }
2865 
2866 /*
2867  * After a tape has already been written to and then read, this function
2868  * rewinds it for writing and adds it to the free list.
2869  */
2870 static void
2872 {
2873  LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
2874  if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
2875  {
2876  tapeinfo->freetapes_alloc <<= 1;
2877  tapeinfo->freetapes = repalloc(
2878  tapeinfo->freetapes, tapeinfo->freetapes_alloc * sizeof(int));
2879  }
2880  tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
2881 }
2882 
2883 /*
2884  * hashagg_spill_init
2885  *
2886  * Called after we determined that spilling is necessary. Chooses the number
2887  * of partitions to create, and initializes them.
2888  */
2889 static void
2890 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
2891  uint64 input_groups, double hashentrysize)
2892 {
2893  int npartitions;
2894  int partition_bits;
2895 
2896  npartitions = hash_choose_num_partitions(
2897  input_groups, hashentrysize, used_bits, &partition_bits);
2898 
2899  spill->partitions = palloc0(sizeof(int) * npartitions);
2900  spill->ntuples = palloc0(sizeof(int64) * npartitions);
2901 
2902  hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
2903 
2904  spill->tapeset = tapeinfo->tapeset;
2905  spill->shift = 32 - used_bits - partition_bits;
2906  spill->mask = (npartitions - 1) << spill->shift;
2907  spill->npartitions = npartitions;
2908 }
2909 
2910 /*
2911  * hashagg_spill_tuple
2912  *
2913  * No room for new groups in the hash table. Save for later in the appropriate
2914  * partition.
2915  */
2916 static Size
2918 {
2919  LogicalTapeSet *tapeset = spill->tapeset;
2920  int partition;
2921  MinimalTuple tuple;
2922  int tapenum;
2923  int total_written = 0;
2924  bool shouldFree;
2925 
2926  Assert(spill->partitions != NULL);
2927 
2928  /* XXX: may contain unnecessary attributes, should project */
2929  tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
2930 
2931  partition = (hash & spill->mask) >> spill->shift;
2932  spill->ntuples[partition]++;
2933 
2934  tapenum = spill->partitions[partition];
2935 
2936  LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
2937  total_written += sizeof(uint32);
2938 
2939  LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
2940  total_written += tuple->t_len;
2941 
2942  if (shouldFree)
2943  pfree(tuple);
2944 
2945  return total_written;
2946 }
2947 
2948 /*
2949  * hashagg_batch_new
2950  *
2951  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
2952  * be done.
2953  */
2954 static HashAggBatch *
2955 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
2956  int64 input_tuples, int used_bits)
2957 {
2958  HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
2959 
2960  batch->setno = setno;
2961  batch->used_bits = used_bits;
2962  batch->tapeset = tapeset;
2963  batch->input_tapenum = tapenum;
2964  batch->input_tuples = input_tuples;
2965 
2966  return batch;
2967 }
2968 
/*
 * hashagg_batch_read
 *		read the next tuple from a batch's tape.  Return NULL if no more.
 *
 * The tape format is the one written by hashagg_spill_tuple: a uint32 hash
 * value followed by a MinimalTuple (whose leading uint32 is its t_len).
 * If hashp is non-NULL, the tuple's hash value is returned through it.
 */
static MinimalTuple
hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
{
	LogicalTapeSet *tapeset = batch->tapeset;
	int			tapenum = batch->input_tapenum;
	MinimalTuple tuple;
	uint32		t_len;
	size_t		nread;
	uint32		hash;

	/* first the hash value; zero bytes read means end of tape */
	nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
	if (nread == 0)
		return NULL;
	if (nread != sizeof(uint32))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
						tapenum, sizeof(uint32), nread)));
	if (hashp != NULL)
		*hashp = hash;

	/* then the tuple's total length (the first field of MinimalTuple) */
	nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
	if (nread != sizeof(uint32))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
						tapenum, sizeof(uint32), nread)));

	tuple = (MinimalTuple) palloc(t_len);
	tuple->t_len = t_len;

	/* read the remainder of the tuple directly after the length word */
	nread = LogicalTapeRead(tapeset, tapenum,
							(void *)((char *)tuple + sizeof(uint32)),
							t_len - sizeof(uint32));
	if (nread != t_len - sizeof(uint32))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
						tapenum, t_len - sizeof(uint32), nread)));

	return tuple;
}
3015 
3016 /*
3017  * hashagg_finish_initial_spills
3018  *
3019  * After a HashAggBatch has been processed, it may have spilled tuples to
3020  * disk. If so, turn the spilled partitions into new batches that must later
3021  * be executed.
3022  */
3023 static void
3025 {
3026  int setno;
3027  int total_npartitions = 0;
3028 
3029  if (aggstate->hash_spills != NULL)
3030  {
3031  for (setno = 0; setno < aggstate->num_hashes; setno++)
3032  {
3033  HashAggSpill *spill = &aggstate->hash_spills[setno];
3034  total_npartitions += spill->npartitions;
3035  hashagg_spill_finish(aggstate, spill, setno);
3036  }
3037 
3038  /*
3039  * We're not processing tuples from outer plan any more; only
3040  * processing batches of spilled tuples. The initial spill structures
3041  * are no longer needed.
3042  */
3043  pfree(aggstate->hash_spills);
3044  aggstate->hash_spills = NULL;
3045  }
3046 
3047  hash_agg_update_metrics(aggstate, false, total_npartitions);
3048  aggstate->hash_spill_mode = false;
3049 }
3050 
3051 /*
3052  * hashagg_spill_finish
3053  *
3054  * Transform spill partitions into new batches.
3055  */
3056 static void
3057 hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3058 {
3059  int i;
3060  int used_bits = 32 - spill->shift;
3061 
3062  if (spill->npartitions == 0)
3063  return; /* didn't spill */
3064 
3065  for (i = 0; i < spill->npartitions; i++)
3066  {
3067  int tapenum = spill->partitions[i];
3068  HashAggBatch *new_batch;
3069 
3070  /* if the partition is empty, don't create a new batch of work */
3071  if (spill->ntuples[i] == 0)
3072  continue;
3073 
3074  new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
3075  tapenum, setno, spill->ntuples[i],
3076  used_bits);
3077  aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
3078  aggstate->hash_batches_used++;
3079  }
3080 
3081  pfree(spill->ntuples);
3082  pfree(spill->partitions);
3083 }
3084 
/*
 * Free resources related to a spilled HashAgg.
 *
 * Releases the initial-pass spill structures, any pending batches, and the
 * logical tape set, leaving the corresponding AggState fields NULL/NIL so
 * this is safe to call more than once.
 */
static void
hashagg_reset_spill_state(AggState *aggstate)
{
	ListCell   *lc;

	/* free spills from initial pass */
	if (aggstate->hash_spills != NULL)
	{
		int			setno;

		for (setno = 0; setno < aggstate->num_hashes; setno++)
		{
			HashAggSpill *spill = &aggstate->hash_spills[setno];

			/* arrays allocated in hashagg_spill_init */
			pfree(spill->ntuples);
			pfree(spill->partitions);
		}
		pfree(aggstate->hash_spills);
		aggstate->hash_spills = NULL;
	}

	/* free batches */
	foreach(lc, aggstate->hash_batches)
	{
		HashAggBatch *batch = (HashAggBatch*) lfirst(lc);

		pfree(batch);
	}
	list_free(aggstate->hash_batches);
	aggstate->hash_batches = NIL;

	/* close tape set */
	if (aggstate->hash_tapeinfo != NULL)
	{
		HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;

		/* closing the tape set releases all underlying temp-file space */
		LogicalTapeSetClose(tapeinfo->tapeset);
		pfree(tapeinfo->freetapes);
		pfree(tapeinfo);
		aggstate->hash_tapeinfo = NULL;
	}
}
3128 
3129 
3130 /* -----------------
3131  * ExecInitAgg
3132  *
3133  * Creates the run-time information for the agg node produced by the
3134  * planner and initializes its outer subtree.
3135  *
3136  * -----------------
3137  */
3138 AggState *
3139 ExecInitAgg(Agg *node, EState *estate, int eflags)
3140 {
3141  AggState *aggstate;
3142  AggStatePerAgg peraggs;
3143  AggStatePerTrans pertransstates;
3144  AggStatePerGroup *pergroups;
3145  Plan *outerPlan;
3146  ExprContext *econtext;
3147  TupleDesc scanDesc;
3148  int numaggs,
3149  transno,
3150  aggno;
3151  int phase;
3152  int phaseidx;
3153  ListCell *l;
3154  Bitmapset *all_grouped_cols = NULL;
3155  int numGroupingSets = 1;
3156  int numPhases;
3157  int numHashes;
3158  int i = 0;
3159  int j = 0;
3160  bool use_hashing = (node->aggstrategy == AGG_HASHED ||
3161  node->aggstrategy == AGG_MIXED);
3162 
3163  /* check for unsupported flags */
3164  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
3165 
3166  /*
3167  * create state structure
3168  */
3169  aggstate = makeNode(AggState);
3170  aggstate->ss.ps.plan = (Plan *) node;
3171  aggstate->ss.ps.state = estate;
3172  aggstate->ss.ps.ExecProcNode = ExecAgg;
3173 
3174  aggstate->aggs = NIL;
3175  aggstate->numaggs = 0;
3176  aggstate->numtrans = 0;
3177  aggstate->aggstrategy = node->aggstrategy;
3178  aggstate->aggsplit = node->aggsplit;
3179  aggstate->maxsets = 0;
3180  aggstate->projected_set = -1;
3181  aggstate->current_set = 0;
3182  aggstate->peragg = NULL;
3183  aggstate->pertrans = NULL;
3184  aggstate->curperagg = NULL;
3185  aggstate->curpertrans = NULL;
3186  aggstate->input_done = false;
3187  aggstate->agg_done = false;
3188  aggstate->pergroups = NULL;
3189  aggstate->grp_firstTuple = NULL;
3190  aggstate->sort_in = NULL;
3191  aggstate->sort_out = NULL;
3192 
3193  /*
3194  * phases[0] always exists, but is dummy in sorted/plain mode
3195  */
3196  numPhases = (use_hashing ? 1 : 2);
3197  numHashes = (use_hashing ? 1 : 0);
3198 
3199  /*
3200  * Calculate the maximum number of grouping sets in any phase; this
3201  * determines the size of some allocations. Also calculate the number of
3202  * phases, since all hashed/mixed nodes contribute to only a single phase.
3203  */
3204  if (node->groupingSets)
3205  {
3206  numGroupingSets = list_length(node->groupingSets);
3207 
3208  foreach(l, node->chain)
3209  {
3210  Agg *agg = lfirst(l);
3211 
3212  numGroupingSets = Max(numGroupingSets,
3213  list_length(agg->groupingSets));
3214 
3215  /*
3216  * additional AGG_HASHED aggs become part of phase 0, but all
3217  * others add an extra phase.
3218  */
3219  if (agg->aggstrategy != AGG_HASHED)
3220  ++numPhases;
3221  else
3222  ++numHashes;
3223  }
3224  }
3225 
3226  aggstate->maxsets = numGroupingSets;
3227  aggstate->numphases = numPhases;
3228 
3229  aggstate->aggcontexts = (ExprContext **)
3230  palloc0(sizeof(ExprContext *) * numGroupingSets);
3231 
3232  /*
3233  * Create expression contexts. We need three or more, one for
3234  * per-input-tuple processing, one for per-output-tuple processing, one
3235  * for all the hashtables, and one for each grouping set. The per-tuple
3236  * memory context of the per-grouping-set ExprContexts (aggcontexts)
3237  * replaces the standalone memory context formerly used to hold transition
3238  * values. We cheat a little by using ExecAssignExprContext() to build
3239  * all of them.
3240  *
3241  * NOTE: the details of what is stored in aggcontexts and what is stored
3242  * in the regular per-query memory context are driven by a simple
3243  * decision: we want to reset the aggcontext at group boundaries (if not
3244  * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
3245  */
3246  ExecAssignExprContext(estate, &aggstate->ss.ps);
3247  aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
3248 
3249  for (i = 0; i < numGroupingSets; ++i)
3250  {
3251  ExecAssignExprContext(estate, &aggstate->ss.ps);
3252  aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
3253  }
3254 
3255  if (use_hashing)
3256  {
3257  ExecAssignExprContext(estate, &aggstate->ss.ps);
3258  aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
3259  }
3260 
3261  ExecAssignExprContext(estate, &aggstate->ss.ps);
3262 
3263  /*
3264  * Initialize child nodes.
3265  *
3266  * If we are doing a hashed aggregation then the child plan does not need
3267  * to handle REWIND efficiently; see ExecReScanAgg.
3268  */
3269  if (node->aggstrategy == AGG_HASHED)
3270  eflags &= ~EXEC_FLAG_REWIND;
3271  outerPlan = outerPlan(node);
3272  outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
3273 
3274  /*
3275  * initialize source tuple type.
3276  */
3277  aggstate->ss.ps.outerops =
3279  &aggstate->ss.ps.outeropsfixed);
3280  aggstate->ss.ps.outeropsset = true;
3281 
3282  ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
3283  aggstate->ss.ps.outerops);
3284  scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
3285 
3286  /*
3287  * If there are more than two phases (including a potential dummy phase
3288  * 0), input will be resorted using tuplesort. Need a slot for that.
3289  */
3290  if (numPhases > 2)
3291  {
3292  aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
3294 
3295  /*
3296  * The output of the tuplesort, and the output from the outer child
3297  * might not use the same type of slot. In most cases the child will
3298  * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
3299  * input can also be presorted due an index, in which case it could be
3300  * a different type of slot.
3301  *
3302  * XXX: For efficiency it would be good to instead/additionally
3303  * generate expressions with corresponding settings of outerops* for
3304  * the individual phases - deforming is often a bottleneck for
3305  * aggregations with lots of rows per group. If there's multiple
3306  * sorts, we know that all but the first use TTSOpsMinimalTuple (via
3307  * the nodeAgg.c internal tuplesort).
3308  */
3309  if (aggstate->ss.ps.outeropsfixed &&
3310  aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
3311  aggstate->ss.ps.outeropsfixed = false;
3312  }
3313 
3314  /*
3315  * Initialize result type, slot and projection.
3316  */
3318  ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
3319 
3320  /*
3321  * initialize child expressions
3322  *
3323  * We expect the parser to have checked that no aggs contain other agg
3324  * calls in their arguments (and just to be sure, we verify it again while
3325  * initializing the plan node). This would make no sense under SQL
3326  * semantics, and it's forbidden by the spec. Because it is true, we
3327  * don't need to worry about evaluating the aggs in any particular order.
3328  *
3329  * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
3330  * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs
3331  * in the targetlist are found during ExecAssignProjectionInfo, below.
3332  */
3333  aggstate->ss.ps.qual =
3334  ExecInitQual(node->plan.qual, (PlanState *) aggstate);
3335 
3336  /*
3337  * We should now have found all Aggrefs in the targetlist and quals.
3338  */
3339  numaggs = aggstate->numaggs;
3340  Assert(numaggs == list_length(aggstate->aggs));
3341 
3342  /*
3343  * For each phase, prepare grouping set data and fmgr lookup data for
3344  * compare functions. Accumulate all_grouped_cols in passing.
3345  */
3346  aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
3347 
3348  aggstate->num_hashes = numHashes;
3349  if (numHashes)
3350  {
3351  aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
3352  aggstate->phases[0].numsets = 0;
3353  aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
3354  aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
3355  }
3356 
3357  phase = 0;
3358  for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3359  {
3360  Agg *aggnode;
3361  Sort *sortnode;
3362 
3363  if (phaseidx > 0)
3364  {
3365  aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
3366  sortnode = castNode(Sort, aggnode->plan.lefttree);
3367  }
3368  else
3369  {
3370  aggnode = node;
3371  sortnode = NULL;
3372  }
3373 
3374  Assert(phase <= 1 || sortnode);
3375 
3376  if (aggnode->aggstrategy == AGG_HASHED
3377  || aggnode->aggstrategy == AGG_MIXED)
3378  {
3379  AggStatePerPhase phasedata = &aggstate->phases[0];
3380  AggStatePerHash perhash;
3381  Bitmapset *cols = NULL;
3382 
3383  Assert(phase == 0);
3384  i = phasedata->numsets++;
3385  perhash = &aggstate->perhash[i];
3386 
3387  /* phase 0 always points to the "real" Agg in the hash case */
3388  phasedata->aggnode = node;
3389  phasedata->aggstrategy = node->aggstrategy;
3390 
3391  /* but the actual Agg node representing this hash is saved here */
3392  perhash->aggnode = aggnode;
3393 
3394  phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
3395 
3396  for (j = 0; j < aggnode->numCols; ++j)
3397  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3398 
3399  phasedata->grouped_cols[i] = cols;
3400 
3401  all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3402  continue;
3403  }
3404  else
3405  {
3406  AggStatePerPhase phasedata = &aggstate->phases[++phase];
3407  int num_sets;
3408 
3409  phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
3410 
3411  if (num_sets)
3412  {
3413  phasedata->gset_lengths = palloc(num_sets * sizeof(int));
3414  phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
3415 
3416  i = 0;
3417  foreach(l, aggnode->groupingSets)
3418  {
3419  int current_length = list_length(lfirst(l));
3420  Bitmapset *cols = NULL;
3421 
3422  /* planner forces this to be correct */
3423  for (j = 0; j < current_length; ++j)
3424  cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3425 
3426  phasedata->grouped_cols[i] = cols;
3427  phasedata->gset_lengths[i] = current_length;
3428 
3429  ++i;
3430  }
3431 
3432  all_grouped_cols = bms_add_members(all_grouped_cols,
3433  phasedata->grouped_cols[0]);
3434  }
3435  else
3436  {
3437  Assert(phaseidx == 0);
3438 
3439  phasedata->gset_lengths = NULL;
3440  phasedata->grouped_cols = NULL;
3441  }
3442 
3443  /*
3444  * If we are grouping, precompute fmgr lookup data for inner loop.
3445  */
3446  if (aggnode->aggstrategy == AGG_SORTED)
3447  {
3448  int i = 0;
3449 
3450  Assert(aggnode->numCols > 0);
3451 
3452  /*
3453  * Build a separate function for each subset of columns that
3454  * need to be compared.
3455  */
3456  phasedata->eqfunctions =
3457  (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
3458 
3459  /* for each grouping set */
3460  for (i = 0; i < phasedata->numsets; i++)
3461  {
3462  int length = phasedata->gset_lengths[i];
3463 
3464  if (phasedata->eqfunctions[length - 1] != NULL)
3465  continue;
3466 
3467  phasedata->eqfunctions[length - 1] =
3468  execTuplesMatchPrepare(scanDesc,
3469  length,
3470  aggnode->grpColIdx,
3471  aggnode->grpOperators,
3472  aggnode->grpCollations,
3473  (PlanState *) aggstate);
3474  }
3475 
3476  /* and for all grouped columns, unless already computed */
3477  if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
3478  {
3479  phasedata->eqfunctions[aggnode->numCols - 1] =
3480  execTuplesMatchPrepare(scanDesc,
3481  aggnode->numCols,
3482  aggnode->grpColIdx,
3483  aggnode->grpOperators,
3484  aggnode->grpCollations,
3485  (PlanState *) aggstate);
3486  }
3487  }
3488 
3489  phasedata->aggnode = aggnode;
3490  phasedata->aggstrategy = aggnode->aggstrategy;
3491  phasedata->sortnode = sortnode;
3492  }
3493  }
3494 
3495  /*
3496  * Convert all_grouped_cols to a descending-order list.
3497  */
3498  i = -1;
3499  while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3500  aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3501 
3502  /*
3503  * Set up aggregate-result storage in the output expr context, and also
3504  * allocate my private per-agg working storage
3505  */
3506  econtext = aggstate->ss.ps.ps_ExprContext;
3507  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3508  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3509 
3510  peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3511  pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
3512 
3513  aggstate->peragg = peraggs;
3514  aggstate->pertrans = pertransstates;
3515 
3516 
3517  aggstate->all_pergroups =
3519  * (numGroupingSets + numHashes));
3520  pergroups = aggstate->all_pergroups;
3521 
3522  if (node->aggstrategy != AGG_HASHED)
3523  {
3524  for (i = 0; i < numGroupingSets; i++)
3525  {
3526  pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3527  * numaggs);
3528  }
3529 
3530  aggstate->pergroups = pergroups;
3531  pergroups += numGroupingSets;
3532  }
3533 
3534  /*
3535  * Hashing can only appear in the initial phase.
3536  */
3537  if (use_hashing)
3538  {
3539  Plan *outerplan = outerPlan(node);
3540  uint64 totalGroups = 0;
3541  int i;
3542 
3543  aggstate->hash_metacxt = AllocSetContextCreate(
3544  aggstate->ss.ps.state->es_query_cxt,
3545  "HashAgg meta context",
3547  aggstate->hash_spill_slot = ExecInitExtraTupleSlot(
3548  estate, scanDesc, &TTSOpsMinimalTuple);
3549 
3550  /* this is an array of pointers, not structures */
3551  aggstate->hash_pergroup = pergroups;
3552 
3553  aggstate->hashentrysize = hash_agg_entry_size(
3554  aggstate->numtrans, outerplan->plan_width, node->transitionSpace);
3555 
3556  /*
3557  * Consider all of the grouping sets together when setting the limits
3558  * and estimating the number of partitions. This can be inaccurate
3559  * when there is more than one grouping set, but should still be
3560  * reasonable.
3561  */
3562  for (i = 0; i < aggstate->num_hashes; i++)
3563  totalGroups += aggstate->perhash[i].aggnode->numGroups;
3564 
3565  hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
3566  &aggstate->hash_mem_limit,
3567  &aggstate->hash_ngroups_limit,
3568  &aggstate->hash_planned_partitions);
3569  find_hash_columns(aggstate);
3570  build_hash_tables(aggstate);
3571  aggstate->table_filled = false;
3572  }
3573 
3574  /*
3575  * Initialize current phase-dependent values to initial phase. The initial
3576  * phase is 1 (first sort pass) for all strategies that use sorting (if
3577  * hashing is being done too, then phase 0 is processed last); but if only
3578  * hashing is being done, then phase 0 is all there is.
3579  */
3580  if (node->aggstrategy == AGG_HASHED)
3581  {
3582  aggstate->current_phase = 0;
3583  initialize_phase(aggstate, 0);
3584  select_current_set(aggstate, 0, true);
3585  }
3586  else
3587  {
3588  aggstate->current_phase = 1;
3589  initialize_phase(aggstate, 1);
3590  select_current_set(aggstate, 0, false);
3591  }
3592 
3593  /* -----------------
3594  * Perform lookups of aggregate function info, and initialize the
3595  * unchanging fields of the per-agg and per-trans data.
3596  *
3597  * We try to optimize by detecting duplicate aggregate functions so that
3598  * their state and final values are re-used, rather than needlessly being
3599  * re-calculated independently. We also detect aggregates that are not
3600  * the same, but which can share the same transition state.
3601  *
3602  * Scenarios:
3603  *
3604  * 1. Identical aggregate function calls appear in the query:
3605  *
3606  * SELECT SUM(x) FROM ... HAVING SUM(x) > 0
3607  *
3608  * Since these aggregates are identical, we only need to calculate
3609  * the value once. Both aggregates will share the same 'aggno' value.
3610  *
3611  * 2. Two different aggregate functions appear in the query, but the
3612  * aggregates have the same arguments, transition functions and
3613  * initial values (and, presumably, different final functions):
3614  *
3615  * SELECT AVG(x), STDDEV(x) FROM ...
3616  *
3617  * In this case we must create a new peragg for the varying aggregate,
3618  * and we need to call the final functions separately, but we need
3619  * only run the transition function once. (This requires that the
3620  * final functions be nondestructive of the transition state, but
3621  * that's required anyway for other reasons.)
3622  *
3623  * For either of these optimizations to be valid, all aggregate properties
3624  * used in the transition phase must be the same, including any modifiers
3625  * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
3626  * contain any volatile functions.
3627  * -----------------
3628  */
3629  aggno = -1;
3630  transno = -1;
3631  foreach(l, aggstate->aggs)
3632  {
3633  AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
3634  Aggref *aggref = aggrefstate->aggref;
3635  AggStatePerAgg peragg;
3636  AggStatePerTrans pertrans;
3637  int existing_aggno;
3638  int existing_transno;
3639  List *same_input_transnos;
3640  Oid inputTypes[FUNC_MAX_ARGS];
3641  int numArguments;
3642  int numDirectArgs;
3643  HeapTuple aggTuple;
3644  Form_pg_aggregate aggform;
3645  AclResult aclresult;
3646  Oid transfn_oid,
3647  finalfn_oid;
3648  bool shareable;
3649  Oid serialfn_oid,
3650  deserialfn_oid;
3651  Expr *finalfnexpr;
3652  Oid aggtranstype;
3653  Datum textInitVal;
3654  Datum initValue;
3655  bool initValueIsNull;
3656 
3657  /* Planner should have assigned aggregate to correct level */
3658  Assert(aggref->agglevelsup == 0);
3659  /* ... and the split mode should match */
3660  Assert(aggref->aggsplit == aggstate->aggsplit);
3661 
3662  /* 1. Check for already processed aggs which can be re-used */
3663  existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
3664  &same_input_transnos);
3665  if (existing_aggno != -1)
3666  {
3667  /*
3668  * Existing compatible agg found. so just point the Aggref to the
3669  * same per-agg struct.
3670  */
3671  aggrefstate->aggno = existing_aggno;
3672  continue;
3673  }
3674 
3675  /* Mark Aggref state node with assigned index in the result array */
3676  peragg = &peraggs[++aggno];
3677  peragg->aggref = aggref;
3678  aggrefstate->aggno = aggno;
3679 
3680  /* Fetch the pg_aggregate row */
3681  aggTuple = SearchSysCache1(AGGFNOID,
3682  ObjectIdGetDatum(aggref->aggfnoid));
3683  if (!HeapTupleIsValid(aggTuple))
3684  elog(ERROR, "cache lookup failed for aggregate %u",
3685  aggref->aggfnoid);
3686  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3687 
3688  /* Check permission to call aggregate function */
3689  aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3690  ACL_EXECUTE);
3691  if (aclresult != ACLCHECK_OK)
3692  aclcheck_error(aclresult, OBJECT_AGGREGATE,
3693  get_func_name(aggref->aggfnoid));
3695 
3696  /* planner recorded transition state type in the Aggref itself */
3697  aggtranstype = aggref->aggtranstype;
3698  Assert(OidIsValid(aggtranstype));
3699 
3700  /*
3701  * If this aggregation is performing state combines, then instead of
3702  * using the transition function, we'll use the combine function
3703  */
3704  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3705  {
3706  transfn_oid = aggform->aggcombinefn;
3707 
3708  /* If not set then the planner messed up */
3709  if (!OidIsValid(transfn_oid))
3710  elog(ERROR, "combinefn not set for aggregate function");
3711  }
3712  else
3713  transfn_oid = aggform->aggtransfn;
3714 
3715  /* Final function only required if we're finalizing the aggregates */
3716  if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3717  peragg->finalfn_oid = finalfn_oid = InvalidOid;
3718  else
3719  peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3720 
3721  /*
3722  * If finalfn is marked read-write, we can't share transition states;
3723  * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also,
3724  * if we're not executing the finalfn here, we can share regardless.
3725  */
3726  shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
3727  (finalfn_oid == InvalidOid);
3728  peragg->shareable = shareable;
3729 
3730  serialfn_oid = InvalidOid;
3731  deserialfn_oid = InvalidOid;
3732 
3733  /*
3734  * Check if serialization/deserialization is required. We only do it
3735  * for aggregates that have transtype INTERNAL.
3736  */
3737  if (aggtranstype == INTERNALOID)
3738  {
3739  /*
3740  * The planner should only have generated a serialize agg node if
3741  * every aggregate with an INTERNAL state has a serialization
3742  * function. Verify that.
3743  */
3744  if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3745  {
3746  /* serialization only valid when not running finalfn */
3747  Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3748 
3749  if (!OidIsValid(aggform->aggserialfn))
3750  elog(ERROR, "serialfunc not provided for serialization aggregation");
3751  serialfn_oid = aggform->aggserialfn;
3752  }
3753 
3754  /* Likewise for deserialization functions */
3755  if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3756  {
3757  /* deserialization only valid when combining states */
3758  Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3759 
3760  if (!OidIsValid(aggform->aggdeserialfn))
3761  elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3762  deserialfn_oid = aggform->aggdeserialfn;
3763  }
3764  }
3765 
3766  /* Check that aggregate owner has permission to call component fns */
3767  {
3768  HeapTuple procTuple;
3769  Oid aggOwner;
3770 
3771  procTuple = SearchSysCache1(PROCOID,
3772  ObjectIdGetDatum(aggref->aggfnoid));
3773  if (!HeapTupleIsValid(procTuple))
3774  elog(ERROR, "cache lookup failed for function %u",
3775  aggref->aggfnoid);
3776  aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3777  ReleaseSysCache(procTuple);
3778 
3779  aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3780  ACL_EXECUTE);
3781  if (aclresult != ACLCHECK_OK)
3782  aclcheck_error(aclresult, OBJECT_FUNCTION,
3783  get_func_name(transfn_oid));
3784  InvokeFunctionExecuteHook(transfn_oid);
3785  if (OidIsValid(finalfn_oid))
3786  {
3787  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3788  ACL_EXECUTE);
3789  if (aclresult != ACLCHECK_OK)
3790  aclcheck_error(aclresult, OBJECT_FUNCTION,
3791  get_func_name(finalfn_oid));
3792  InvokeFunctionExecuteHook(finalfn_oid);
3793  }
3794  if (OidIsValid(serialfn_oid))
3795  {
3796  aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3797  ACL_EXECUTE);
3798  if (aclresult != ACLCHECK_OK)
3799  aclcheck_error(aclresult, OBJECT_FUNCTION,
3800  get_func_name(serialfn_oid));
3801  InvokeFunctionExecuteHook(serialfn_oid);
3802  }
3803  if (OidIsValid(deserialfn_oid))
3804  {
3805  aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3806  ACL_EXECUTE);
3807  if (aclresult != ACLCHECK_OK)
3808  aclcheck_error(aclresult, OBJECT_FUNCTION,
3809  get_func_name(deserialfn_oid));
3810  InvokeFunctionExecuteHook(deserialfn_oid);
3811  }
3812  }
3813 
3814  /*
3815  * Get actual datatypes of the (nominal) aggregate inputs. These
3816  * could be different from the agg's declared input types, when the
3817  * agg accepts ANY or a polymorphic type.
3818  */
3819  numArguments = get_aggregate_argtypes(aggref, inputTypes);
3820 
3821  /* Count the "direct" arguments, if any */
3822  numDirectArgs = list_length(aggref->aggdirectargs);
3823 
3824  /* Detect how many arguments to pass to the finalfn */
3825  if (aggform->aggfinalextra)
3826  peragg->numFinalArgs = numArguments + 1;
3827  else
3828  peragg->numFinalArgs = numDirectArgs + 1;
3829 
3830  /* Initialize any direct-argument expressions */
3831  peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3832  (PlanState *) aggstate);
3833 
3834  /*
3835  * build expression trees using actual argument & result types for the
3836  * finalfn, if it exists and is required.
3837  */
3838  if (OidIsValid(finalfn_oid))
3839  {
3840  build_aggregate_finalfn_expr(inputTypes,
3841  peragg->numFinalArgs,
3842  aggtranstype,
3843  aggref->aggtype,
3844  aggref->inputcollid,
3845  finalfn_oid,
3846  &finalfnexpr);
3847  fmgr_info(finalfn_oid, &peragg->finalfn);
3848  fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3849  }
3850 
3851  /* get info about the output value's datatype */
3852  get_typlenbyval(aggref->aggtype,
3853  &peragg->resulttypeLen,
3854  &peragg->resulttypeByVal);
3855 
3856  /*
3857  * initval is potentially null, so don't try to access it as a struct
3858  * field. Must do it the hard way with SysCacheGetAttr.
3859  */
3860  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3861  Anum_pg_aggregate_agginitval,
3862  &initValueIsNull);
3863  if (initValueIsNull)
3864  initValue = (Datum) 0;
3865  else
3866  initValue = GetAggInitVal(textInitVal, aggtranstype);
3867 
3868  /*
3869  * 2. Build working state for invoking the transition function, or
3870  * look up previously initialized working state, if we can share it.
3871  *
3872  * find_compatible_peragg() already collected a list of shareable
3873  * per-Trans's with the same inputs. Check if any of them have the
3874  * same transition function and initial value.
3875  */
3876  existing_transno = find_compatible_pertrans(aggstate, aggref,
3877  shareable,
3878  transfn_oid, aggtranstype,
3879  serialfn_oid, deserialfn_oid,
3880  initValue, initValueIsNull,
3881  same_input_transnos);
3882  if (existing_transno != -1)
3883  {
3884  /*
3885  * Existing compatible trans found, so just point the 'peragg' to
3886  * the same per-trans struct, and mark the trans state as shared.
3887  */
3888  pertrans = &pertransstates[existing_transno];
3889  pertrans->aggshared = true;
3890  peragg->transno = existing_transno;
3891  }
3892  else
3893  {
3894  pertrans = &pertransstates[++transno];
3895  build_pertrans_for_aggref(pertrans, aggstate, estate,
3896  aggref, transfn_oid, aggtranstype,
3897  serialfn_oid, deserialfn_oid,
3898  initValue, initValueIsNull,
3899  inputTypes, numArguments);
3900  peragg->transno = transno;
3901  }
3902  ReleaseSysCache(aggTuple);
3903  }
3904 
3905  /*
3906  * Update aggstate->numaggs to be the number of unique aggregates found.
3907  * Also set numstates to the number of unique transition states found.
3908  */
3909  aggstate->numaggs = aggno + 1;
3910  aggstate->numtrans = transno + 1;
3911 
3912  /*
3913  * Last, check whether any more aggregates got added onto the node while
3914  * we processed the expressions for the aggregate arguments (including not
3915  * only the regular arguments and FILTER expressions handled immediately
3916  * above, but any direct arguments we might've handled earlier). If so,
3917  * we have nested aggregate functions, which is semantically nonsensical,
3918  * so complain. (This should have been caught by the parser, so we don't
3919  * need to work hard on a helpful error message; but we defend against it
3920  * here anyway, just to be sure.)
3921  */
3922  if (numaggs != list_length(aggstate->aggs))
3923  ereport(ERROR,
3924  (errcode(ERRCODE_GROUPING_ERROR),
3925  errmsg("aggregate function calls cannot be nested")));
3926 
3927  /*
3928  * Build expressions doing all the transition work at once. We build a
3929  * different one for each phase, as the number of transition function
3930  * invocation can differ between phases. Note this'll work both for
3931  * transition and combination functions (although there'll only be one
3932  * phase in the latter case).
3933  */
3934  for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
3935  {
3936  AggStatePerPhase phase = &aggstate->phases[phaseidx];
3937  bool dohash = false;
3938  bool dosort = false;
3939 
3940  /* phase 0 doesn't necessarily exist */
3941  if (!phase->aggnode)
3942  continue;
3943 
3944  if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
3945  {
3946  /*
3947  * Phase one, and only phase one, in a mixed agg performs both
3948  * sorting and aggregation.
3949  */
3950  dohash = true;
3951  dosort = true;
3952  }
3953  else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
3954  {
3955  /*
3956  * No need to compute a transition function for an AGG_MIXED phase
3957  * 0 - the contents of the hashtables will have been computed
3958  * during phase 1.
3959  */
3960  continue;
3961  }
3962  else if (phase->aggstrategy == AGG_PLAIN ||
3963  phase->aggstrategy == AGG_SORTED)
3964  {
3965  dohash = false;
3966  dosort = true;
3967  }
3968  else if (phase->aggstrategy == AGG_HASHED)
3969  {
3970  dohash = true;
3971  dosort = false;
3972  }
3973  else
3974  Assert(false);
3975 
3976  phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
3977  false);
3978 
3979  /* cache compiled expression for outer slot without NULL check */
3980  phase->evaltrans_cache[0][0] = phase->evaltrans;
3981  }
3982 
3983  return aggstate;
3984 }
3985 
3986 /*
3987  * Build the state needed to calculate a state value for an aggregate.
3988  *
3989  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
3990  * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
3991  * of the arguments could be calculated from 'aggref', but the caller has
3992  * calculated them already, so might as well pass them.
3993  */
3994 static void
3996  AggState *aggstate, EState *estate,
3997  Aggref *aggref,
3998  Oid aggtransfn, Oid aggtranstype,
3999  Oid aggserialfn, Oid aggdeserialfn,
4000  Datum initValue, bool initValueIsNull,
4001  Oid *inputTypes, int numArguments)
4002 {
4003  int numGroupingSets = Max(aggstate->maxsets, 1);
4004  Expr *serialfnexpr = NULL;
4005  Expr *deserialfnexpr = NULL;
4006  ListCell *lc;
4007  int numInputs;
4008  int numDirectArgs;
4009  List *sortlist;
4010  int numSortCols;
4011  int numDistinctCols;
4012  int i;
4013 
4014  /* Begin filling in the pertrans data */
4015  pertrans->aggref = aggref;
4016  pertrans->aggshared = false;
4017  pertrans->aggCollation = aggref->inputcollid;
4018  pertrans->transfn_oid = aggtransfn;
4019  pertrans->serialfn_oid = aggserialfn;
4020  pertrans->deserialfn_oid = aggdeserialfn;
4021  pertrans->initValue = initValue;
4022  pertrans->initValueIsNull = initValueIsNull;
4023 
4024  /* Count the "direct" arguments, if any */
4025  numDirectArgs = list_length(aggref->aggdirectargs);
4026 
4027  /* Count the number of aggregated input columns */
4028  pertrans->numInputs = numInputs = list_length(aggref->args);
4029 
4030  pertrans->aggtranstype = aggtranstype;
4031 
4032  /*
4033  * When combining states, we have no use at all for the aggregate
4034  * function's transfn. Instead we use the combinefn. In this case, the
4035  * transfn and transfn_oid fields of pertrans refer to the combine
4036  * function rather than the transition function.
4037  */
4038  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
4039  {
4040  Expr *combinefnexpr;
4041  size_t numTransArgs;
4042 
4043  /*
4044  * When combining there's only one input, the to-be-combined added
4045  * transition value from below (this node's transition value is
4046  * counted separately).
4047  */
4048  pertrans->numTransInputs = 1;
4049 
4050  /* account for the current transition state */
4051  numTransArgs = pertrans->numTransInputs + 1;
4052 
4053  build_aggregate_combinefn_expr(aggtranstype,
4054  aggref->inputcollid,
4055  aggtransfn,
4056  &combinefnexpr);
4057  fmgr_info(aggtransfn, &pertrans->transfn);
4058  fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
4059 
/* fcinfo sized for numTransArgs arguments; collation and aggstate context attached */
4060  pertrans->transfn_fcinfo =
4063  &pertrans->transfn,
4064  numTransArgs,
4065  pertrans->aggCollation,
4066  (void *) aggstate, NULL);
4067 
4068  /*
4069  * Ensure that a combine function to combine INTERNAL states is not
4070  * strict. This should have been checked during CREATE AGGREGATE, but
4071  * the strict property could have been changed since then.
4072  */
4073  if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
4074  ereport(ERROR,
4075  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4076  errmsg("combine function with transition type %s must not be declared STRICT",
4077  format_type_be(aggtranstype))));
4078  }
4079  else
4080  {
4081  Expr *transfnexpr;
4082  size_t numTransArgs;
4083 
4084  /* Detect how many arguments to pass to the transfn */
4085  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4086  pertrans->numTransInputs = numInputs;
4087  else
4088  pertrans->numTransInputs = numArguments;
4089 
4090  /* account for the current transition state */
4091  numTransArgs = pertrans->numTransInputs + 1;
4092 
4093  /*
4094  * Set up infrastructure for calling the transfn. Note that
4095  * invtransfn is not needed here.
4096  */
4097  build_aggregate_transfn_expr(inputTypes,
4098  numArguments,
4099  numDirectArgs,
4100  aggref->aggvariadic,
4101  aggtranstype,
4102  aggref->inputcollid,
4103  aggtransfn,
4104  InvalidOid,
4105  &transfnexpr,
4106  NULL);
4107  fmgr_info(aggtransfn, &pertrans->transfn);
4108  fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
4109 
/* fcinfo sized for numTransArgs arguments; collation and aggstate context attached */
4110  pertrans->transfn_fcinfo =
4113  &pertrans->transfn,
4114  numTransArgs,
4115  pertrans->aggCollation,
4116  (void *) aggstate, NULL);
4117 
4118  /*
4119  * If the transfn is strict and the initval is NULL, make sure input
4120  * type and transtype are the same (or at least binary-compatible), so
4121  * that it's OK to use the first aggregated input value as the initial
4122  * transValue. This should have been checked at agg definition time,
4123  * but we must check again in case the transfn's strictness property
4124  * has been changed.
4125  */
4126  if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
4127  {
4128  if (numArguments <= numDirectArgs ||
4129  !IsBinaryCoercible(inputTypes[numDirectArgs],
4130  aggtranstype))
4131  ereport(ERROR,
4132  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4133  errmsg("aggregate %u needs to have compatible input type and transition type",
4134  aggref->aggfnoid)));
4135  }
4136  }
4137 
4138  /* get info about the state value's datatype */
4139  get_typlenbyval(aggtranstype,
4140  &pertrans->transtypeLen,
4141  &pertrans->transtypeByVal);
4142 
/* Set up the call infrastructure for the serialization function, if any. */
4143  if (OidIsValid(aggserialfn))
4144  {
4145  build_aggregate_serialfn_expr(aggserialfn,
4146  &serialfnexpr);
4147  fmgr_info(aggserialfn, &pertrans->serialfn);
4148  fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
4149 
4150  pertrans->serialfn_fcinfo =
4153  &pertrans->serialfn,
4154  1,
4155  InvalidOid,
4156  (void *) aggstate, NULL);
4157  }
4158 
/* Likewise for the deserialization function, if any. */
4159  if (OidIsValid(aggdeserialfn))
4160  {
4161  build_aggregate_deserialfn_expr(aggdeserialfn,
4162  &deserialfnexpr);
4163  fmgr_info(aggdeserialfn, &pertrans->deserialfn);
4164  fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
4165 
4166  pertrans->deserialfn_fcinfo =
4169  &pertrans->deserialfn,
4170  2,
4171  InvalidOid,
4172  (void *) aggstate, NULL);
4173 
4174  }
4175 
4176  /*
4177  * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
4178  * have a list of SortGroupClause nodes; fish out the data in them and
4179  * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
4180  * however; the agg's transfn and finalfn are responsible for that.
4181  *
4182  * Note that by construction, if there is a DISTINCT clause then the ORDER
4183  * BY clause is a prefix of it (see transformDistinctClause).
4184  */
4185  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4186  {
4187  sortlist = NIL;
4188  numSortCols = numDistinctCols = 0;
4189  }
4190  else if (aggref->aggdistinct)
4191  {
4192  sortlist = aggref->aggdistinct;
4193  numSortCols = numDistinctCols = list_length(sortlist);
4194  Assert(numSortCols >= list_length(aggref->aggorder));
4195  }
4196  else
4197  {
4198  sortlist = aggref->aggorder;
4199  numSortCols = list_length(sortlist);
4200  numDistinctCols = 0;
4201  }
4202 
4203  pertrans->numSortCols = numSortCols;
4204  pertrans->numDistinctCols = numDistinctCols;
4205 
4206  /*
4207  * If we have either sorting or filtering to do, create a tupledesc and
4208  * slot corresponding to the aggregated inputs (including sort
4209  * expressions) of the agg.
4210  */
4211  if (numSortCols > 0 || aggref->aggfilter)
4212  {
4213  pertrans->sortdesc = ExecTypeFromTL(aggref->args);
4214  pertrans->sortslot =
4215  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4217  }
4218 
4219  if (numSortCols > 0)
4220  {
4221  /*
4222  * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
4223  * (yet)
4224  */
4225  Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
4226 
4227  /* If we have only one input, we need its len/byval info. */
4228  if (numInputs == 1)
4229  {
4230  get_typlenbyval(inputTypes[numDirectArgs],
4231  &pertrans->inputtypeLen,
4232  &pertrans->inputtypeByVal);
4233  }
4234  else if (numDistinctCols > 0)
4235  {
4236  /* we will need an extra slot to store prior values */
4237  pertrans->uniqslot =
4238  ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4240  }
4241 
4242  /* Extract the sort information for use later */
4243  pertrans->sortColIdx =
4244  (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
4245  pertrans->sortOperators =
4246  (Oid *) palloc(numSortCols * sizeof(Oid));
4247  pertrans->sortCollations =
4248  (Oid *) palloc(numSortCols * sizeof(Oid));
4249  pertrans->sortNullsFirst =
4250  (bool *) palloc(numSortCols * sizeof(bool));
4251 
4252  i = 0;
4253  foreach(lc, sortlist)
4254  {
4255  SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
4256  TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
4257 
4258  /* the parser should have made sure of this */
4259  Assert(OidIsValid(sortcl->sortop));
4260 
4261  pertrans->sortColIdx[i] = tle->resno;
4262  pertrans->sortOperators[i] = sortcl->sortop;
4263  pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
4264  pertrans->sortNullsFirst[i] = sortcl->nulls_first;
4265  i++;
4266  }
4267  Assert(i == numSortCols);
4268  }
4269 
4270  if (aggref->aggdistinct)
4271  {
4272  Oid *ops;
4273 
4274  Assert(numArguments > 0);
4275  Assert(list_length(aggref->aggdistinct) == numDistinctCols);
4276 
4277  ops = palloc(numDistinctCols * sizeof(Oid));
4278 
4279  i = 0;
4280  foreach(lc, aggref->aggdistinct)
4281  ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
4282 
4283  /* lookup / build the necessary comparators */
4284  if (numDistinctCols == 1)
4285  fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
4286  else
4287  pertrans->equalfnMulti =
4288  execTuplesMatchPrepare(pertrans->sortdesc,
4289  numDistinctCols,
4290  pertrans->sortColIdx,
4291  ops,
4292  pertrans->sortCollations,
4293  &aggstate->ss.ps);
4294  pfree(ops);
4295  }
4296 
/* One tuplesort state per grouping set; palloc0 leaves them all NULL. */
4297  pertrans->sortstates = (Tuplesortstate **)
4298  palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
4299 }
4300 
4301 
4302 static Datum
4303 GetAggInitVal(Datum textInitVal, Oid transtype)
4304 {
4305  Oid typinput,
4306  typioparam;
4307  char *strInitVal;
4308  Datum initVal;
4309 
4310  getTypeInputInfo(transtype, &typinput, &typioparam);
4311  strInitVal = TextDatumGetCString(textInitVal);
4312  initVal = OidInputFunctionCall(typinput, strInitVal,
4313  typioparam, -1);
4314  pfree(strInitVal);
4315  return initVal;
4316 }
4317 
4318 /*
4319  * find_compatible_peragg - search for a previously initialized per-Agg struct
4320  *
4321  * Searches the previously looked at aggregates to find one which is compatible
4322  * with this one, with the same input parameters. If no compatible aggregate
4323  * can be found, returns -1.
4324  *
4325  * As a side-effect, this also collects a list of existing, shareable per-Trans
4326  * structs with matching inputs. If no identical Aggref is found, the list is
4327  * passed later to find_compatible_pertrans, to see if we can at least reuse
4328  * the state value of another aggregate.
4329  */
4330 static int
4332  int lastaggno, List **same_input_transnos)
4333 {
4334  int aggno;
4335  AggStatePerAgg peraggs;
4336 
4337  *same_input_transnos = NIL;
4338 
4339  /* we mustn't reuse the aggref if it contains volatile function calls */
4340  if (contain_volatile_functions((Node *) newagg))
4341  return -1;
4342 
4343  peraggs = aggstate->peragg;
4344 
4345  /*
4346  * Search through the list of already seen aggregates. If we find an
4347  * existing identical aggregate call, then we can re-use that one. While
4348  * searching, we'll also collect a list of Aggrefs with the same input
4349  * parameters. If no matching Aggref is found, the caller can potentially
4350  * still re-use the transition state of one of them. (At this stage we
4351  * just compare the parsetrees; whether different aggregates share the
4352  * same transition function will be checked later.)
4353  */
4354  for (aggno = 0; aggno <= lastaggno; aggno++)
4355  {
4356  AggStatePerAgg peragg;
4357  Aggref *existingRef;
4358 
4359  peragg = &peraggs[aggno];
4360  existingRef = peragg->aggref;
4361 
4362  /* all of the following must be the same or it's no match */
4363  if (newagg->inputcollid != existingRef->inputcollid ||
4364  newagg->aggtranstype != existingRef->aggtranstype ||
4365  newagg->aggstar != existingRef->aggstar ||
4366  newagg->aggvariadic != existingRef->aggvariadic ||
4367  newagg->aggkind != existingRef->aggkind ||
4368  !equal(newagg->args, existingRef->args) ||
4369  !equal(newagg->aggorder, existingRef->aggorder) ||
4370  !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
4371  !equal(newagg->aggfilter, existingRef->aggfilter))
4372  continue;
4373 
4374  /* if it's the same aggregate function then report exact match */
4375  if (newagg->aggfnoid == existingRef->aggfnoid &&
4376  newagg->aggtype == existingRef->aggtype &&
4377  newagg->aggcollid == existingRef->aggcollid &&
4378  equal(newagg->aggdirectargs, existingRef->aggdirectargs))
4379  {
/* exact match found: the collected transno list is no longer needed */
4380  list_free(*same_input_transnos);
4381  *same_input_transnos = NIL;
4382  return aggno;
4383  }
4384 
4385  /*
4386  * Not identical, but it had the same inputs. If the final function
4387  * permits sharing, return its transno to the caller, in case we can
4388  * re-use its per-trans state. (If there's already sharing going on,
4389  * we might report a transno more than once. find_compatible_pertrans
4390  * is cheap enough that it's not worth spending cycles to avoid that.)
4391  */
4392  if (peragg->shareable)
4393  *same_input_transnos = lappend_int(*same_input_transnos,
4394  peragg->transno);
4395  }
4396 
4397  return -1;
4398 }
4399 
4400 /*
4401  * find_compatible_pertrans - search for a previously initialized per-Trans
4402  * struct
4403  *
4404  * Searches the list of transnos for a per-Trans struct with the same
4405  * transition function and initial condition. (The inputs have already been
4406  * verified to match.)
4407  */
4408 static int
4409 find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
4410  Oid aggtransfn, Oid aggtranstype,
4411  Oid aggserialfn, Oid aggdeserialfn,
4412  Datum initValue, bool initValueIsNull,
4413  List *transnos)
4414 {
4415  ListCell *lc;
4416 
4417  /* If this aggregate can't share transition states, give up */
4418  if (!shareable)
4419  return -1;
4420 
4421  foreach(lc, transnos)
4422  {
4423  int transno = lfirst_int(lc);
4424  AggStatePerTrans pertrans = &aggstate->pertrans[transno];
4425 
4426  /*
4427  * if the transfns or transition state types are not the same then the
4428  * state can't be shared.
4429  */
4430  if (aggtransfn != pertrans->transfn_oid ||
4431  aggtranstype != pertrans->aggtranstype)
4432  continue;
4433 
4434  /*
4435  * The serialization and deserialization functions must match, if
4436  * present, as we're unable to share the trans state for aggregates
4437  * which will serialize or deserialize into different formats.
4438  * Remember that these will be InvalidOid if they're not required for
4439  * this agg node.
4440  */
4441  if (aggserialfn != pertrans->serialfn_oid ||
4442  aggdeserialfn != pertrans->deserialfn_oid)
4443  continue;
4444 
4445  /*
4446  * Check that the initial condition matches, too.
4447  */
4448  if (initValueIsNull && pertrans->initValueIsNull)
4449  return transno;
4450 
4451  if (!initValueIsNull && !pertrans->initValueIsNull &&
4452  datumIsEqual(initValue, pertrans->initValue,
4453  pertrans->transtypeByVal, pertrans->transtypeLen))
4454  return transno;
4455  }
4456  return -1;
4457 }
4458 
/*
 * End-of-execution cleanup for the Agg node: close any open tuplesorts,
 * release hash-table metadata memory, run aggregate shutdown callbacks
 * (via ReScanExprContext), and recurse to shut down the outer plan.
 */
4459 void
4461 {
4463  int transno;
4464  int numGroupingSets = Max(node->maxsets, 1);
4465  int setno;
4466 
4467  /* Make sure we have closed any open tuplesorts */
4468 
4469  if (node->sort_in)
4470  tuplesort_end(node->sort_in);
4471  if (node->sort_out)
4472  tuplesort_end(node->sort_out);
4473 
4475 
4476  if (node->hash_metacxt != NULL)
4477  {
4479  node->hash_metacxt = NULL;
4480  }
4481 
/* close the per-transition, per-grouping-set sort states, if any */
4482  for (transno = 0; transno < node->numtrans; transno++)
4483  {
4484  AggStatePerTrans pertrans = &node->pertrans[transno];
4485 
4486  for (setno = 0; setno < numGroupingSets; setno++)
4487  {
4488  if (pertrans->sortstates[setno])
4489  tuplesort_end(pertrans->sortstates[setno]);
4490  }
4491  }
4492 
4493  /* And ensure any agg shutdown callbacks have been called */
4494  for (setno = 0; setno < numGroupingSets; setno++)
4495  ReScanExprContext(node->aggcontexts[setno]);
4496  if (node->hashcontext)
4498 
4499  /*
4500  * We don't actually free any ExprContexts here (see comment in
4501  * ExecFreeExprContext), just unlinking the output one from the plan node
4502  * suffices.
4503  */
4504  ExecFreeExprContext(&node->ss.ps);
4505 
4506  /* clean up tuple table */
4508 
4509  outerPlan = outerPlanState(node);
4510  ExecEndNode(outerPlan);
4511 }
4512 
/*
 * Rescan the Agg node.  In the pure AGG_HASHED case we may be able to
 * simply re-iterate the existing hash table; otherwise all transition
 * state, sorts, and hash tables are torn down and rebuilt, and the outer
 * plan is rescanned (unless its chgParam forces ExecProcNode to do it).
 */
4513 void
4515 {
4516  ExprContext *econtext = node->ss.ps.ps_ExprContext;
4518  Agg *aggnode = (Agg *) node->ss.ps.plan;
4519  int transno;
4520  int numGroupingSets = Max(node->maxsets, 1);
4521  int setno;
4522 
4523  node->agg_done = false;
4524 
4525  if (node->aggstrategy == AGG_HASHED)
4526  {
4527  /*
4528  * In the hashed case, if we haven't yet built the hash table then we
4529  * can just return; nothing done yet, so nothing to undo. If subnode's
4530  * chgParam is not NULL then it will be re-scanned by ExecProcNode,
4531  * else no reason to re-scan it at all.
4532  */
4533  if (!node->table_filled)
4534  return;
4535 
4536  /*
4537  * If we do have the hash table, and it never spilled, and the subplan
4538  * does not have any parameter changes, and none of our own parameter
4539  * changes affect input expressions of the aggregated functions, then
4540  * we can just rescan the existing hash table; no need to build it
4541  * again.
4542  */
4543  if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
4544  !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
4545  {
4547  &node->perhash[0].hashiter);
4548  select_current_set(node, 0, true);
4549  return;
4550  }
4551  }
4552 
4553  /* Make sure we have closed any open tuplesorts */
4554  for (transno = 0; transno < node->numtrans; transno++)
4555  {
4556  for (setno = 0; setno < numGroupingSets; setno++)
4557  {
4558  AggStatePerTrans pertrans = &node->pertrans[transno];
4559 
4560  if (pertrans->sortstates[setno])
4561  {
4562  tuplesort_end(pertrans->sortstates[setno]);
4563  pertrans->sortstates[setno] = NULL;
4564  }
4565  }
4566  }
4567 
4568  /*
4569  * We don't need to ReScanExprContext the output tuple context here;
4570  * ExecReScan already did it. But we do need to reset our per-grouping-set
4571  * contexts, which may have transvalues stored in them. (We use rescan
4572  * rather than just reset because transfns may have registered callbacks
4573  * that need to be run now.) For the AGG_HASHED case, see below.
4574  */
4575 
4576  for (setno = 0; setno < numGroupingSets; setno++)
4577  {
4578  ReScanExprContext(node->aggcontexts[setno]);
4579  }
4580 
4581  /* Release first tuple of group, if we have made a copy */
4582  if (node->grp_firstTuple != NULL)
4583  {
4585  node->grp_firstTuple = NULL;
4586  }
4588 
4589  /* Forget current agg values */
4590  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
4591  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
4592 
4593  /*
4594  * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
4595  * the hashcontext. This used to be an issue, but now, resetting a context
4596  * automatically deletes sub-contexts too.
4597  */
4598  if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
4599  {
4601 
/* reset spill tracking before rebuilding the hash table */
4602  node->hash_ever_spilled = false;
4603  node->hash_spill_mode = false;
4604  node->hash_ngroups_current = 0;
4605 
4607  /* Rebuild an empty hash table */
4608  build_hash_tables(node);
4609  node->table_filled = false;
4610  /* iterator will be reset when the table is filled */
4611 
4612  hashagg_recompile_expressions(node, false, false);
4613  }
4614 
4615  if (node->aggstrategy != AGG_HASHED)
4616  {
4617  /*
4618  * Reset the per-group state (in particular, mark transvalues null)
4619  */
4620  for (setno = 0; setno < numGroupingSets; setno++)
4621  {
4622  MemSet(node->pergroups[setno], 0,
4623  sizeof(AggStatePerGroupData) * node->numaggs);
4624  }
4625 
4626  /* reset to phase 1 */
4627  initialize_phase(node, 1);
4628 
4629  node->input_done = false;
4630  node->projected_set = -1;
4631  }
4632 
/* if chgParam is set, ExecProcNode will rescan the subplan itself */
4633  if (outerPlan->chgParam == NULL)
4634  ExecReScan(outerPlan);
4635 }
4636 
4637 
4638 /***********************************************************************
4639  * API exposed to aggregate functions
4640  ***********************************************************************/
4641 
4642 
4643 /*
4644  * AggCheckCallContext - test if a SQL function is being called as an aggregate
4645  *
4646  * The transition and/or final functions of an aggregate may want to verify
4647  * that they are being called as aggregates, rather than as plain SQL
4648  * functions. They should use this function to do so. The return value
4649  * is nonzero if being called as an aggregate, or zero if not. (Specific
4650  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4651  * values could conceivably appear in future.)
4652  *
4653  * If aggcontext isn't NULL, the function also stores at *aggcontext the
4654  * identity of the memory context that aggregate transition values are being
4655  * stored in. Note that the same aggregate call site (flinfo) may be called
4656  * interleaved on different transition values in different contexts, so it's
4657  * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4658  * cache it in the transvalue itself (for internal-type transvalues).
4659  */
4660 int
4662 {
/* Called from a regular aggregate: report the current aggregation context. */
4663  if (fcinfo->context && IsA(fcinfo->context, AggState))
4664  {
4665  if (aggcontext)
4666  {
4667  AggState *aggstate = ((AggState *) fcinfo->context);
4668  ExprContext *cxt = aggstate->curaggcontext;
4669 
4670  *aggcontext = cxt->ecxt_per_tuple_memory;
4671  }
4672  return AGG_CONTEXT_AGGREGATE;
4673  }
/* Called from an aggregate used as a window function. */
4674  if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4675  {
4676  if (aggcontext)
4677  *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4678  return AGG_CONTEXT_WINDOW;
4679  }
4680 
4681  /* this is just to prevent "uninitialized variable" warnings */
4682  if (aggcontext)
4683  *aggcontext = NULL;
4684  return 0;
4685 }
4686 
4687 /*
4688  * AggGetAggref - allow an aggregate support function to get its Aggref
4689  *
4690  * If the function is being called as an aggregate support function,
4691  * return the Aggref node for the aggregate call. Otherwise, return NULL.
4692  *
4693  * Aggregates sharing the same inputs and transition functions can get
4694  * merged into a single transition calculation. If the transition function
4695  * calls AggGetAggref, it will get some one of the Aggrefs for which it is
4696  * executing. It must therefore not pay attention to the Aggref fields that
4697  * relate to the final function, as those are indeterminate. But if a final
4698  * function calls AggGetAggref, it will get a precise result.
4699  *
4700  * Note that if an aggregate is being used as a window function, this will
4701  * return NULL. We could provide a similar function to return the relevant
4702  * WindowFunc node in such cases, but it's not needed yet.
4703  */
4704 Aggref *
4706 {
4707  if (fcinfo->context && IsA(fcinfo->context, AggState))
4708  {
4709  AggState *aggstate = (AggState *) fcinfo->context;
4710  AggStatePerAgg curperagg;
4711  AggStatePerTrans curpertrans;
4712 
4713  /* check curperagg (valid when in a final function) */
4714  curperagg = aggstate->curperagg;
4715 
4716  if (curperagg)
4717  return curperagg->aggref;
4718 
4719  /* check curpertrans (valid when in a transition function) */
4720  curpertrans = aggstate->curpertrans;
4721 
4722  if (curpertrans)
4723  return curpertrans->aggref;
4724  }
/* not being called as an aggregate support function */
4725  return NULL;
4726 }
4727 
4728 /*
4729  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4730  *
4731  * This is useful in agg final functions; the context returned is one that
4732  * the final function can safely reset as desired. This isn't useful for
4733  * transition functions, since the context returned MAY (we don't promise)
4734  * be the same as the context those are called in.
4735  *
4736  * As above, this is currently not useful for aggs called as window functions.
4737  */
4740 {
4741  if (fcinfo->context && IsA(fcinfo->context, AggState))
4742  {
4743  AggState *aggstate = (AggState *) fcinfo->context;
4744 
/* short-term per-tuple memory of the aggregate's tmpcontext */
4745  return aggstate->tmpcontext->ecxt_per_tuple_memory;
4746  }
/* not called as an aggregate; no short-term context to offer */
4747  return NULL;
4748 }
4749 
4750 /*
4751  * AggStateIsShared - find out whether transition state is shared
4752  *
4753  * If the function is being called as an aggregate support function,
4754  * return true if the aggregate's transition state is shared across
4755  * multiple aggregates, false if it is not.
4756  *
4757  * Returns true if not called as an aggregate support function.
4758  * This is intended as a conservative answer, ie "no you'd better not
4759  * scribble on your input". In particular, will return true if the
4760  * aggregate is being used as a window function, which is a scenario
4761  * in which changing the transition state is a bad idea. We might
4762  * want to refine the behavior for the window case in future.
4763  */
4764 bool
4766 {
4767  if (fcinfo->context && IsA(fcinfo->context, AggState))
4768  {
4769  AggState *aggstate = (AggState *) fcinfo->context;
4770  AggStatePerAgg curperagg;
4771  AggStatePerTrans curpertrans;
4772 
4773  /* check curperagg (valid when in a final function) */
4774  curperagg = aggstate->curperagg;
4775 
4776  if (curperagg)
4777  return aggstate->pertrans[curperagg->transno].aggshared;
4778 
4779  /* check curpertrans (valid when in a transition function) */
4780  curpertrans = aggstate->curpertrans;
4781 
4782  if (curpertrans)
4783  return curpertrans->aggshared;
4784  }
/* conservative default when not called as an agg support fn (see header) */
4785  return true;
4786 }
4787 
4788 /*
4789  * AggRegisterCallback - register a cleanup callback for an aggregate
4790  *
4791  * This is useful for aggs to register shutdown callbacks, which will ensure
4792  * that non-memory resources are freed. The callback will occur just before
4793  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4794  * either between groups or as a result of rescanning the query. The callback
4795  * will NOT be called on error paths. The typical use-case is for freeing of
4796  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4797  * created by the agg functions. (The callback will not be called until after
4798  * the result of the finalfn is no longer needed, so it's safe for the finalfn
4799  * to return data that will be freed by the callback.)
4800  *
4801  * As above, this is currently not useful for aggs called as window functions.
4802  */
4803 void
4806  Datum arg)
4807 {
4808  if (fcinfo->context && IsA(fcinfo->context, AggState))
4809  {
4810  AggState *aggstate = (AggState *) fcinfo->context;
4811  ExprContext *cxt = aggstate->curaggcontext;
4812 
/* hook the callback to the current aggregation context's reset/shutdown */
4813  RegisterExprContextCallback(cxt, func, arg);
4814 
4815  return;
4816  }
4817  elog(ERROR, "aggregate function cannot register a callback in this context");
4818 }
4819 
4820 
4821 /*
4822  * aggregate_dummy - dummy execution routine for aggregate functions
4823  *
4824  * This function is listed as the implementation (prosrc field) of pg_proc
4825  * entries for aggregate functions. Its only purpose is to throw an error
4826  * if someone mistakenly executes such a function in the normal way.
4827  *
4828  * Perhaps someday we could assign real meaning to the prosrc field of
4829  * an aggregate?
4830  */
4831 Datum
4833 {
/* always reports an error; aggregates must be run through the Agg executor */
4834  elog(ERROR, "aggregate function %u called as normal function",
4835  fcinfo->flinfo->fn_oid);
/* elog(ERROR) does not return; this only silences compiler warnings */
4836  return (Datum) 0; /* keep compiler quiet */
4837 }
static void hashagg_reset_spill_state(AggState *aggstate)
Definition: nodeAgg.c:3089
List * aggdistinct
Definition: primnodes.h:321
struct AggStatePerTransData * AggStatePerTrans
Definition: execnodes.h:2038
ExprState ** eqfunctions
Definition: nodeAgg.h:277
struct HashAggSpill * hash_spills
Definition: execnodes.h:2086
AggStatePerGroup * hash_pergroup
Definition: execnodes.h:2105
#define NIL
Definition: pg_list.h:65
static HashAggBatch * hashagg_batch_new(LogicalTapeSet *tapeset, int input_tapenum, int setno, int64 input_tuples, int used_bits)
Definition: nodeAgg.c:2955
size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:880
static TupleTableSlot * fetch_input_tuple(AggState *aggstate)
Definition: nodeAgg.c:546
struct AggStatePerGroupData * AggStatePerGroup
Definition: execnodes.h:2039
#define ScanTupleHashTable(htable, iter)
Definition: execnodes.h:729
static void select_current_set(AggState *aggstate, int setno, bool is_hash)
Definition: nodeAgg.c:454
int numCols
Definition: plannodes.h:811
static int partitions
Definition: pgbench.c:195
List * qual
Definition: plannodes.h:143
bool tuplesort_getdatum(Tuplesortstate *state, bool forward, Datum *val, bool *isNull, Datum *abbrev)
Definition: tuplesort.c:2245
TupleHashTable BuildTupleHashTableExt(PlanState *parent, TupleDesc inputDesc, int numCols, AttrNumber *keyColIdx, const Oid *eqfuncoids, FmgrInfo *hashfunctions, Oid *collations, long nbuckets, Size additionalsize, MemoryContext metacxt, MemoryContext tablecxt, MemoryContext tempcxt, bool use_variable_hash_iv)
Definition: execGrouping.c:154
Datum aggregate_dummy(PG_FUNCTION_ARGS)
Definition: nodeAgg.c:4832
bool aggvariadic
Definition: primnodes.h:324
int bms_first_member(Bitmapset *a)
Definition: bitmapset.c:996
AggStatePerPhase phases
Definition: execnodes.h:2073
double hashentrysize
Definition: execnodes.h:2097
#define IsA(nodeptr, _type_)
Definition: nodes.h:577
void tuplesort_performsort(Tuplesortstate *state)
Definition: tuplesort.c:1791
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
Definition: nodeAgg.c:2871
#define AllocSetContextCreate
Definition: memutils.h:170
AttrNumber * hashGrpColIdxInput
Definition: nodeAgg.h:307
Datum * ecxt_aggvalues
Definition: execnodes.h:245
static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
Definition: nodeAgg.c:1841
TupleHashEntry LookupTupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew, uint32 hash)
Definition: execGrouping.c:350
uint64 hash_ngroups_limit
Definition: execnodes.h:2094
#define HASHAGG_MAX_PARTITIONS
Definition: nodeAgg.c:286
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1416
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:305
Index varlevelsup
Definition: primnodes.h:191
TargetEntry * get_sortgroupclause_tle(SortGroupClause *sgClause, List *targetList)
Definition: tlist.c:389
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1801
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Bitmapset * bms_copy(const Bitmapset *a)
Definition: bitmapset.c:74
Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:1099
static void hash_agg_check_limits(AggState *aggstate)
Definition: nodeAgg.c:1779
static long hash_choose_num_buckets(double hashentrysize, long estimated_nbuckets, Size memory)
Definition: nodeAgg.c:1892
AttrNumber * grpColIdx
Definition: plannodes.h:812
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:985
uint64 transitionSpace
Definition: plannodes.h:816
static void agg_fill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2482
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
Definition: execUtils.c:463
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static AggStatePerGroup lookup_hash_entry(AggState *aggstate, uint32 hash, bool *in_hash_table)
Definition: nodeAgg.c:1978
List * lcons_int(int datum, List *list)
Definition: list.c:472
TupleTableSlot * ExecStoreAllNullTuple(TupleTableSlot *slot)
Definition: execTuples.c:1546
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3028
int numaggs
Definition: execnodes.h:2047
int nfreetapes
Definition: nodeAgg.c:317
Oid GetUserId(void)
Definition: miscinit.c:439
bool agg_done
Definition: execnodes.h:2065
#define castNode(_type_, nodeptr)
Definition: nodes.h:595
Oid * grpCollations
Definition: plannodes.h:814
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:537
#define TTS_EMPTY(slot)
Definition: tuptable.h:97
TupleTableSlot * sort_slot
Definition: execnodes.h:2076
List * all_grouped_cols
Definition: execnodes.h:2070
Tuplesortstate * sort_out
Definition: execnodes.h:2075
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Definition: execTuples.c:1662
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void finalize_partialaggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1138
ScanState ss
Definition: execnodes.h:2045
FmgrInfo equalfnOne
Definition: nodeAgg.h:109
ExprContext * ps_ExprContext
Definition: execnodes.h:984
MinimalTuple firstTuple
Definition: execnodes.h:682
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:234
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
Definition: nodeAgg.c:3057
ExprState * evaltrans
Definition: nodeAgg.h:282
#define SizeForFunctionCallInfo(nargs)
Definition: fmgr.h:102
int64 input_tuples
Definition: nodeAgg.c:355
void ExecReScan(PlanState *node)
Definition: execAmi.c:75
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1043
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
bool datumIsEqual(Datum value1, Datum value2, bool typByVal, int typLen)
Definition: datum.c:222
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct HashTapeInfo HashTapeInfo
Oid inputcollid
Definition: primnodes.h:315
int current_phase
Definition: execnodes.h:2053
static void hashagg_finish_initial_spills(AggState *aggstate)
Definition: nodeAgg.c:3024
static void slot_getsomeattrs(TupleTableSlot *slot, int attnum)
Definition: tuptable.h:341
Definition: nodes.h:526
AggSplit aggsplit
Definition: execnodes.h:2050
static TupleTableSlot * ExecAgg(PlanState *pstate)
Definition: nodeAgg.c:2100
bool * nullsFirst
Definition: plannodes.h:774
int errcode(int sqlerrcode)
Definition: elog.c:610
List * args
Definition: primnodes.h:319
#define MemSet(start, val, len)
Definition: c.h:971
AttrNumber varattno
Definition: primnodes.h:186
char * format_type_be(Oid type_oid)
Definition: format_type.c:327
fmNodePtr context
Definition: fmgr.h:88
Datum * tts_values
Definition: tuptable.h:126
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1335
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
void build_aggregate_deserialfn_expr(Oid deserialfn_oid, Expr **deserialfnexpr)
Definition: parse_agg.c:2013
static void finalize_aggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull)
Definition: nodeAgg.c:1035
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition: parse_agg.c:2037
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:726
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable, Oid aggtransfn, Oid aggtranstype, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, List *transnos)
Definition: nodeAgg.c:4409
AggStatePerTrans pertrans
Definition: execnodes.h:2055
EState * state
Definition: execnodes.h:947
int projected_set
Definition: execnodes.h:2066
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1152
bool aggstar
Definition: primnodes.h:323
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:370
HeapTuple grp_firstTuple
Definition: execnodes.h:2080
Definition: primnodes.h:181
Aggref * aggref
Definition: nodeAgg.h:186
static TupleTableSlot * project_aggregates(AggState *aggstate)
Definition: nodeAgg.c:1351
static void advance_aggregates(AggState *aggstate)
Definition: nodeAgg.c:817
int current_set
Definition: execnodes.h:2068
uint32 mask
Definition: nodeAgg.c:336
#define OidIsValid(objectId)
Definition: c.h:644
#define DO_AGGSPLIT_COMBINE(as)
Definition: nodes.h:788
FunctionCallInfo transfn_fcinfo
Definition: nodeAgg.h:161
struct HashAggBatch HashAggBatch
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:614
Datum ExecAggTransReparent(AggState *aggstate, AggStatePerTrans pertrans, Datum newValue, bool newValueIsNull, Datum oldValue, bool oldValueIsNull)
int numtrans
Definition: execnodes.h:2048
void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1439
static void hash_agg_enter_spill_mode(AggState *aggstate)
Definition: nodeAgg.c:1805
TupleDesc sortdesc
Definition: nodeAgg.h:137
Oid * sortOperators
Definition: plannodes.h:772
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:854
void execTuplesHashPrepare(int numCols, const Oid *eqOperators, Oid **eqFuncOids, FmgrInfo **hashFunctions)
Definition: execGrouping.c:96
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:209
void ResetTupleHashTable(TupleHashTable hashtable)
Definition: execGrouping.c:282
ExprContext * tmpcontext
Definition: execnodes.h:2058
FmgrInfo transfn
Definition: nodeAgg.h:80
#define HASHAGG_PARTITION_FACTOR
Definition: nodeAgg.c:284
static void build_hash_table(AggState *aggstate, int setno, long nbuckets)
Definition: nodeAgg.c:1466
static void prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
Definition: nodeAgg.c:1243
bool hash_spill_mode
Definition: execnodes.h:2091
#define FUNC_MAX_ARGS
static void hashagg_tapeinfo_init(AggState *aggstate)
Definition: nodeAgg.c:2827
List * hash_batches
Definition: execnodes.h:2089
Aggref * aggref
Definition: nodeAgg.h:43
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:806
#define linitial_int(l)
Definition: pg_list.h:196
Bitmapset ** grouped_cols
Definition: nodeAgg.h:276
PlanState ps
Definition: execnodes.h:1332
LogicalTapeSet * tapeset
Definition: nodeAgg.c:353
int maxsets
Definition: execnodes.h:2072
#define true
Definition: c.h:321
static bool agg_refill_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2536
#define HASHAGG_MIN_BUCKETS
Definition: nodeAgg.c:298
TupleTableSlot * hash_spill_slot
Definition: execnodes.h:2088
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
#define DO_AGGSPLIT_SERIALIZE(as)
Definition: nodes.h:790
#define HASHAGG_MIN_PARTITIONS
Definition: nodeAgg.c:285
void pfree(void *pointer)
Definition: mcxt.c:1056
MemoryContext es_query_cxt
Definition: execnodes.h:555
AggStrategy aggstrategy
Definition: plannodes.h:809
AggState * ExecInitAgg(Agg *node, EState *estate, int eflags)
Definition: nodeAgg.c:3139
#define linitial(l)
Definition: pg_list.h:195
bool table_filled
Definition: execnodes.h:2082
AggStrategy aggstrategy
Definition: execnodes.h:2049
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:688
static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate)
Definition: nodeAgg.c:2682
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
bool fn_strict
Definition: fmgr.h:61
#define lfirst_int(lc)
Definition: pg_list.h:191
static void * list_nth(const List *list, int n)
Definition: pg_list.h:277
char * get_func_name(Oid funcid)
Definition: lsyscache.c:1471
MemoryContext hash_metacxt
Definition: execnodes.h:2084
NullableDatum args[FLEXIBLE_ARRAY_MEMBER]
Definition: fmgr.h:95
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
static TupleTableSlot * agg_retrieve_direct(AggState *aggstate)
Definition: nodeAgg.c:2136
#define AGG_CONTEXT_AGGREGATE
Definition: fmgr.h:731
LogicalTapeSet * LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, int worker)
Definition: logtape.c:600
struct TupleHashEntryData TupleHashEntryData
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
static void find_hash_columns(AggState *aggstate)
Definition: nodeAgg.c:1527
ExprState * equalfnMulti
Definition: nodeAgg.h:110
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
Tuplesortstate * sort_in
Definition: execnodes.h:2074
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define outerPlanState(node)
Definition: execnodes.h:1039
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Definition: tuplesort.c:2159
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:646
static void finalize_aggregates(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup)
Definition: nodeAgg.c:1288
bool AggStateIsShared(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4765
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, int ndest)
Definition: nodeAgg.c:2848
static int hash_choose_num_partitions(uint64 input_groups, double hashentrysize, int used_bits, int *log2_npartittions)
Definition: nodeAgg.c:1918
Aggref * aggref
Definition: execnodes.h:752
#define list_nth_node(type, list, n)
Definition: pg_list.h:305
Tuplesortstate ** sortstates
Definition: nodeAgg.h:153
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:167
Bitmapset * aggParams
Definition: plannodes.h:817
static int initValue(long lng_val)
Definition: informix.c:677
MemoryContext tablecxt
Definition: execnodes.h:704
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:499
bool * tts_isnull
Definition: tuptable.h:128
int npartitions
Definition: nodeAgg.c:333
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:290
Size hash_agg_entry_size(int numAggs, Size tupleWidth, Size transitionSpace)
Definition: nodeAgg.c:1642
void hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits, Size *mem_limit, uint64 *ngroups_limit, int *num_partitions)
Definition: nodeAgg.c:1721
MinimalTupleData * MinimalTuple
Definition: htup.h:27
static void process_ordered_aggregate_multi(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:941
List * aggorder
Definition: primnodes.h:320
int errcode_for_file_access(void)
Definition: elog.c:633
#define fmgr_info_set_expr(expr, finfo)
Definition: fmgr.h:135
AttrNumber resno
Definition: primnodes.h:1408
#define DatumGetBool(X)
Definition: postgres.h:393
TupleTableSlot * ecxt_innertuple
Definition: execnodes.h:228
List * ExecInitExprList(List *nodes, PlanState *parent)
Definition: execExpr.c:318
#define MakeExpandedObjectReadOnly(d, isnull, typlen)
Index agglevelsup
Definition: primnodes.h:327
int used_bits
Definition: nodeAgg.c:352
#define TupIsNull(slot)
Definition: tuptable.h:292
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
unsigned int uint32
Definition: c.h:367
List * aggdirectargs
Definition: primnodes.h:318
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
Definition: nodeAgg.c:4303
AggStatePerAgg curperagg
Definition: execnodes.h:2061
AttrNumber * sortColIdx
Definition: nodeAgg.h:99
struct AggStatePerGroupData AggStatePerGroupData
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
Definition: nodeAgg.c:1392
struct HashTapeInfo * hash_tapeinfo
Definition: execnodes.h:2085
AggStatePerHash perhash
Definition: execnodes.h:2104
bool outeropsset
Definition: execnodes.h:1026
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void initialize_aggregates(AggState *aggstate, AggStatePerGroup *pergroups, int numReset)
Definition: nodeAgg.c:664
AggStrategy aggstrategy
Definition: nodeAgg.h:273
ExprState * evaltrans_cache[2][2]
Definition: nodeAgg.h:287
#define InstrCountFiltered1(node, delta)
Definition: execnodes.h:1047
#define EXEC_FLAG_REWIND
Definition: executor.h:57
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2702
void build_aggregate_combinefn_expr(Oid agg_state_type, Oid agg_input_collation, Oid combinefn_oid, Expr **combinefnexpr)
Definition: parse_agg.c:1961
Datum value
Definition: postgres.h:378
Bitmapset * grouped_cols
Definition: execnodes.h:2069
static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
Definition: nodeAgg.c:2917
Datum datumCopy(Datum value, bool typByVal, int typLen)
Definition: datum.c:131
TupleTableSlot * ExecAllocTableSlot(List **tupleTable, TupleDesc desc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1141
int hash_batches_used
Definition: execnodes.h:2102
MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo)
Definition: nodeAgg.c:4739
List * lappend_int(List *list, int datum)
Definition: list.c:340
Bitmapset * chgParam
Definition: execnodes.h:977
#define InvokeFunctionExecuteHook(objectId)
Definition: objectaccess.h:191
int ntapes
Definition: nodeAgg.c:315
bool IsBinaryCoercible(Oid srctype, Oid targettype)
int my_log2(long num)
Definition: dynahash.c:1719
#define outerPlan(node)
Definition: plannodes.h:172
List * lappend(List *list, void *datum)
Definition: list.c:322
TupleHashIterator hashiter
Definition: nodeAgg.h:300
int numCols
Definition: plannodes.h:770
Index varno
Definition: primnodes.h:184
static void initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:577
int num_hashes
Definition: execnodes.h:2083
Plan plan
Definition: plannodes.h:808
AttrNumber * hashGrpColIdxHash
Definition: nodeAgg.h:308
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
bool input_done
Definition: execnodes.h:2064
#define SizeofMinimalTupleHeader
Definition: htup_details.h:649
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
ExprContext * curaggcontext
Definition: execnodes.h:2060
ExprContext * hashcontext
Definition: execnodes.h:2056
bool * ecxt_aggnulls
Definition: execnodes.h:247
static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)
Definition: executor.h:397
#define TextDatumGetCString(d)
Definition: builtins.h:88
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastaggno, List **same_input_transnos)
Definition: nodeAgg.c:4331
List * es_tupleTable
Definition: execnodes.h:557
#define HASHAGG_READ_BUFFER_SIZE
Definition: nodeAgg.c:294
AggStatePerPhase phase
Definition: execnodes.h:2051
void * palloc0(Size size)
Definition: mcxt.c:980
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:951
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
struct FunctionCallInfoBaseData * FunctionCallInfo
Definition: fmgr.h:38
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:240
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1377
FmgrInfo deserialfn
Definition: nodeAgg.h:86
int work_mem
Definition: globals.c:121
List * groupingSets
Definition: plannodes.h:819
int16 resulttypeLen
Definition: nodeAgg.h:215
static void initialize_phase(AggState *aggstate, int newphase)
Definition: nodeAgg.c:476
LogicalTapeSet * tapeset
Definition: nodeAgg.c:314
FormData_pg_proc * Form_pg_proc
Definition: pg_proc.h:133
Plan * plan
Definition: execnodes.h:945
#define InvalidOid
Definition: postgres_ext.h:36
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1153
Oid aggfnoid
Definition: primnodes.h:312
int16 attnum
Definition: pg_attribute.h:79
#define ResetTupleHashIterator(htable, iter)
Definition: execnodes.h:727
#define ereport(elevel,...)
Definition: elog.h:144
static HeapTuple ExecCopySlotHeapTuple(TupleTableSlot *slot)
Definition: tuptable.h:452
static void advance_transition_function(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:705
#define LOCAL_FCINFO(name, nargs)
Definition: fmgr.h:110
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
Definition: nodeAgg.c:1671
List * lcons(void *datum, List *list)
Definition: list.c:454
uint64 hash_disk_used
Definition: execnodes.h:2101
Size MemoryContextMemAllocated(MemoryContext context, bool recurse)
Definition: mcxt.c:470
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
#define Max(x, y)
Definition: c.h:914
ExprContext ** aggcontexts
Definition: execnodes.h:2057
#define makeNode(_type_)
Definition: nodes.h:574
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:230
int plan_width
Definition: plannodes.h:130
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
FmgrInfo * hashfunctions
Definition: nodeAgg.h:302
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
void RegisterExprContextCallback(ExprContext *econtext, ExprContextCallbackFunction function, Datum arg)
Definition: execUtils.c:863
FmgrInfo serialfn
Definition: nodeAgg.h:83
ExprState * execTuplesMatchPrepare(TupleDesc desc, int numCols, const AttrNumber *keyColIdx, const Oid *eqOperators, const Oid *collations, PlanState *parent)
Definition: execGrouping.c:59
int input_tapenum
Definition: nodeAgg.c:354
FunctionCallInfo deserialfn_fcinfo
Definition: nodeAgg.h:166
#define EXEC_FLAG_MARK
Definition: executor.h:59
AggSplit aggsplit
Definition: plannodes.h:810
struct AggStatePerAggData * AggStatePerAgg
Definition: execnodes.h:2037
void ExecReScanAgg(AggState *node)
Definition: nodeAgg.c:4514
void build_aggregate_serialfn_expr(Oid serialfn_oid, Expr **serialfnexpr)
Definition: parse_agg.c:1990
FormData_pg_aggregate * Form_pg_aggregate
Definition: pg_aggregate.h:109
Expr * expr
Definition: primnodes.h:1407
AggSplit aggsplit
Definition: primnodes.h:328
bool MemoryContextContains(MemoryContext context, void *pointer)
Definition: mcxt.c:691
void(* ExprContextCallbackFunction)(Datum arg)
Definition: execnodes.h:188
void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, Oid transfn_oid, Oid invtransfn_oid, Expr **transfnexpr, Expr **invtransfnexpr)
Definition: parse_agg.c:1900
bool hash_ever_spilled
Definition: execnodes.h:2090
AggStatePerGroup * pergroups
Definition: execnodes.h:2078
size_t Size
Definition: c.h:466
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:444
#define AGG_CONTEXT_WINDOW
Definition: fmgr.h:732
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
Definition: fmgr.h:150
FunctionCallInfo serialfn_fcinfo
Definition: nodeAgg.h:164
bool expression_tree_walker(Node *node, bool(*walker)(), void *context)
Definition: nodeFuncs.c:1839
static int list_length(const List *l)
Definition: pg_list.h:169
long numGroups
Definition: plannodes.h:815
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:719
#define DO_AGGSPLIT_SKIPFINAL(as)
Definition: nodes.h:789
void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)
Definition: lsyscache.c:2090
Expr * aggfilter
Definition: primnodes.h:322
int AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
Definition: nodeAgg.c:4661
TupleDesc ExecTypeFromTL(List *targetList)
Definition: execTuples.c:1908
#define MAXALIGN(LEN)
Definition: c.h:691
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1769
void ReScanExprContext(ExprContext *econtext)
Definition: execUtils.c:402
static Bitmapset * find_unaggregated_cols(AggState *aggstate)
Definition: nodeAgg.c:1378
static TupleTableSlot * agg_retrieve_hash_table_in_memory(AggState *aggstate)
Definition: nodeAgg.c:2707
LogicalTapeSet * tapeset
Definition: nodeAgg.c:332
bool outeropsfixed
Definition: execnodes.h:1022
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:736
#define DO_AGGSPLIT_DESERIALIZE(as)
Definition: nodes.h:791
Size hash_mem_limit
Definition: execnodes.h:2093
struct Plan * lefttree
Definition: plannodes.h:144
TupleTableSlot * uniqslot
Definition: nodeAgg.h:136
int numphases
Definition: execnodes.h:2052
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:454
void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
Definition: logtape.c:776
List * targetlist
Definition: plannodes.h:142
ExprState * qual
Definition: execnodes.h:966
#define DatumGetPointer(X)
Definition: postgres.h:549
AttrNumber * sortColIdx
Definition: plannodes.h:771
#define HASHAGG_WRITE_BUFFER_SIZE
Definition: nodeAgg.c:295
bool bms_overlap(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:494
uint32 TupleHashTableHash(TupleHashTable hashtable, TupleTableSlot *slot)
Definition: execGrouping.c:327
void AggRegisterCallback(FunctionCallInfo fcinfo, ExprContextCallbackFunction func, Datum arg)
Definition: nodeAgg.c:4804
Size hash_mem_peak
Definition: execnodes.h:2098
Oid * grpOperators
Definition: plannodes.h:813
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
Oid aggcollid
Definition: primnodes.h:314
List * chain
Definition: plannodes.h:820
AggStatePerAgg peragg
Definition: execnodes.h:2054
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
Definition: nodeAgg.c:2974
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
#define ACL_EXECUTE
Definition: parsenodes.h:81
void list_free(List *list)
Definition: list.c:1377
#define elog(elevel,...)
Definition: elog.h:214
AclResult pg_proc_aclcheck(Oid proc_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4653
int i
List * aggdirectargs
Definition: nodeAgg.h:209
Oid aggtranstype
Definition: primnodes.h:316
uint64 hash_ngroups_current
Definition: execnodes.h:2099
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:651
void * arg
AggStatePerTrans curpertrans
Definition: execnodes.h:2063
Oid aggtype
Definition: primnodes.h:313
static void process_ordered_aggregate_single(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate)
Definition: nodeAgg.c:849
#define PG_FUNCTION_ARGS
Definition: fmgr.h:188
bool resulttypeByVal
Definition: nodeAgg.h:216
char aggkind
Definition: primnodes.h:326
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
int * partitions
Definition: nodeAgg.c:334
Definition: plannodes.h:806
List * aggs
Definition: execnodes.h:2046
TupleTableSlot * sortslot
Definition: nodeAgg.h:135
void ExecCreateScanSlotFromOuterPlan(EState *estate, ScanState *scanstate, const TupleTableSlotOps *tts_ops)
Definition: execUtils.c:646
long LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1172
void tuplesort_end(Tuplesortstate *state)
Definition: tuplesort.c:1236
TupleTableSlot * hashslot
Definition: nodeAgg.h:301
Oid * collations
Definition: plannodes.h:773