PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
nodeWindowAgg.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeWindowAgg.c
4  * routines to handle WindowAgg nodes.
5  *
6  * A WindowAgg node evaluates "window functions" across suitable partitions
7  * of the input tuple set. Any one WindowAgg works for just a single window
8  * specification, though it can evaluate multiple window functions sharing
9  * identical window specifications. The input tuples are required to be
10  * delivered in sorted order, with the PARTITION BY columns (if any) as
11  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12  * (The planner generates a stack of WindowAggs with intervening Sort nodes
13  * as needed, if a query involves more than one window specification.)
14  *
15  * Since window functions can require access to any or all of the rows in
16  * the current partition, we accumulate rows of the partition into a
17  * tuplestore. The window functions are called using the WindowObject API
18  * so that they can access those rows as needed.
19  *
20  * We also support using plain aggregate functions as window functions.
21  * For these, the regular Agg-node environment is emulated for each partition.
22  * As required by the SQL spec, the output represents the value of the
23  * aggregate function over all rows in the current row's window frame.
24  *
25  *
26  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * IDENTIFICATION
30  * src/backend/executor/nodeWindowAgg.c
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include "postgres.h"
35 
36 #include "access/htup_details.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_aggregate.h"
39 #include "catalog/pg_proc.h"
40 #include "executor/executor.h"
41 #include "executor/nodeWindowAgg.h"
42 #include "miscadmin.h"
43 #include "nodes/nodeFuncs.h"
44 #include "optimizer/clauses.h"
45 #include "parser/parse_agg.h"
46 #include "parser/parse_coerce.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/datum.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/syscache.h"
53 #include "windowapi.h"
54 
55 /*
56  * All the window function APIs are called with this object, which is passed
57  * to window functions as fcinfo->context.
58  */
59 typedef struct WindowObjectData
60 {
62  WindowAggState *winstate; /* parent WindowAggState */
63  List *argstates; /* ExprState trees for fn's arguments */
64  void *localmem; /* WinGetPartitionLocalMemory's chunk */
65  int markptr; /* tuplestore mark pointer for this fn */
66  int readptr; /* tuplestore read pointer for this fn */
67  int64 markpos; /* row that markptr is positioned on */
68  int64 seekpos; /* row that readptr is positioned on */
70 
71 /*
72  * We have one WindowStatePerFunc struct for each window function and
73  * window aggregate handled by this node.
74  */
75 typedef struct WindowStatePerFuncData
76 {
77  /* Links to WindowFunc expr and state nodes this working state is for */
80 
81  int numArguments; /* number of arguments */
82 
83  FmgrInfo flinfo; /* fmgr lookup data for window function */
84 
85  Oid winCollation; /* collation derived for window function */
86 
87  /*
88  * We need the len and byval info for the result of each function in order
89  * to know how to copy/delete values.
90  */
93 
94  bool plain_agg; /* is it just a plain aggregate function? */
95  int aggno; /* if so, index of its PerAggData */
96 
97  WindowObject winobj; /* object used in window function API */
99 
100 /*
101  * For plain aggregate window functions, we also have one of these.
102  */
103 typedef struct WindowStatePerAggData
104 {
105  /* Oids of transition functions */
107  Oid invtransfn_oid; /* may be InvalidOid */
108  Oid finalfn_oid; /* may be InvalidOid */
109 
110  /*
111  * fmgr lookup data for transition functions --- only valid when
112  * corresponding oid is not InvalidOid. Note in particular that fn_strict
113  * flags are kept here.
114  */
118 
119  int numFinalArgs; /* number of arguments to pass to finalfn */
120 
121  /*
122  * initial value from pg_aggregate entry
123  */
126 
127  /*
128  * cached value for current frame boundaries
129  */
132 
133  /*
134  * We need the len and byval info for the agg's input, result, and
135  * transition data types in order to know how to copy/delete values.
136  */
139  transtypeLen;
143 
144  int wfuncno; /* index of associated PerFuncData */
145 
146  /* Context holding transition value and possibly other subsidiary data */
147  MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
148 
149  /* Current transition value */
150  Datum transValue; /* current transition value */
152 
153  int64 transValueCount; /* number of currently-aggregated rows */
154 
155  /* Data local to eval_windowaggregates() */
156  bool restart; /* need to restart this agg in this cycle? */
158 
159 static void initialize_windowaggregate(WindowAggState *winstate,
160  WindowStatePerFunc perfuncstate,
161  WindowStatePerAgg peraggstate);
162 static void advance_windowaggregate(WindowAggState *winstate,
163  WindowStatePerFunc perfuncstate,
164  WindowStatePerAgg peraggstate);
165 static bool advance_windowaggregate_base(WindowAggState *winstate,
166  WindowStatePerFunc perfuncstate,
167  WindowStatePerAgg peraggstate);
168 static void finalize_windowaggregate(WindowAggState *winstate,
169  WindowStatePerFunc perfuncstate,
170  WindowStatePerAgg peraggstate,
171  Datum *result, bool *isnull);
172 
173 static void eval_windowaggregates(WindowAggState *winstate);
174 static void eval_windowfunction(WindowAggState *winstate,
175  WindowStatePerFunc perfuncstate,
176  Datum *result, bool *isnull);
177 
178 static void begin_partition(WindowAggState *winstate);
179 static void spool_tuples(WindowAggState *winstate, int64 pos);
180 static void release_partition(WindowAggState *winstate);
181 
182 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
183  TupleTableSlot *slot);
184 static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
185 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
186 
188  WindowFunc *wfunc,
189  WindowStatePerAgg peraggstate);
190 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
191 
192 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
193  TupleTableSlot *slot2);
194 static bool window_gettupleslot(WindowObject winobj, int64 pos,
195  TupleTableSlot *slot);
196 
197 
198 /*
199  * initialize_windowaggregate
200  * parallel to initialize_aggregates in nodeAgg.c
201  */
202 static void
204  WindowStatePerFunc perfuncstate,
205  WindowStatePerAgg peraggstate)
206 {
207  MemoryContext oldContext;
208 
209  /*
210  * If we're using a private aggcontext, we may reset it here. But if the
211  * context is shared, we don't know which other aggregates may still need
212  * it, so we must leave it to the caller to reset at an appropriate time.
213  */
214  if (peraggstate->aggcontext != winstate->aggcontext)
216 
217  if (peraggstate->initValueIsNull)
218  peraggstate->transValue = peraggstate->initValue;
219  else
220  {
221  oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
222  peraggstate->transValue = datumCopy(peraggstate->initValue,
223  peraggstate->transtypeByVal,
224  peraggstate->transtypeLen);
225  MemoryContextSwitchTo(oldContext);
226  }
227  peraggstate->transValueIsNull = peraggstate->initValueIsNull;
228  peraggstate->transValueCount = 0;
229  peraggstate->resultValue = (Datum) 0;
230  peraggstate->resultValueIsNull = true;
231 }
232 
233 /*
234  * advance_windowaggregate
235  * parallel to advance_aggregates in nodeAgg.c
236  */
237 static void
239  WindowStatePerFunc perfuncstate,
240  WindowStatePerAgg peraggstate)
241 {
242  WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
243  int numArguments = perfuncstate->numArguments;
244  FunctionCallInfoData fcinfodata;
245  FunctionCallInfo fcinfo = &fcinfodata;
246  Datum newVal;
247  ListCell *arg;
248  int i;
249  MemoryContext oldContext;
250  ExprContext *econtext = winstate->tmpcontext;
251  ExprState *filter = wfuncstate->aggfilter;
252 
253  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
254 
255  /* Skip anything FILTERed out */
256  if (filter)
257  {
258  bool isnull;
259  Datum res = ExecEvalExpr(filter, econtext, &isnull);
260 
261  if (isnull || !DatumGetBool(res))
262  {
263  MemoryContextSwitchTo(oldContext);
264  return;
265  }
266  }
267 
268  /* We start from 1, since the 0th arg will be the transition value */
269  i = 1;
270  foreach(arg, wfuncstate->args)
271  {
272  ExprState *argstate = (ExprState *) lfirst(arg);
273 
274  fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
275  &fcinfo->argnull[i]);
276  i++;
277  }
278 
279  if (peraggstate->transfn.fn_strict)
280  {
281  /*
282  * For a strict transfn, nothing happens when there's a NULL input; we
283  * just keep the prior transValue. Note transValueCount doesn't
284  * change either.
285  */
286  for (i = 1; i <= numArguments; i++)
287  {
288  if (fcinfo->argnull[i])
289  {
290  MemoryContextSwitchTo(oldContext);
291  return;
292  }
293  }
294 
295  /*
296  * For strict transition functions with initial value NULL we use the
297  * first non-NULL input as the initial state. (We already checked
298  * that the agg's input type is binary-compatible with its transtype,
299  * so straight copy here is OK.)
300  *
301  * We must copy the datum into aggcontext if it is pass-by-ref. We do
302  * not need to pfree the old transValue, since it's NULL.
303  */
304  if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
305  {
306  MemoryContextSwitchTo(peraggstate->aggcontext);
307  peraggstate->transValue = datumCopy(fcinfo->arg[1],
308  peraggstate->transtypeByVal,
309  peraggstate->transtypeLen);
310  peraggstate->transValueIsNull = false;
311  peraggstate->transValueCount = 1;
312  MemoryContextSwitchTo(oldContext);
313  return;
314  }
315 
316  if (peraggstate->transValueIsNull)
317  {
318  /*
319  * Don't call a strict function with NULL inputs. Note it is
320  * possible to get here despite the above tests, if the transfn is
321  * strict *and* returned a NULL on a prior cycle. If that happens
322  * we will propagate the NULL all the way to the end. That can
323  * only happen if there's no inverse transition function, though,
324  * since we disallow transitions back to NULL when there is one.
325  */
326  MemoryContextSwitchTo(oldContext);
327  Assert(!OidIsValid(peraggstate->invtransfn_oid));
328  return;
329  }
330  }
331 
332  /*
333  * OK to call the transition function. Set winstate->curaggcontext while
334  * calling it, for possible use by AggCheckCallContext.
335  */
336  InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
337  numArguments + 1,
338  perfuncstate->winCollation,
339  (void *) winstate, NULL);
340  fcinfo->arg[0] = peraggstate->transValue;
341  fcinfo->argnull[0] = peraggstate->transValueIsNull;
342  winstate->curaggcontext = peraggstate->aggcontext;
343  newVal = FunctionCallInvoke(fcinfo);
344  winstate->curaggcontext = NULL;
345 
346  /*
347  * Moving-aggregate transition functions must not return null, see
348  * advance_windowaggregate_base().
349  */
350  if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
351  ereport(ERROR,
352  (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
353  errmsg("moving-aggregate transition function must not return null")));
354 
355  /*
356  * We must track the number of rows included in transValue, since to
357  * remove the last input, advance_windowaggregate_base() mustn't call the
358  * inverse transition function, but simply reset transValue back to its
359  * initial value.
360  */
361  peraggstate->transValueCount++;
362 
363  /*
364  * If pass-by-ref datatype, must copy the new value into aggcontext and
365  * free the prior transValue. But if transfn returned a pointer to its
366  * first input, we don't need to do anything. Also, if transfn returned a
367  * pointer to a R/W expanded object that is already a child of the
368  * aggcontext, assume we can adopt that value without copying it.
369  */
370  if (!peraggstate->transtypeByVal &&
371  DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
372  {
373  if (!fcinfo->isnull)
374  {
375  MemoryContextSwitchTo(peraggstate->aggcontext);
377  false,
378  peraggstate->transtypeLen) &&
380  /* do nothing */ ;
381  else
382  newVal = datumCopy(newVal,
383  peraggstate->transtypeByVal,
384  peraggstate->transtypeLen);
385  }
386  if (!peraggstate->transValueIsNull)
387  {
389  false,
390  peraggstate->transtypeLen))
391  DeleteExpandedObject(peraggstate->transValue);
392  else
393  pfree(DatumGetPointer(peraggstate->transValue));
394  }
395  }
396 
397  MemoryContextSwitchTo(oldContext);
398  peraggstate->transValue = newVal;
399  peraggstate->transValueIsNull = fcinfo->isnull;
400 }
401 
402 /*
403  * advance_windowaggregate_base
404  * Remove the oldest tuple from an aggregation.
405  *
406  * This is very much like advance_windowaggregate, except that we will call
407  * the inverse transition function (which caller must have checked is
408  * available).
409  *
410  * Returns true if we successfully removed the current row from this
411  * aggregate, false if not (in the latter case, caller is responsible
412  * for cleaning up by restarting the aggregation).
413  */
414 static bool
416  WindowStatePerFunc perfuncstate,
417  WindowStatePerAgg peraggstate)
418 {
419  WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
420  int numArguments = perfuncstate->numArguments;
421  FunctionCallInfoData fcinfodata;
422  FunctionCallInfo fcinfo = &fcinfodata;
423  Datum newVal;
424  ListCell *arg;
425  int i;
426  MemoryContext oldContext;
427  ExprContext *econtext = winstate->tmpcontext;
428  ExprState *filter = wfuncstate->aggfilter;
429 
430  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
431 
432  /* Skip anything FILTERed out */
433  if (filter)
434  {
435  bool isnull;
436  Datum res = ExecEvalExpr(filter, econtext, &isnull);
437 
438  if (isnull || !DatumGetBool(res))
439  {
440  MemoryContextSwitchTo(oldContext);
441  return true;
442  }
443  }
444 
445  /* We start from 1, since the 0th arg will be the transition value */
446  i = 1;
447  foreach(arg, wfuncstate->args)
448  {
449  ExprState *argstate = (ExprState *) lfirst(arg);
450 
451  fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
452  &fcinfo->argnull[i]);
453  i++;
454  }
455 
456  if (peraggstate->invtransfn.fn_strict)
457  {
458  /*
459  * For a strict (inv)transfn, nothing happens when there's a NULL
460  * input; we just keep the prior transValue. Note transValueCount
461  * doesn't change either.
462  */
463  for (i = 1; i <= numArguments; i++)
464  {
465  if (fcinfo->argnull[i])
466  {
467  MemoryContextSwitchTo(oldContext);
468  return true;
469  }
470  }
471  }
472 
473  /* There should still be an added but not yet removed value */
474  Assert(peraggstate->transValueCount > 0);
475 
476  /*
477  * In moving-aggregate mode, the state must never be NULL, except possibly
478  * before any rows have been aggregated (which is surely not the case at
479  * this point). This restriction allows us to interpret a NULL result
480  * from the inverse function as meaning "sorry, can't do an inverse
481  * transition in this case". We already checked this in
482  * advance_windowaggregate, but just for safety, check again.
483  */
484  if (peraggstate->transValueIsNull)
485  elog(ERROR, "aggregate transition value is NULL before inverse transition");
486 
487  /*
488  * We mustn't use the inverse transition function to remove the last
489  * input. Doing so would yield a non-NULL state, whereas we should be in
490  * the initial state afterwards which may very well be NULL. So instead,
491  * we simply re-initialize the aggregate in this case.
492  */
493  if (peraggstate->transValueCount == 1)
494  {
495  MemoryContextSwitchTo(oldContext);
497  &winstate->perfunc[peraggstate->wfuncno],
498  peraggstate);
499  return true;
500  }
501 
502  /*
503  * OK to call the inverse transition function. Set
504  * winstate->curaggcontext while calling it, for possible use by
505  * AggCheckCallContext.
506  */
507  InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
508  numArguments + 1,
509  perfuncstate->winCollation,
510  (void *) winstate, NULL);
511  fcinfo->arg[0] = peraggstate->transValue;
512  fcinfo->argnull[0] = peraggstate->transValueIsNull;
513  winstate->curaggcontext = peraggstate->aggcontext;
514  newVal = FunctionCallInvoke(fcinfo);
515  winstate->curaggcontext = NULL;
516 
517  /*
518  * If the function returns NULL, report failure, forcing a restart.
519  */
520  if (fcinfo->isnull)
521  {
522  MemoryContextSwitchTo(oldContext);
523  return false;
524  }
525 
526  /* Update number of rows included in transValue */
527  peraggstate->transValueCount--;
528 
529  /*
530  * If pass-by-ref datatype, must copy the new value into aggcontext and
531  * free the prior transValue. But if invtransfn returned a pointer to its
532  * first input, we don't need to do anything. Also, if invtransfn
533  * returned a pointer to a R/W expanded object that is already a child of
534  * the aggcontext, assume we can adopt that value without copying it.
535  *
536  * Note: the checks for null values here will never fire, but it seems
537  * best to have this stanza look just like advance_windowaggregate.
538  */
539  if (!peraggstate->transtypeByVal &&
540  DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
541  {
542  if (!fcinfo->isnull)
543  {
544  MemoryContextSwitchTo(peraggstate->aggcontext);
546  false,
547  peraggstate->transtypeLen) &&
549  /* do nothing */ ;
550  else
551  newVal = datumCopy(newVal,
552  peraggstate->transtypeByVal,
553  peraggstate->transtypeLen);
554  }
555  if (!peraggstate->transValueIsNull)
556  {
558  false,
559  peraggstate->transtypeLen))
560  DeleteExpandedObject(peraggstate->transValue);
561  else
562  pfree(DatumGetPointer(peraggstate->transValue));
563  }
564  }
565 
566  MemoryContextSwitchTo(oldContext);
567  peraggstate->transValue = newVal;
568  peraggstate->transValueIsNull = fcinfo->isnull;
569 
570  return true;
571 }
572 
573 /*
574  * finalize_windowaggregate
575  * parallel to finalize_aggregate in nodeAgg.c
576  */
577 static void
579  WindowStatePerFunc perfuncstate,
580  WindowStatePerAgg peraggstate,
581  Datum *result, bool *isnull)
582 {
583  MemoryContext oldContext;
584 
586 
587  /*
588  * Apply the agg's finalfn if one is provided, else return transValue.
589  */
590  if (OidIsValid(peraggstate->finalfn_oid))
591  {
592  int numFinalArgs = peraggstate->numFinalArgs;
593  FunctionCallInfoData fcinfo;
594  bool anynull;
595  int i;
596 
597  InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn),
598  numFinalArgs,
599  perfuncstate->winCollation,
600  (void *) winstate, NULL);
601  fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
602  peraggstate->transValueIsNull,
603  peraggstate->transtypeLen);
604  fcinfo.argnull[0] = peraggstate->transValueIsNull;
605  anynull = peraggstate->transValueIsNull;
606 
607  /* Fill any remaining argument positions with nulls */
608  for (i = 1; i < numFinalArgs; i++)
609  {
610  fcinfo.arg[i] = (Datum) 0;
611  fcinfo.argnull[i] = true;
612  anynull = true;
613  }
614 
615  if (fcinfo.flinfo->fn_strict && anynull)
616  {
617  /* don't call a strict function with NULL inputs */
618  *result = (Datum) 0;
619  *isnull = true;
620  }
621  else
622  {
623  winstate->curaggcontext = peraggstate->aggcontext;
624  *result = FunctionCallInvoke(&fcinfo);
625  winstate->curaggcontext = NULL;
626  *isnull = fcinfo.isnull;
627  }
628  }
629  else
630  {
631  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
632  *result = peraggstate->transValue;
633  *isnull = peraggstate->transValueIsNull;
634  }
635 
636  /*
637  * If result is pass-by-ref, make sure it is in the right context.
638  */
639  if (!peraggstate->resulttypeByVal && !*isnull &&
641  DatumGetPointer(*result)))
642  *result = datumCopy(*result,
643  peraggstate->resulttypeByVal,
644  peraggstate->resulttypeLen);
645  MemoryContextSwitchTo(oldContext);
646 }
647 
648 /*
649  * eval_windowaggregates
650  * evaluate plain aggregates being used as window functions
651  *
652  * This differs from nodeAgg.c in two ways. First, if the window's frame
653  * start position moves, we use the inverse transition function (if it exists)
654  * to remove rows from the transition value. And second, we expect to be
655  * able to call aggregate final functions repeatedly after aggregating more
656  * data onto the same transition value. This is not a behavior required by
657  * nodeAgg.c.
658  */
659 static void
661 {
662  WindowStatePerAgg peraggstate;
663  int wfuncno,
664  numaggs,
665  numaggs_restart,
666  i;
667  int64 aggregatedupto_nonrestarted;
668  MemoryContext oldContext;
669  ExprContext *econtext;
670  WindowObject agg_winobj;
671  TupleTableSlot *agg_row_slot;
672  TupleTableSlot *temp_slot;
673 
674  numaggs = winstate->numaggs;
675  if (numaggs == 0)
676  return; /* nothing to do */
677 
678  /* final output execution is in ps_ExprContext */
679  econtext = winstate->ss.ps.ps_ExprContext;
680  agg_winobj = winstate->agg_winobj;
681  agg_row_slot = winstate->agg_row_slot;
682  temp_slot = winstate->temp_slot_1;
683 
684  /*
685  * Currently, we support only a subset of the SQL-standard window framing
686  * rules.
687  *
688  * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
689  * a contiguous group of rows extending forward from the start of the
690  * partition, and rows only enter the frame, never exit it, as the current
691  * row advances forward. This makes it possible to use an incremental
692  * strategy for evaluating aggregates: we run the transition function for
693  * each row added to the frame, and run the final function whenever we
694  * need the current aggregate value. This is considerably more efficient
695  * than the naive approach of re-running the entire aggregate calculation
696  * for each current row. It does assume that the final function doesn't
697  * damage the running transition value, but we have the same assumption in
698  * nodeAgg.c too (when it rescans an existing hash table).
699  *
700  * If the frame start does sometimes move, we can still optimize as above
701  * whenever successive rows share the same frame head, but if the frame
702  * head moves beyond the previous head we try to remove those rows using
703  * the aggregate's inverse transition function. This function restores
704  * the aggregate's current state to what it would be if the removed row
705  * had never been aggregated in the first place. Inverse transition
706  * functions may optionally return NULL, indicating that the function was
707  * unable to remove the tuple from aggregation. If this happens, or if
708  * the aggregate doesn't have an inverse transition function at all, we
709  * must perform the aggregation all over again for all tuples within the
710  * new frame boundaries.
711  *
712  * In many common cases, multiple rows share the same frame and hence the
713  * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
714  * window, then all rows are peers and so they all have window frame equal
715  * to the whole partition.) We optimize such cases by calculating the
716  * aggregate value once when we reach the first row of a peer group, and
717  * then returning the saved value for all subsequent rows.
718  *
719  * 'aggregatedupto' keeps track of the first row that has not yet been
720  * accumulated into the aggregate transition values. Whenever we start a
721  * new peer group, we accumulate forward to the end of the peer group.
722  */
723 
724  /*
725  * First, update the frame head position.
726  *
727  * The frame head should never move backwards, and the code below wouldn't
728  * cope if it did, so for safety we complain if it does.
729  */
730  update_frameheadpos(agg_winobj, temp_slot);
731  if (winstate->frameheadpos < winstate->aggregatedbase)
732  elog(ERROR, "window frame head moved backward");
733 
734  /*
735  * If the frame didn't change compared to the previous row, we can re-use
736  * the result values that were previously saved at the bottom of this
737  * function. Since we don't know the current frame's end yet, this is not
738  * possible to check for fully. But if the frame end mode is UNBOUNDED
739  * FOLLOWING or CURRENT ROW, and the current row lies within the previous
740  * row's frame, then the two frames' ends must coincide. Note that on the
741  * first row aggregatedbase == aggregatedupto, meaning this test must
742  * fail, so we don't need to check the "there was no previous row" case
743  * explicitly here.
744  */
745  if (winstate->aggregatedbase == winstate->frameheadpos &&
748  winstate->aggregatedbase <= winstate->currentpos &&
749  winstate->aggregatedupto > winstate->currentpos)
750  {
751  for (i = 0; i < numaggs; i++)
752  {
753  peraggstate = &winstate->peragg[i];
754  wfuncno = peraggstate->wfuncno;
755  econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
756  econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
757  }
758  return;
759  }
760 
761  /*----------
762  * Initialize restart flags.
763  *
764  * We restart the aggregation:
765  * - if we're processing the first row in the partition, or
766  * - if the frame's head moved and we cannot use an inverse
767  * transition function, or
768  * - if the new frame doesn't overlap the old one
769  *
770  * Note that we don't strictly need to restart in the last case, but if
771  * we're going to remove all rows from the aggregation anyway, a restart
772  * surely is faster.
773  *----------
774  */
775  numaggs_restart = 0;
776  for (i = 0; i < numaggs; i++)
777  {
778  peraggstate = &winstate->peragg[i];
779  if (winstate->currentpos == 0 ||
780  (winstate->aggregatedbase != winstate->frameheadpos &&
781  !OidIsValid(peraggstate->invtransfn_oid)) ||
782  winstate->aggregatedupto <= winstate->frameheadpos)
783  {
784  peraggstate->restart = true;
785  numaggs_restart++;
786  }
787  else
788  peraggstate->restart = false;
789  }
790 
791  /*
792  * If we have any possibly-moving aggregates, attempt to advance
793  * aggregatedbase to match the frame's head by removing input rows that
794  * fell off the top of the frame from the aggregations. This can fail,
795  * i.e. advance_windowaggregate_base() can return false, in which case
796  * we'll restart that aggregate below.
797  */
798  while (numaggs_restart < numaggs &&
799  winstate->aggregatedbase < winstate->frameheadpos)
800  {
801  /*
802  * Fetch the next tuple of those being removed. This should never fail
803  * as we should have been here before.
804  */
805  if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
806  temp_slot))
807  elog(ERROR, "could not re-fetch previously fetched frame row");
808 
809  /* Set tuple context for evaluation of aggregate arguments */
810  winstate->tmpcontext->ecxt_outertuple = temp_slot;
811 
812  /*
813  * Perform the inverse transition for each aggregate function in the
814  * window, unless it has already been marked as needing a restart.
815  */
816  for (i = 0; i < numaggs; i++)
817  {
818  bool ok;
819 
820  peraggstate = &winstate->peragg[i];
821  if (peraggstate->restart)
822  continue;
823 
824  wfuncno = peraggstate->wfuncno;
825  ok = advance_windowaggregate_base(winstate,
826  &winstate->perfunc[wfuncno],
827  peraggstate);
828  if (!ok)
829  {
830  /* Inverse transition function has failed, must restart */
831  peraggstate->restart = true;
832  numaggs_restart++;
833  }
834  }
835 
836  /* Reset per-input-tuple context after each tuple */
837  ResetExprContext(winstate->tmpcontext);
838 
839  /* And advance the aggregated-row state */
840  winstate->aggregatedbase++;
841  ExecClearTuple(temp_slot);
842  }
843 
844  /*
845  * If we successfully advanced the base rows of all the aggregates,
846  * aggregatedbase now equals frameheadpos; but if we failed for any, we
847  * must forcibly update aggregatedbase.
848  */
849  winstate->aggregatedbase = winstate->frameheadpos;
850 
851  /*
852  * If we created a mark pointer for aggregates, keep it pushed up to frame
853  * head, so that tuplestore can discard unnecessary rows.
854  */
855  if (agg_winobj->markptr >= 0)
856  WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
857 
858  /*
859  * Now restart the aggregates that require it.
860  *
861  * We assume that aggregates using the shared context always restart if
862  * *any* aggregate restarts, and we may thus clean up the shared
863  * aggcontext if that is the case. Private aggcontexts are reset by
864  * initialize_windowaggregate() if their owning aggregate restarts. If we
865  * aren't restarting an aggregate, we need to free any previously saved
866  * result for it, else we'll leak memory.
867  */
868  if (numaggs_restart > 0)
870  for (i = 0; i < numaggs; i++)
871  {
872  peraggstate = &winstate->peragg[i];
873 
874  /* Aggregates using the shared ctx must restart if *any* agg does */
875  Assert(peraggstate->aggcontext != winstate->aggcontext ||
876  numaggs_restart == 0 ||
877  peraggstate->restart);
878 
879  if (peraggstate->restart)
880  {
881  wfuncno = peraggstate->wfuncno;
883  &winstate->perfunc[wfuncno],
884  peraggstate);
885  }
886  else if (!peraggstate->resultValueIsNull)
887  {
888  if (!peraggstate->resulttypeByVal)
889  pfree(DatumGetPointer(peraggstate->resultValue));
890  peraggstate->resultValue = (Datum) 0;
891  peraggstate->resultValueIsNull = true;
892  }
893  }
894 
895  /*
896  * Non-restarted aggregates now contain the rows between aggregatedbase
897  * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
898  * contain no rows. If there are any restarted aggregates, we must thus
899  * begin aggregating anew at frameheadpos, otherwise we may simply
900  * continue at aggregatedupto. We must remember the old value of
901  * aggregatedupto to know how long to skip advancing non-restarted
902  * aggregates. If we modify aggregatedupto, we must also clear
903  * agg_row_slot, per the loop invariant below.
904  */
905  aggregatedupto_nonrestarted = winstate->aggregatedupto;
906  if (numaggs_restart > 0 &&
907  winstate->aggregatedupto != winstate->frameheadpos)
908  {
909  winstate->aggregatedupto = winstate->frameheadpos;
910  ExecClearTuple(agg_row_slot);
911  }
912 
913  /*
914  * Advance until we reach a row not in frame (or end of partition).
915  *
916  * Note the loop invariant: agg_row_slot is either empty or holds the row
917  * at position aggregatedupto. We advance aggregatedupto after processing
918  * a row.
919  */
920  for (;;)
921  {
922  /* Fetch next row if we didn't already */
923  if (TupIsNull(agg_row_slot))
924  {
925  if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
926  agg_row_slot))
927  break; /* must be end of partition */
928  }
929 
930  /* Exit loop (for now) if not in frame */
931  if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
932  break;
933 
934  /* Set tuple context for evaluation of aggregate arguments */
935  winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
936 
937  /* Accumulate row into the aggregates */
938  for (i = 0; i < numaggs; i++)
939  {
940  peraggstate = &winstate->peragg[i];
941 
942  /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
943  if (!peraggstate->restart &&
944  winstate->aggregatedupto < aggregatedupto_nonrestarted)
945  continue;
946 
947  wfuncno = peraggstate->wfuncno;
948  advance_windowaggregate(winstate,
949  &winstate->perfunc[wfuncno],
950  peraggstate);
951  }
952 
953  /* Reset per-input-tuple context after each tuple */
954  ResetExprContext(winstate->tmpcontext);
955 
956  /* And advance the aggregated-row state */
957  winstate->aggregatedupto++;
958  ExecClearTuple(agg_row_slot);
959  }
960 
961  /* The frame's end is not supposed to move backwards, ever */
962  Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
963 
964  /*
965  * finalize aggregates and fill result/isnull fields.
966  */
967  for (i = 0; i < numaggs; i++)
968  {
969  Datum *result;
970  bool *isnull;
971 
972  peraggstate = &winstate->peragg[i];
973  wfuncno = peraggstate->wfuncno;
974  result = &econtext->ecxt_aggvalues[wfuncno];
975  isnull = &econtext->ecxt_aggnulls[wfuncno];
976  finalize_windowaggregate(winstate,
977  &winstate->perfunc[wfuncno],
978  peraggstate,
979  result, isnull);
980 
981  /*
982  * save the result in case next row shares the same frame.
983  *
984  * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
985  * advance that the next row can't possibly share the same frame. Is
986  * it worth detecting that and skipping this code?
987  */
988  if (!peraggstate->resulttypeByVal && !*isnull)
989  {
990  oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
991  peraggstate->resultValue =
992  datumCopy(*result,
993  peraggstate->resulttypeByVal,
994  peraggstate->resulttypeLen);
995  MemoryContextSwitchTo(oldContext);
996  }
997  else
998  {
999  peraggstate->resultValue = *result;
1000  }
1001  peraggstate->resultValueIsNull = *isnull;
1002  }
1003 }
1004 
1005 /*
1006  * eval_windowfunction
1007  *
1008  * Arguments of window functions are not evaluated here, because a window
1009  * function can need random access to arbitrary rows in the partition.
1010  * The window function uses the special WinGetFuncArgInPartition and
1011  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1012  * it wants.
1013  */
1014 static void
1016  Datum *result, bool *isnull)
1017 {
1018  FunctionCallInfoData fcinfo;
1019  MemoryContext oldContext;
1020 
1022 
1023  /*
1024  * We don't pass any normal arguments to a window function, but we do pass
1025  * it the number of arguments, in order to permit window function
1026  * implementations to support varying numbers of arguments. The real info
1027  * goes through the WindowObject, which is passed via fcinfo->context.
1028  */
1029  InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
1030  perfuncstate->numArguments,
1031  perfuncstate->winCollation,
1032  (void *) perfuncstate->winobj, NULL);
1033  /* Just in case, make all the regular argument slots be null */
1034  memset(fcinfo.argnull, true, perfuncstate->numArguments);
1035  /* Window functions don't have a current aggregate context, either */
1036  winstate->curaggcontext = NULL;
1037 
1038  *result = FunctionCallInvoke(&fcinfo);
1039  *isnull = fcinfo.isnull;
1040 
1041  /*
1042  * Make sure pass-by-ref data is allocated in the appropriate context. (We
1043  * need this in case the function returns a pointer into some short-lived
1044  * tuple, as is entirely possible.)
1045  */
1046  if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
1048  DatumGetPointer(*result)))
1049  *result = datumCopy(*result,
1050  perfuncstate->resulttypeByVal,
1051  perfuncstate->resulttypeLen);
1052 
1053  MemoryContextSwitchTo(oldContext);
1054 }
1055 
1056 /*
1057  * begin_partition
1058  * Start buffering rows of the next partition.
1059  */
1060 static void
1062 {
1063  PlanState *outerPlan = outerPlanState(winstate);
1064  int numfuncs = winstate->numfuncs;
1065  int i;
1066 
1067  winstate->partition_spooled = false;
1068  winstate->framehead_valid = false;
1069  winstate->frametail_valid = false;
1070  winstate->spooled_rows = 0;
1071  winstate->currentpos = 0;
1072  winstate->frameheadpos = 0;
1073  winstate->frametailpos = -1;
1074  ExecClearTuple(winstate->agg_row_slot);
1075 
1076  /*
1077  * If this is the very first partition, we need to fetch the first input
1078  * row to store in first_part_slot.
1079  */
1080  if (TupIsNull(winstate->first_part_slot))
1081  {
1082  TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1083 
1084  if (!TupIsNull(outerslot))
1085  ExecCopySlot(winstate->first_part_slot, outerslot);
1086  else
1087  {
1088  /* outer plan is empty, so we have nothing to do */
1089  winstate->partition_spooled = true;
1090  winstate->more_partitions = false;
1091  return;
1092  }
1093  }
1094 
1095  /* Create new tuplestore for this partition */
1096  winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1097 
1098  /*
1099  * Set up read pointers for the tuplestore. The current pointer doesn't
1100  * need BACKWARD capability, but the per-window-function read pointers do,
1101  * and the aggregate pointer does if frame start is movable.
1102  */
1103  winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1104 
1105  /* reset default REWIND capability bit for current ptr */
1106  tuplestore_set_eflags(winstate->buffer, 0);
1107 
1108  /* create read pointers for aggregates, if needed */
1109  if (winstate->numaggs > 0)
1110  {
1111  WindowObject agg_winobj = winstate->agg_winobj;
1112  int readptr_flags = 0;
1113 
1114  /* If the frame head is potentially movable ... */
1116  {
1117  /* ... create a mark pointer to track the frame head */
1118  agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1119  /* and the read pointer will need BACKWARD capability */
1120  readptr_flags |= EXEC_FLAG_BACKWARD;
1121  }
1122 
1123  agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1124  readptr_flags);
1125  agg_winobj->markpos = -1;
1126  agg_winobj->seekpos = -1;
1127 
1128  /* Also reset the row counters for aggregates */
1129  winstate->aggregatedbase = 0;
1130  winstate->aggregatedupto = 0;
1131  }
1132 
1133  /* create mark and read pointers for each real window function */
1134  for (i = 0; i < numfuncs; i++)
1135  {
1136  WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1137 
1138  if (!perfuncstate->plain_agg)
1139  {
1140  WindowObject winobj = perfuncstate->winobj;
1141 
1142  winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1143  0);
1144  winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1146  winobj->markpos = -1;
1147  winobj->seekpos = -1;
1148  }
1149  }
1150 
1151  /*
1152  * Store the first tuple into the tuplestore (it's always available now;
1153  * we either read it above, or saved it at the end of previous partition)
1154  */
1155  tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1156  winstate->spooled_rows++;
1157 }
1158 
1159 /*
1160  * Read tuples from the outer node, up to and including position 'pos', and
1161  * store them into the tuplestore. If pos is -1, reads the whole partition.
1162  */
1163 static void
1164 spool_tuples(WindowAggState *winstate, int64 pos)
1165 {
1166  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1168  TupleTableSlot *outerslot;
1169  MemoryContext oldcontext;
1170 
1171  if (!winstate->buffer)
1172  return; /* just a safety check */
1173  if (winstate->partition_spooled)
1174  return; /* whole partition done already */
1175 
1176  /*
1177  * If the tuplestore has spilled to disk, alternate reading and writing
1178  * becomes quite expensive due to frequent buffer flushes. It's cheaper
1179  * to force the entire partition to get spooled in one go.
1180  *
1181  * XXX this is a horrid kluge --- it'd be better to fix the performance
1182  * problem inside tuplestore. FIXME
1183  */
1184  if (!tuplestore_in_memory(winstate->buffer))
1185  pos = -1;
1186 
1187  outerPlan = outerPlanState(winstate);
1188 
1189  /* Must be in query context to call outerplan */
1191 
1192  while (winstate->spooled_rows <= pos || pos == -1)
1193  {
1194  outerslot = ExecProcNode(outerPlan);
1195  if (TupIsNull(outerslot))
1196  {
1197  /* reached the end of the last partition */
1198  winstate->partition_spooled = true;
1199  winstate->more_partitions = false;
1200  break;
1201  }
1202 
1203  if (node->partNumCols > 0)
1204  {
1205  /* Check if this tuple still belongs to the current partition */
1206  if (!execTuplesMatch(winstate->first_part_slot,
1207  outerslot,
1208  node->partNumCols, node->partColIdx,
1209  winstate->partEqfunctions,
1210  winstate->tmpcontext->ecxt_per_tuple_memory))
1211  {
1212  /*
1213  * end of partition; copy the tuple for the next cycle.
1214  */
1215  ExecCopySlot(winstate->first_part_slot, outerslot);
1216  winstate->partition_spooled = true;
1217  winstate->more_partitions = true;
1218  break;
1219  }
1220  }
1221 
1222  /* Still in partition, so save it into the tuplestore */
1223  tuplestore_puttupleslot(winstate->buffer, outerslot);
1224  winstate->spooled_rows++;
1225  }
1226 
1227  MemoryContextSwitchTo(oldcontext);
1228 }
1229 
1230 /*
1231  * release_partition
1232  * clear information kept within a partition, including
1233  * tuplestore and aggregate results.
1234  */
1235 static void
1237 {
1238  int i;
1239 
1240  for (i = 0; i < winstate->numfuncs; i++)
1241  {
1242  WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1243 
1244  /* Release any partition-local state of this window function */
1245  if (perfuncstate->winobj)
1246  perfuncstate->winobj->localmem = NULL;
1247  }
1248 
1249  /*
1250  * Release all partition-local memory (in particular, any partition-local
1251  * state that we might have trashed our pointers to in the above loop, and
1252  * any aggregate temp data). We don't rely on retail pfree because some
1253  * aggregates might have allocated data we don't have direct pointers to.
1254  */
1257  for (i = 0; i < winstate->numaggs; i++)
1258  {
1259  if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1261  }
1262 
1263  if (winstate->buffer)
1264  tuplestore_end(winstate->buffer);
1265  winstate->buffer = NULL;
1266  winstate->partition_spooled = false;
1267 }
1268 
1269 /*
1270  * row_is_in_frame
1271  * Determine whether a row is in the current row's window frame according
1272  * to our window framing rule
1273  *
1274  * The caller must have already determined that the row is in the partition
1275  * and fetched it into a slot. This function just encapsulates the framing
1276  * rules.
1277  */
1278 static bool
1279 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1280 {
1281  int frameOptions = winstate->frameOptions;
1282 
1283  Assert(pos >= 0); /* else caller error */
1284 
1285  /* First, check frame starting conditions */
1286  if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1287  {
1288  if (frameOptions & FRAMEOPTION_ROWS)
1289  {
1290  /* rows before current row are out of frame */
1291  if (pos < winstate->currentpos)
1292  return false;
1293  }
1294  else if (frameOptions & FRAMEOPTION_RANGE)
1295  {
1296  /* preceding row that is not peer is out of frame */
1297  if (pos < winstate->currentpos &&
1298  !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1299  return false;
1300  }
1301  else
1302  Assert(false);
1303  }
1304  else if (frameOptions & FRAMEOPTION_START_VALUE)
1305  {
1306  if (frameOptions & FRAMEOPTION_ROWS)
1307  {
1308  int64 offset = DatumGetInt64(winstate->startOffsetValue);
1309 
1310  /* rows before current row + offset are out of frame */
1311  if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1312  offset = -offset;
1313 
1314  if (pos < winstate->currentpos + offset)
1315  return false;
1316  }
1317  else if (frameOptions & FRAMEOPTION_RANGE)
1318  {
1319  /* parser should have rejected this */
1320  elog(ERROR, "window frame with value offset is not implemented");
1321  }
1322  else
1323  Assert(false);
1324  }
1325 
1326  /* Okay so far, now check frame ending conditions */
1327  if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1328  {
1329  if (frameOptions & FRAMEOPTION_ROWS)
1330  {
1331  /* rows after current row are out of frame */
1332  if (pos > winstate->currentpos)
1333  return false;
1334  }
1335  else if (frameOptions & FRAMEOPTION_RANGE)
1336  {
1337  /* following row that is not peer is out of frame */
1338  if (pos > winstate->currentpos &&
1339  !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1340  return false;
1341  }
1342  else
1343  Assert(false);
1344  }
1345  else if (frameOptions & FRAMEOPTION_END_VALUE)
1346  {
1347  if (frameOptions & FRAMEOPTION_ROWS)
1348  {
1349  int64 offset = DatumGetInt64(winstate->endOffsetValue);
1350 
1351  /* rows after current row + offset are out of frame */
1352  if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1353  offset = -offset;
1354 
1355  if (pos > winstate->currentpos + offset)
1356  return false;
1357  }
1358  else if (frameOptions & FRAMEOPTION_RANGE)
1359  {
1360  /* parser should have rejected this */
1361  elog(ERROR, "window frame with value offset is not implemented");
1362  }
1363  else
1364  Assert(false);
1365  }
1366 
1367  /* If we get here, it's in frame */
1368  return true;
1369 }
1370 
1371 /*
1372  * update_frameheadpos
1373  * make frameheadpos valid for the current row
1374  *
1375  * Uses the winobj's read pointer for any required fetches; hence, if the
1376  * frame mode is one that requires row comparisons, the winobj's mark must
1377  * not be past the currently known frame head. Also uses the specified slot
1378  * for any required fetches.
1379  */
1380 static void
1382 {
1383  WindowAggState *winstate = winobj->winstate;
1384  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1385  int frameOptions = winstate->frameOptions;
1386 
1387  if (winstate->framehead_valid)
1388  return; /* already known for current row */
1389 
1390  if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1391  {
1392  /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1393  winstate->frameheadpos = 0;
1394  winstate->framehead_valid = true;
1395  }
1396  else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1397  {
1398  if (frameOptions & FRAMEOPTION_ROWS)
1399  {
1400  /* In ROWS mode, frame head is the same as current */
1401  winstate->frameheadpos = winstate->currentpos;
1402  winstate->framehead_valid = true;
1403  }
1404  else if (frameOptions & FRAMEOPTION_RANGE)
1405  {
1406  int64 fhprev;
1407 
1408  /* If no ORDER BY, all rows are peers with each other */
1409  if (node->ordNumCols == 0)
1410  {
1411  winstate->frameheadpos = 0;
1412  winstate->framehead_valid = true;
1413  return;
1414  }
1415 
1416  /*
1417  * In RANGE START_CURRENT mode, frame head is the first row that
1418  * is a peer of current row. We search backwards from current,
1419  * which could be a bit inefficient if peer sets are large. Might
1420  * be better to have a separate read pointer that moves forward
1421  * tracking the frame head.
1422  */
1423  fhprev = winstate->currentpos - 1;
1424  for (;;)
1425  {
1426  /* assume the frame head can't go backwards */
1427  if (fhprev < winstate->frameheadpos)
1428  break;
1429  if (!window_gettupleslot(winobj, fhprev, slot))
1430  break; /* start of partition */
1431  if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1432  break; /* not peer of current row */
1433  fhprev--;
1434  }
1435  winstate->frameheadpos = fhprev + 1;
1436  winstate->framehead_valid = true;
1437  }
1438  else
1439  Assert(false);
1440  }
1441  else if (frameOptions & FRAMEOPTION_START_VALUE)
1442  {
1443  if (frameOptions & FRAMEOPTION_ROWS)
1444  {
1445  /* In ROWS mode, bound is physically n before/after current */
1446  int64 offset = DatumGetInt64(winstate->startOffsetValue);
1447 
1448  if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1449  offset = -offset;
1450 
1451  winstate->frameheadpos = winstate->currentpos + offset;
1452  /* frame head can't go before first row */
1453  if (winstate->frameheadpos < 0)
1454  winstate->frameheadpos = 0;
1455  else if (winstate->frameheadpos > winstate->currentpos)
1456  {
1457  /* make sure frameheadpos is not past end of partition */
1458  spool_tuples(winstate, winstate->frameheadpos - 1);
1459  if (winstate->frameheadpos > winstate->spooled_rows)
1460  winstate->frameheadpos = winstate->spooled_rows;
1461  }
1462  winstate->framehead_valid = true;
1463  }
1464  else if (frameOptions & FRAMEOPTION_RANGE)
1465  {
1466  /* parser should have rejected this */
1467  elog(ERROR, "window frame with value offset is not implemented");
1468  }
1469  else
1470  Assert(false);
1471  }
1472  else
1473  Assert(false);
1474 }
1475 
1476 /*
1477  * update_frametailpos
1478  * make frametailpos valid for the current row
1479  *
1480  * Uses the winobj's read pointer for any required fetches; hence, if the
1481  * frame mode is one that requires row comparisons, the winobj's mark must
1482  * not be past the currently known frame tail. Also uses the specified slot
1483  * for any required fetches.
1484  */
1485 static void
1487 {
1488  WindowAggState *winstate = winobj->winstate;
1489  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1490  int frameOptions = winstate->frameOptions;
1491 
1492  if (winstate->frametail_valid)
1493  return; /* already known for current row */
1494 
1495  if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1496  {
1497  /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1498  spool_tuples(winstate, -1);
1499  winstate->frametailpos = winstate->spooled_rows - 1;
1500  winstate->frametail_valid = true;
1501  }
1502  else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1503  {
1504  if (frameOptions & FRAMEOPTION_ROWS)
1505  {
1506  /* In ROWS mode, exactly the rows up to current are in frame */
1507  winstate->frametailpos = winstate->currentpos;
1508  winstate->frametail_valid = true;
1509  }
1510  else if (frameOptions & FRAMEOPTION_RANGE)
1511  {
1512  int64 ftnext;
1513 
1514  /* If no ORDER BY, all rows are peers with each other */
1515  if (node->ordNumCols == 0)
1516  {
1517  spool_tuples(winstate, -1);
1518  winstate->frametailpos = winstate->spooled_rows - 1;
1519  winstate->frametail_valid = true;
1520  return;
1521  }
1522 
1523  /*
1524  * Else we have to search for the first non-peer of the current
1525  * row. We assume the current value of frametailpos is a lower
1526  * bound on the possible frame tail location, ie, frame tail never
1527  * goes backward, and that currentpos is also a lower bound, ie,
1528  * frame end always >= current row.
1529  */
1530  ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
1531  for (;;)
1532  {
1533  if (!window_gettupleslot(winobj, ftnext, slot))
1534  break; /* end of partition */
1535  if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1536  break; /* not peer of current row */
1537  ftnext++;
1538  }
1539  winstate->frametailpos = ftnext - 1;
1540  winstate->frametail_valid = true;
1541  }
1542  else
1543  Assert(false);
1544  }
1545  else if (frameOptions & FRAMEOPTION_END_VALUE)
1546  {
1547  if (frameOptions & FRAMEOPTION_ROWS)
1548  {
1549  /* In ROWS mode, bound is physically n before/after current */
1550  int64 offset = DatumGetInt64(winstate->endOffsetValue);
1551 
1552  if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1553  offset = -offset;
1554 
1555  winstate->frametailpos = winstate->currentpos + offset;
1556  /* smallest allowable value of frametailpos is -1 */
1557  if (winstate->frametailpos < 0)
1558  winstate->frametailpos = -1;
1559  else if (winstate->frametailpos > winstate->currentpos)
1560  {
1561  /* make sure frametailpos is not past last row of partition */
1562  spool_tuples(winstate, winstate->frametailpos);
1563  if (winstate->frametailpos >= winstate->spooled_rows)
1564  winstate->frametailpos = winstate->spooled_rows - 1;
1565  }
1566  winstate->frametail_valid = true;
1567  }
1568  else if (frameOptions & FRAMEOPTION_RANGE)
1569  {
1570  /* parser should have rejected this */
1571  elog(ERROR, "window frame with value offset is not implemented");
1572  }
1573  else
1574  Assert(false);
1575  }
1576  else
1577  Assert(false);
1578 }
1579 
1580 
1581 /* -----------------
1582  * ExecWindowAgg
1583  *
1584  * ExecWindowAgg receives tuples from its outer subplan and
1585  * stores them into a tuplestore, then processes window functions.
1586  * This node doesn't reduce nor qualify any row so the number of
1587  * returned rows is exactly the same as its outer subplan's result.
1588  * -----------------
1589  */
1590 static TupleTableSlot *
1592 {
1593  WindowAggState *winstate = castNode(WindowAggState, pstate);
1594  ExprContext *econtext;
1595  int i;
1596  int numfuncs;
1597 
1599 
1600  if (winstate->all_done)
1601  return NULL;
1602 
1603  /*
1604  * Compute frame offset values, if any, during first call.
1605  */
1606  if (winstate->all_first)
1607  {
1608  int frameOptions = winstate->frameOptions;
1609  ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
1610  Datum value;
1611  bool isnull;
1612  int16 len;
1613  bool byval;
1614 
1615  if (frameOptions & FRAMEOPTION_START_VALUE)
1616  {
1617  Assert(winstate->startOffset != NULL);
1618  value = ExecEvalExprSwitchContext(winstate->startOffset,
1619  econtext,
1620  &isnull);
1621  if (isnull)
1622  ereport(ERROR,
1623  (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1624  errmsg("frame starting offset must not be null")));
1625  /* copy value into query-lifespan context */
1626  get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
1627  &len, &byval);
1628  winstate->startOffsetValue = datumCopy(value, byval, len);
1629  if (frameOptions & FRAMEOPTION_ROWS)
1630  {
1631  /* value is known to be int8 */
1632  int64 offset = DatumGetInt64(value);
1633 
1634  if (offset < 0)
1635  ereport(ERROR,
1636  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1637  errmsg("frame starting offset must not be negative")));
1638  }
1639  }
1640  if (frameOptions & FRAMEOPTION_END_VALUE)
1641  {
1642  Assert(winstate->endOffset != NULL);
1643  value = ExecEvalExprSwitchContext(winstate->endOffset,
1644  econtext,
1645  &isnull);
1646  if (isnull)
1647  ereport(ERROR,
1648  (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1649  errmsg("frame ending offset must not be null")));
1650  /* copy value into query-lifespan context */
1651  get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
1652  &len, &byval);
1653  winstate->endOffsetValue = datumCopy(value, byval, len);
1654  if (frameOptions & FRAMEOPTION_ROWS)
1655  {
1656  /* value is known to be int8 */
1657  int64 offset = DatumGetInt64(value);
1658 
1659  if (offset < 0)
1660  ereport(ERROR,
1661  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1662  errmsg("frame ending offset must not be negative")));
1663  }
1664  }
1665  winstate->all_first = false;
1666  }
1667 
1668  if (winstate->buffer == NULL)
1669  {
1670  /* Initialize for first partition and set current row = 0 */
1671  begin_partition(winstate);
1672  /* If there are no input rows, we'll detect that and exit below */
1673  }
1674  else
1675  {
1676  /* Advance current row within partition */
1677  winstate->currentpos++;
1678  /* This might mean that the frame moves, too */
1679  winstate->framehead_valid = false;
1680  winstate->frametail_valid = false;
1681  }
1682 
1683  /*
1684  * Spool all tuples up to and including the current row, if we haven't
1685  * already
1686  */
1687  spool_tuples(winstate, winstate->currentpos);
1688 
1689  /* Move to the next partition if we reached the end of this partition */
1690  if (winstate->partition_spooled &&
1691  winstate->currentpos >= winstate->spooled_rows)
1692  {
1693  release_partition(winstate);
1694 
1695  if (winstate->more_partitions)
1696  {
1697  begin_partition(winstate);
1698  Assert(winstate->spooled_rows > 0);
1699  }
1700  else
1701  {
1702  winstate->all_done = true;
1703  return NULL;
1704  }
1705  }
1706 
1707  /* final output execution is in ps_ExprContext */
1708  econtext = winstate->ss.ps.ps_ExprContext;
1709 
1710  /* Clear the per-output-tuple context for current row */
1711  ResetExprContext(econtext);
1712 
1713  /*
1714  * Read the current row from the tuplestore, and save in ScanTupleSlot.
1715  * (We can't rely on the outerplan's output slot because we may have to
1716  * read beyond the current row. Also, we have to actually copy the row
1717  * out of the tuplestore, since window function evaluation might cause the
1718  * tuplestore to dump its state to disk.)
1719  *
1720  * Current row must be in the tuplestore, since we spooled it above.
1721  */
1722  tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1723  if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1724  winstate->ss.ss_ScanTupleSlot))
1725  elog(ERROR, "unexpected end of tuplestore");
1726 
1727  /*
1728  * Evaluate true window functions
1729  */
1730  numfuncs = winstate->numfuncs;
1731  for (i = 0; i < numfuncs; i++)
1732  {
1733  WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1734 
1735  if (perfuncstate->plain_agg)
1736  continue;
1737  eval_windowfunction(winstate, perfuncstate,
1738  &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1739  &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1740  }
1741 
1742  /*
1743  * Evaluate aggregates
1744  */
1745  if (winstate->numaggs > 0)
1746  eval_windowaggregates(winstate);
1747 
1748  /*
1749  * Truncate any no-longer-needed rows from the tuplestore.
1750  */
1751  tuplestore_trim(winstate->buffer);
1752 
1753  /*
1754  * Form and return a projection tuple using the windowfunc results and the
1755  * current row. Setting ecxt_outertuple arranges that any Vars will be
1756  * evaluated with respect to that row.
1757  */
1758  econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1759 
1760  return ExecProject(winstate->ss.ps.ps_ProjInfo);
1761 }
1762 
1763 /* -----------------
1764  * ExecInitWindowAgg
1765  *
1766  * Creates the run-time information for the WindowAgg node produced by the
1767  * planner and initializes its outer subtree
1768  * -----------------
1769  */
1771 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1772 {
1773  WindowAggState *winstate;
1774  Plan *outerPlan;
1775  ExprContext *econtext;
1776  ExprContext *tmpcontext;
1777  WindowStatePerFunc perfunc;
1778  WindowStatePerAgg peragg;
1779  int numfuncs,
1780  wfuncno,
1781  numaggs,
1782  aggno;
1783  ListCell *l;
1784 
1785  /* check for unsupported flags */
1786  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1787 
1788  /*
1789  * create state structure
1790  */
1791  winstate = makeNode(WindowAggState);
1792  winstate->ss.ps.plan = (Plan *) node;
1793  winstate->ss.ps.state = estate;
1794  winstate->ss.ps.ExecProcNode = ExecWindowAgg;
1795 
1796  /*
1797  * Create expression contexts. We need two, one for per-input-tuple
1798  * processing and one for per-output-tuple processing. We cheat a little
1799  * by using ExecAssignExprContext() to build both.
1800  */
1801  ExecAssignExprContext(estate, &winstate->ss.ps);
1802  tmpcontext = winstate->ss.ps.ps_ExprContext;
1803  winstate->tmpcontext = tmpcontext;
1804  ExecAssignExprContext(estate, &winstate->ss.ps);
1805 
1806  /* Create long-lived context for storage of partition-local memory etc */
1807  winstate->partcontext =
1809  "WindowAgg Partition",
1811 
1812  /*
1813  * Create mid-lived context for aggregate trans values etc.
1814  *
1815  * Note that moving aggregates each use their own private context, not
1816  * this one.
1817  */
1818  winstate->aggcontext =
1820  "WindowAgg Aggregates",
1822 
1823  /*
1824  * tuple table initialization
1825  */
1826  ExecInitScanTupleSlot(estate, &winstate->ss);
1827  ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1828  winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1829  winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1830  winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1831  winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1832 
1833  /*
1834  * WindowAgg nodes never have quals, since they can only occur at the
1835  * logical top level of a query (ie, after any WHERE or HAVING filters)
1836  */
1837  Assert(node->plan.qual == NIL);
1838  winstate->ss.ps.qual = NULL;
1839 
1840  /*
1841  * initialize child nodes
1842  */
1843  outerPlan = outerPlan(node);
1844  outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1845 
1846  /*
1847  * initialize source tuple type (which is also the tuple type that we'll
1848  * store in the tuplestore and use in all our working slots).
1849  */
1850  ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1851 
1860 
1861  /*
1862  * Initialize result tuple type and projection info.
1863  */
1864  ExecAssignResultTypeFromTL(&winstate->ss.ps);
1865  ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1866 
1867  /* Set up data for comparing tuples */
1868  if (node->partNumCols > 0)
1870  node->partOperators);
1871  if (node->ordNumCols > 0)
1873  node->ordOperators);
1874 
1875  /*
1876  * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1877  */
1878  numfuncs = winstate->numfuncs;
1879  numaggs = winstate->numaggs;
1880  econtext = winstate->ss.ps.ps_ExprContext;
1881  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1882  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1883 
1884  /*
1885  * allocate per-wfunc/per-agg state information.
1886  */
1887  perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1888  peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1889  winstate->perfunc = perfunc;
1890  winstate->peragg = peragg;
1891 
1892  wfuncno = -1;
1893  aggno = -1;
1894  foreach(l, winstate->funcs)
1895  {
1896  WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1897  WindowFunc *wfunc = wfuncstate->wfunc;
1898  WindowStatePerFunc perfuncstate;
1899  AclResult aclresult;
1900  int i;
1901 
1902  if (wfunc->winref != node->winref) /* planner screwed up? */
1903  elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1904  wfunc->winref, node->winref);
1905 
1906  /* Look for a previous duplicate window function */
1907  for (i = 0; i <= wfuncno; i++)
1908  {
1909  if (equal(wfunc, perfunc[i].wfunc) &&
1910  !contain_volatile_functions((Node *) wfunc))
1911  break;
1912  }
1913  if (i <= wfuncno)
1914  {
1915  /* Found a match to an existing entry, so just mark it */
1916  wfuncstate->wfuncno = i;
1917  continue;
1918  }
1919 
1920  /* Nope, so assign a new PerAgg record */
1921  perfuncstate = &perfunc[++wfuncno];
1922 
1923  /* Mark WindowFunc state node with assigned index in the result array */
1924  wfuncstate->wfuncno = wfuncno;
1925 
1926  /* Check permission to call window function */
1927  aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1928  ACL_EXECUTE);
1929  if (aclresult != ACLCHECK_OK)
1930  aclcheck_error(aclresult, ACL_KIND_PROC,
1931  get_func_name(wfunc->winfnoid));
1933 
1934  /* Fill in the perfuncstate data */
1935  perfuncstate->wfuncstate = wfuncstate;
1936  perfuncstate->wfunc = wfunc;
1937  perfuncstate->numArguments = list_length(wfuncstate->args);
1938 
1939  fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1940  econtext->ecxt_per_query_memory);
1941  fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
1942 
1943  perfuncstate->winCollation = wfunc->inputcollid;
1944 
1945  get_typlenbyval(wfunc->wintype,
1946  &perfuncstate->resulttypeLen,
1947  &perfuncstate->resulttypeByVal);
1948 
1949  /*
1950  * If it's really just a plain aggregate function, we'll emulate the
1951  * Agg environment for it.
1952  */
1953  perfuncstate->plain_agg = wfunc->winagg;
1954  if (wfunc->winagg)
1955  {
1956  WindowStatePerAgg peraggstate;
1957 
1958  perfuncstate->aggno = ++aggno;
1959  peraggstate = &winstate->peragg[aggno];
1960  initialize_peragg(winstate, wfunc, peraggstate);
1961  peraggstate->wfuncno = wfuncno;
1962  }
1963  else
1964  {
1966 
1967  winobj->winstate = winstate;
1968  winobj->argstates = wfuncstate->args;
1969  winobj->localmem = NULL;
1970  perfuncstate->winobj = winobj;
1971  }
1972  }
1973 
1974  /* Update numfuncs, numaggs to match number of unique functions found */
1975  winstate->numfuncs = wfuncno + 1;
1976  winstate->numaggs = aggno + 1;
1977 
1978  /* Set up WindowObject for aggregates, if needed */
1979  if (winstate->numaggs > 0)
1980  {
1981  WindowObject agg_winobj = makeNode(WindowObjectData);
1982 
1983  agg_winobj->winstate = winstate;
1984  agg_winobj->argstates = NIL;
1985  agg_winobj->localmem = NULL;
1986  /* make sure markptr = -1 to invalidate. It may not get used */
1987  agg_winobj->markptr = -1;
1988  agg_winobj->readptr = -1;
1989  winstate->agg_winobj = agg_winobj;
1990  }
1991 
1992  /* copy frame options to state node for easy access */
1993  winstate->frameOptions = node->frameOptions;
1994 
1995  /* initialize frame bound offset expressions */
1996  winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
1997  (PlanState *) winstate);
1998  winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
1999  (PlanState *) winstate);
2000 
2001  winstate->all_first = true;
2002  winstate->partition_spooled = false;
2003  winstate->more_partitions = false;
2004 
2005  return winstate;
2006 }
2007 
2008 /* -----------------
2009  * ExecEndWindowAgg
2010  * -----------------
2011  */
2012 void
2014 {
2016  int i;
2017 
2018  release_partition(node);
2019 
2023  ExecClearTuple(node->temp_slot_1);
2024  ExecClearTuple(node->temp_slot_2);
2025 
2026  /*
2027  * Free both the expr contexts.
2028  */
2029  ExecFreeExprContext(&node->ss.ps);
2030  node->ss.ps.ps_ExprContext = node->tmpcontext;
2031  ExecFreeExprContext(&node->ss.ps);
2032 
2033  for (i = 0; i < node->numaggs; i++)
2034  {
2035  if (node->peragg[i].aggcontext != node->aggcontext)
2037  }
2040 
2041  pfree(node->perfunc);
2042  pfree(node->peragg);
2043 
2044  outerPlan = outerPlanState(node);
2045  ExecEndNode(outerPlan);
2046 }
2047 
2048 /* -----------------
2049  * ExecReScanWindowAgg
2050  * -----------------
2051  */
2052 void
2054 {
2056  ExprContext *econtext = node->ss.ps.ps_ExprContext;
2057 
2058  node->all_done = false;
2059  node->all_first = true;
2060 
2061  /* release tuplestore et al */
2062  release_partition(node);
2063 
2064  /* release all temp tuples, but especially first_part_slot */
2068  ExecClearTuple(node->temp_slot_1);
2069  ExecClearTuple(node->temp_slot_2);
2070 
2071  /* Forget current wfunc values */
2072  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2073  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2074 
2075  /*
2076  * if chgParam of subnode is not null then plan will be re-scanned by
2077  * first ExecProcNode.
2078  */
2079  if (outerPlan->chgParam == NULL)
2080  ExecReScan(outerPlan);
2081 }
2082 
2083 /*
2084  * initialize_peragg
2085  *
2086  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2087  */
2088 static WindowStatePerAggData *
2090  WindowStatePerAgg peraggstate)
2091 {
2092  Oid inputTypes[FUNC_MAX_ARGS];
2093  int numArguments;
2094  HeapTuple aggTuple;
2095  Form_pg_aggregate aggform;
2096  Oid aggtranstype;
2097  AttrNumber initvalAttNo;
2098  AclResult aclresult;
2099  Oid transfn_oid,
2100  invtransfn_oid,
2101  finalfn_oid;
2102  bool finalextra;
2103  Expr *transfnexpr,
2104  *invtransfnexpr,
2105  *finalfnexpr;
2106  Datum textInitVal;
2107  int i;
2108  ListCell *lc;
2109 
2110  numArguments = list_length(wfunc->args);
2111 
2112  i = 0;
2113  foreach(lc, wfunc->args)
2114  {
2115  inputTypes[i++] = exprType((Node *) lfirst(lc));
2116  }
2117 
2118  aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2119  if (!HeapTupleIsValid(aggTuple))
2120  elog(ERROR, "cache lookup failed for aggregate %u",
2121  wfunc->winfnoid);
2122  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2123 
2124  /*
2125  * Figure out whether we want to use the moving-aggregate implementation,
2126  * and collect the right set of fields from the pg_attribute entry.
2127  *
2128  * If the frame head can't move, we don't need moving-aggregate code. Even
2129  * if we'd like to use it, don't do so if the aggregate's arguments (and
2130  * FILTER clause if any) contain any calls to volatile functions.
2131  * Otherwise, the difference between restarting and not restarting the
2132  * aggregation would be user-visible.
2133  */
2134  if (OidIsValid(aggform->aggminvtransfn) &&
2136  !contain_volatile_functions((Node *) wfunc))
2137  {
2138  peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2139  peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2140  peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2141  finalextra = aggform->aggmfinalextra;
2142  aggtranstype = aggform->aggmtranstype;
2143  initvalAttNo = Anum_pg_aggregate_aggminitval;
2144  }
2145  else
2146  {
2147  peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2148  peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2149  peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2150  finalextra = aggform->aggfinalextra;
2151  aggtranstype = aggform->aggtranstype;
2152  initvalAttNo = Anum_pg_aggregate_agginitval;
2153  }
2154 
2155  /*
2156  * ExecInitWindowAgg already checked permission to call aggregate function
2157  * ... but we still need to check the component functions
2158  */
2159 
2160  /* Check that aggregate owner has permission to call component fns */
2161  {
2162  HeapTuple procTuple;
2163  Oid aggOwner;
2164 
2165  procTuple = SearchSysCache1(PROCOID,
2166  ObjectIdGetDatum(wfunc->winfnoid));
2167  if (!HeapTupleIsValid(procTuple))
2168  elog(ERROR, "cache lookup failed for function %u",
2169  wfunc->winfnoid);
2170  aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2171  ReleaseSysCache(procTuple);
2172 
2173  aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2174  ACL_EXECUTE);
2175  if (aclresult != ACLCHECK_OK)
2176  aclcheck_error(aclresult, ACL_KIND_PROC,
2177  get_func_name(transfn_oid));
2178  InvokeFunctionExecuteHook(transfn_oid);
2179 
2180  if (OidIsValid(invtransfn_oid))
2181  {
2182  aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
2183  ACL_EXECUTE);
2184  if (aclresult != ACLCHECK_OK)
2185  aclcheck_error(aclresult, ACL_KIND_PROC,
2186  get_func_name(invtransfn_oid));
2187  InvokeFunctionExecuteHook(invtransfn_oid);
2188  }
2189 
2190  if (OidIsValid(finalfn_oid))
2191  {
2192  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2193  ACL_EXECUTE);
2194  if (aclresult != ACLCHECK_OK)
2195  aclcheck_error(aclresult, ACL_KIND_PROC,
2196  get_func_name(finalfn_oid));
2197  InvokeFunctionExecuteHook(finalfn_oid);
2198  }
2199  }
2200 
2201  /* Detect how many arguments to pass to the finalfn */
2202  if (finalextra)
2203  peraggstate->numFinalArgs = numArguments + 1;
2204  else
2205  peraggstate->numFinalArgs = 1;
2206 
2207  /* resolve actual type of transition state, if polymorphic */
2208  aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2209  aggtranstype,
2210  inputTypes,
2211  numArguments);
2212 
2213  /* build expression trees using actual argument & result types */
2214  build_aggregate_transfn_expr(inputTypes,
2215  numArguments,
2216  0, /* no ordered-set window functions yet */
2217  false, /* no variadic window functions yet */
2218  aggtranstype,
2219  wfunc->inputcollid,
2220  transfn_oid,
2221  invtransfn_oid,
2222  &transfnexpr,
2223  &invtransfnexpr);
2224 
2225  /* set up infrastructure for calling the transfn(s) and finalfn */
2226  fmgr_info(transfn_oid, &peraggstate->transfn);
2227  fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2228 
2229  if (OidIsValid(invtransfn_oid))
2230  {
2231  fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2232  fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2233  }
2234 
2235  if (OidIsValid(finalfn_oid))
2236  {
2237  build_aggregate_finalfn_expr(inputTypes,
2238  peraggstate->numFinalArgs,
2239  aggtranstype,
2240  wfunc->wintype,
2241  wfunc->inputcollid,
2242  finalfn_oid,
2243  &finalfnexpr);
2244  fmgr_info(finalfn_oid, &peraggstate->finalfn);
2245  fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
2246  }
2247 
2248  /* get info about relevant datatypes */
2249  get_typlenbyval(wfunc->wintype,
2250  &peraggstate->resulttypeLen,
2251  &peraggstate->resulttypeByVal);
2252  get_typlenbyval(aggtranstype,
2253  &peraggstate->transtypeLen,
2254  &peraggstate->transtypeByVal);
2255 
2256  /*
2257  * initval is potentially null, so don't try to access it as a struct
2258  * field. Must do it the hard way with SysCacheGetAttr.
2259  */
2260  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
2261  &peraggstate->initValueIsNull);
2262 
2263  if (peraggstate->initValueIsNull)
2264  peraggstate->initValue = (Datum) 0;
2265  else
2266  peraggstate->initValue = GetAggInitVal(textInitVal,
2267  aggtranstype);
2268 
2269  /*
2270  * If the transfn is strict and the initval is NULL, make sure input type
2271  * and transtype are the same (or at least binary-compatible), so that
2272  * it's OK to use the first input value as the initial transValue. This
2273  * should have been checked at agg definition time, but we must check
2274  * again in case the transfn's strictness property has been changed.
2275  */
2276  if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
2277  {
2278  if (numArguments < 1 ||
2279  !IsBinaryCoercible(inputTypes[0], aggtranstype))
2280  ereport(ERROR,
2281  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2282  errmsg("aggregate %u needs to have compatible input type and transition type",
2283  wfunc->winfnoid)));
2284  }
2285 
2286  /*
2287  * Insist that forward and inverse transition functions have the same
2288  * strictness setting. Allowing them to differ would require handling
2289  * more special cases in advance_windowaggregate and
2290  * advance_windowaggregate_base, for no discernible benefit. This should
2291  * have been checked at agg definition time, but we must check again in
2292  * case either function's strictness property has been changed.
2293  */
2294  if (OidIsValid(invtransfn_oid) &&
2295  peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
2296  ereport(ERROR,
2297  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2298  errmsg("strictness of aggregate's forward and inverse transition functions must match")));
2299 
2300  /*
2301  * Moving aggregates use their own aggcontext.
2302  *
2303  * This is necessary because they might restart at different times, so we
2304  * might never be able to reset the shared context otherwise. We can't
2305  * make it the aggregates' responsibility to clean up after themselves,
2306  * because strict aggregates must be restarted whenever we remove their
2307  * last non-NULL input, which the aggregate won't be aware is happening.
2308  * Also, just pfree()ing the transValue upon restarting wouldn't help,
2309  * since we'd miss any indirectly referenced data. We could, in theory,
2310  * make the memory allocation rules for moving aggregates different than
2311  * they have historically been for plain aggregates, but that seems grotty
2312  * and likely to lead to memory leaks.
2313  */
2314  if (OidIsValid(invtransfn_oid))
2315  peraggstate->aggcontext =
2317  "WindowAgg Per Aggregate",
2319  else
2320  peraggstate->aggcontext = winstate->aggcontext;
2321 
2322  ReleaseSysCache(aggTuple);
2323 
2324  return peraggstate;
2325 }
2326 
2327 static Datum
2328 GetAggInitVal(Datum textInitVal, Oid transtype)
2329 {
2330  Oid typinput,
2331  typioparam;
2332  char *strInitVal;
2333  Datum initVal;
2334 
2335  getTypeInputInfo(transtype, &typinput, &typioparam);
2336  strInitVal = TextDatumGetCString(textInitVal);
2337  initVal = OidInputFunctionCall(typinput, strInitVal,
2338  typioparam, -1);
2339  pfree(strInitVal);
2340  return initVal;
2341 }
2342 
2343 /*
2344  * are_peers
2345  * compare two rows to see if they are equal according to the ORDER BY clause
2346  *
2347  * NB: this does not consider the window frame mode.
2348  */
2349 static bool
2351  TupleTableSlot *slot2)
2352 {
2353  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2354 
2355  /* If no ORDER BY, all rows are peers with each other */
2356  if (node->ordNumCols == 0)
2357  return true;
2358 
2359  return execTuplesMatch(slot1, slot2,
2360  node->ordNumCols, node->ordColIdx,
2361  winstate->ordEqfunctions,
2362  winstate->tmpcontext->ecxt_per_tuple_memory);
2363 }
2364 
2365 /*
2366  * window_gettupleslot
2367  * Fetch the pos'th tuple of the current partition into the slot,
2368  * using the winobj's read pointer
2369  *
2370  * Returns true if successful, false if no such row
2371  */
2372 static bool
2374 {
2375  WindowAggState *winstate = winobj->winstate;
2376  MemoryContext oldcontext;
2377 
2378  /* often called repeatedly in a row */
2380 
2381  /* Don't allow passing -1 to spool_tuples here */
2382  if (pos < 0)
2383  return false;
2384 
2385  /* If necessary, fetch the tuple into the spool */
2386  spool_tuples(winstate, pos);
2387 
2388  if (pos >= winstate->spooled_rows)
2389  return false;
2390 
2391  if (pos < winobj->markpos)
2392  elog(ERROR, "cannot fetch row before WindowObject's mark position");
2393 
2395 
2396  tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2397 
2398  /*
2399  * Advance or rewind until we are within one tuple of the one we want.
2400  */
2401  if (winobj->seekpos < pos - 1)
2402  {
2403  if (!tuplestore_skiptuples(winstate->buffer,
2404  pos - 1 - winobj->seekpos,
2405  true))
2406  elog(ERROR, "unexpected end of tuplestore");
2407  winobj->seekpos = pos - 1;
2408  }
2409  else if (winobj->seekpos > pos + 1)
2410  {
2411  if (!tuplestore_skiptuples(winstate->buffer,
2412  winobj->seekpos - (pos + 1),
2413  false))
2414  elog(ERROR, "unexpected end of tuplestore");
2415  winobj->seekpos = pos + 1;
2416  }
2417  else if (winobj->seekpos == pos)
2418  {
2419  /*
2420  * There's no API to refetch the tuple at the current position. We
2421  * have to move one tuple forward, and then one backward. (We don't
2422  * do it the other way because we might try to fetch the row before
2423  * our mark, which isn't allowed.) XXX this case could stand to be
2424  * optimized.
2425  */
2426  tuplestore_advance(winstate->buffer, true);
2427  winobj->seekpos++;
2428  }
2429 
2430  /*
2431  * Now we should be on the tuple immediately before or after the one we
2432  * want, so just fetch forwards or backwards as appropriate.
2433  */
2434  if (winobj->seekpos > pos)
2435  {
2436  if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2437  elog(ERROR, "unexpected end of tuplestore");
2438  winobj->seekpos--;
2439  }
2440  else
2441  {
2442  if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2443  elog(ERROR, "unexpected end of tuplestore");
2444  winobj->seekpos++;
2445  }
2446 
2447  Assert(winobj->seekpos == pos);
2448 
2449  MemoryContextSwitchTo(oldcontext);
2450 
2451  return true;
2452 }
2453 
2454 
2455 /***********************************************************************
2456  * API exposed to window functions
2457  ***********************************************************************/
2458 
2459 
2460 /*
2461  * WinGetPartitionLocalMemory
2462  * Get working memory that lives till end of partition processing
2463  *
2464  * On first call within a given partition, this allocates and zeroes the
2465  * requested amount of space. Subsequent calls just return the same chunk.
2466  *
2467  * Memory obtained this way is normally used to hold state that should be
2468  * automatically reset for each new partition. If a window function wants
2469  * to hold state across the whole query, fcinfo->fn_extra can be used in the
2470  * usual way for that.
2471  */
2472 void *
2474 {
2475  Assert(WindowObjectIsValid(winobj));
2476  if (winobj->localmem == NULL)
2477  winobj->localmem =
2479  return winobj->localmem;
2480 }
2481 
2482 /*
2483  * WinGetCurrentPosition
2484  * Return the current row's position (counting from 0) within the current
2485  * partition.
2486  */
2487 int64
2489 {
2490  Assert(WindowObjectIsValid(winobj));
2491  return winobj->winstate->currentpos;
2492 }
2493 
2494 /*
2495  * WinGetPartitionRowCount
2496  * Return total number of rows contained in the current partition.
2497  *
2498  * Note: this is a relatively expensive operation because it forces the
2499  * whole partition to be "spooled" into the tuplestore at once. Once
2500  * executed, however, additional calls within the same partition are cheap.
2501  */
2502 int64
2504 {
2505  Assert(WindowObjectIsValid(winobj));
2506  spool_tuples(winobj->winstate, -1);
2507  return winobj->winstate->spooled_rows;
2508 }
2509 
2510 /*
2511  * WinSetMarkPosition
2512  * Set the "mark" position for the window object, which is the oldest row
2513  * number (counting from 0) it is allowed to fetch during all subsequent
2514  * operations within the current partition.
2515  *
2516  * Window functions do not have to call this, but are encouraged to move the
2517  * mark forward when possible to keep the tuplestore size down and prevent
2518  * having to spill rows to disk.
2519  */
2520 void
2521 WinSetMarkPosition(WindowObject winobj, int64 markpos)
2522 {
2523  WindowAggState *winstate;
2524 
2525  Assert(WindowObjectIsValid(winobj));
2526  winstate = winobj->winstate;
2527 
2528  if (markpos < winobj->markpos)
2529  elog(ERROR, "cannot move WindowObject's mark position backward");
2530  tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
2531  if (markpos > winobj->markpos)
2532  {
2533  tuplestore_skiptuples(winstate->buffer,
2534  markpos - winobj->markpos,
2535  true);
2536  winobj->markpos = markpos;
2537  }
2538  tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2539  if (markpos > winobj->seekpos)
2540  {
2541  tuplestore_skiptuples(winstate->buffer,
2542  markpos - winobj->seekpos,
2543  true);
2544  winobj->seekpos = markpos;
2545  }
2546 }
2547 
2548 /*
2549  * WinRowsArePeers
2550  * Compare two rows (specified by absolute position in window) to see
2551  * if they are equal according to the ORDER BY clause.
2552  *
2553  * NB: this does not consider the window frame mode.
2554  */
2555 bool
2556 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
2557 {
2558  WindowAggState *winstate;
2559  WindowAgg *node;
2560  TupleTableSlot *slot1;
2561  TupleTableSlot *slot2;
2562  bool res;
2563 
2564  Assert(WindowObjectIsValid(winobj));
2565  winstate = winobj->winstate;
2566  node = (WindowAgg *) winstate->ss.ps.plan;
2567 
2568  /* If no ORDER BY, all rows are peers; don't bother to fetch them */
2569  if (node->ordNumCols == 0)
2570  return true;
2571 
2572  slot1 = winstate->temp_slot_1;
2573  slot2 = winstate->temp_slot_2;
2574 
2575  if (!window_gettupleslot(winobj, pos1, slot1))
2576  elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2577  pos1);
2578  if (!window_gettupleslot(winobj, pos2, slot2))
2579  elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2580  pos2);
2581 
2582  res = are_peers(winstate, slot1, slot2);
2583 
2584  ExecClearTuple(slot1);
2585  ExecClearTuple(slot2);
2586 
2587  return res;
2588 }
2589 
2590 /*
2591  * WinGetFuncArgInPartition
2592  * Evaluate a window function's argument expression on a specified
2593  * row of the partition. The row is identified in lseek(2) style,
2594  * i.e. relative to the current, first, or last row.
2595  *
2596  * argno: argument number to evaluate (counted from 0)
2597  * relpos: signed rowcount offset from the seek position
2598  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2599  * set_mark: If the row is found and set_mark is true, the mark is moved to
2600  * the row as a side-effect.
2601  * isnull: output argument, receives isnull status of result
2602  * isout: output argument, set to indicate whether target row position
2603  * is out of partition (can pass NULL if caller doesn't care about this)
2604  *
2605  * Specifying a nonexistent row is not an error, it just causes a null result
2606  * (plus setting *isout true, if isout isn't NULL).
2607  */
2608 Datum
2610  int relpos, int seektype, bool set_mark,
2611  bool *isnull, bool *isout)
2612 {
2613  WindowAggState *winstate;
2614  ExprContext *econtext;
2615  TupleTableSlot *slot;
2616  bool gottuple;
2617  int64 abs_pos;
2618 
2619  Assert(WindowObjectIsValid(winobj));
2620  winstate = winobj->winstate;
2621  econtext = winstate->ss.ps.ps_ExprContext;
2622  slot = winstate->temp_slot_1;
2623 
2624  switch (seektype)
2625  {
2626  case WINDOW_SEEK_CURRENT:
2627  abs_pos = winstate->currentpos + relpos;
2628  break;
2629  case WINDOW_SEEK_HEAD:
2630  abs_pos = relpos;
2631  break;
2632  case WINDOW_SEEK_TAIL:
2633  spool_tuples(winstate, -1);
2634  abs_pos = winstate->spooled_rows - 1 + relpos;
2635  break;
2636  default:
2637  elog(ERROR, "unrecognized window seek type: %d", seektype);
2638  abs_pos = 0; /* keep compiler quiet */
2639  break;
2640  }
2641 
2642  gottuple = window_gettupleslot(winobj, abs_pos, slot);
2643 
2644  if (!gottuple)
2645  {
2646  if (isout)
2647  *isout = true;
2648  *isnull = true;
2649  return (Datum) 0;
2650  }
2651  else
2652  {
2653  if (isout)
2654  *isout = false;
2655  if (set_mark)
2656  {
2657  int frameOptions = winstate->frameOptions;
2658  int64 mark_pos = abs_pos;
2659 
2660  /*
2661  * In RANGE mode with a moving frame head, we must not let the
2662  * mark advance past frameheadpos, since that row has to be
2663  * fetchable during future update_frameheadpos calls.
2664  *
2665  * XXX it is very ugly to pollute window functions' marks with
2666  * this consideration; it could for instance mask a logic bug that
2667  * lets a window function fetch rows before what it had claimed
2668  * was its mark. Perhaps use a separate mark for frame head
2669  * probes?
2670  */
2671  if ((frameOptions & FRAMEOPTION_RANGE) &&
2672  !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2673  {
2674  update_frameheadpos(winobj, winstate->temp_slot_2);
2675  if (mark_pos > winstate->frameheadpos)
2676  mark_pos = winstate->frameheadpos;
2677  }
2678  WinSetMarkPosition(winobj, mark_pos);
2679  }
2680  econtext->ecxt_outertuple = slot;
2681  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2682  econtext, isnull);
2683  }
2684 }
2685 
2686 /*
2687  * WinGetFuncArgInFrame
2688  * Evaluate a window function's argument expression on a specified
2689  * row of the window frame. The row is identified in lseek(2) style,
2690  * i.e. relative to the current, first, or last row.
2691  *
2692  * argno: argument number to evaluate (counted from 0)
2693  * relpos: signed rowcount offset from the seek position
2694  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2695  * set_mark: If the row is found and set_mark is true, the mark is moved to
2696  * the row as a side-effect.
2697  * isnull: output argument, receives isnull status of result
2698  * isout: output argument, set to indicate whether target row position
2699  * is out of frame (can pass NULL if caller doesn't care about this)
2700  *
2701  * Specifying a nonexistent row is not an error, it just causes a null result
2702  * (plus setting *isout true, if isout isn't NULL).
2703  */
2704 Datum
2706  int relpos, int seektype, bool set_mark,
2707  bool *isnull, bool *isout)
2708 {
2709  WindowAggState *winstate;
2710  ExprContext *econtext;
2711  TupleTableSlot *slot;
2712  bool gottuple;
2713  int64 abs_pos;
2714 
2715  Assert(WindowObjectIsValid(winobj));
2716  winstate = winobj->winstate;
2717  econtext = winstate->ss.ps.ps_ExprContext;
2718  slot = winstate->temp_slot_1;
2719 
2720  switch (seektype)
2721  {
2722  case WINDOW_SEEK_CURRENT:
2723  abs_pos = winstate->currentpos + relpos;
2724  break;
2725  case WINDOW_SEEK_HEAD:
2726  update_frameheadpos(winobj, slot);
2727  abs_pos = winstate->frameheadpos + relpos;
2728  break;
2729  case WINDOW_SEEK_TAIL:
2730  update_frametailpos(winobj, slot);
2731  abs_pos = winstate->frametailpos + relpos;
2732  break;
2733  default:
2734  elog(ERROR, "unrecognized window seek type: %d", seektype);
2735  abs_pos = 0; /* keep compiler quiet */
2736  break;
2737  }
2738 
2739  gottuple = window_gettupleslot(winobj, abs_pos, slot);
2740  if (gottuple)
2741  gottuple = row_is_in_frame(winstate, abs_pos, slot);
2742 
2743  if (!gottuple)
2744  {
2745  if (isout)
2746  *isout = true;
2747  *isnull = true;
2748  return (Datum) 0;
2749  }
2750  else
2751  {
2752  if (isout)
2753  *isout = false;
2754  if (set_mark)
2755  {
2756  int frameOptions = winstate->frameOptions;
2757  int64 mark_pos = abs_pos;
2758 
2759  /*
2760  * In RANGE mode with a moving frame head, we must not let the
2761  * mark advance past frameheadpos, since that row has to be
2762  * fetchable during future update_frameheadpos calls.
2763  *
2764  * XXX it is very ugly to pollute window functions' marks with
2765  * this consideration; it could for instance mask a logic bug that
2766  * lets a window function fetch rows before what it had claimed
2767  * was its mark. Perhaps use a separate mark for frame head
2768  * probes?
2769  */
2770  if ((frameOptions & FRAMEOPTION_RANGE) &&
2771  !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2772  {
2773  update_frameheadpos(winobj, winstate->temp_slot_2);
2774  if (mark_pos > winstate->frameheadpos)
2775  mark_pos = winstate->frameheadpos;
2776  }
2777  WinSetMarkPosition(winobj, mark_pos);
2778  }
2779  econtext->ecxt_outertuple = slot;
2780  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2781  econtext, isnull);
2782  }
2783 }
2784 
2785 /*
2786  * WinGetFuncArgCurrent
2787  * Evaluate a window function's argument expression on the current row.
2788  *
2789  * argno: argument number to evaluate (counted from 0)
2790  * isnull: output argument, receives isnull status of result
2791  *
2792  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
2793  * WinGetFuncArgInFrame targeting the current row, because it will succeed
2794  * even if the WindowObject's mark has been set beyond the current row.
2795  * This should generally be used for "ordinary" arguments of a window
2796  * function, such as the offset argument of lead() or lag().
2797  */
2798 Datum
2799 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
2800 {
2801  WindowAggState *winstate;
2802  ExprContext *econtext;
2803 
2804  Assert(WindowObjectIsValid(winobj));
2805  winstate = winobj->winstate;
2806 
2807  econtext = winstate->ss.ps.ps_ExprContext;
2808 
2809  econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2810  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2811  econtext, isnull);
2812 }
signed short int16
Definition: c.h:255
int ordNumCols
Definition: plannodes.h:806
void tuplestore_puttupleslot(Tuplestorestate *state, TupleTableSlot *slot)
Definition: tuplestore.c:708
#define NIL
Definition: pg_list.h:69
Datum WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
Definition: fmgr.h:56
List * qual
Definition: plannodes.h:145
bool WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
MemoryContext curaggcontext
Definition: execnodes.h:1863
ExprState * endOffset
Definition: execnodes.h:1857
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
void * WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
Datum * ecxt_aggvalues
Definition: execnodes.h:213
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:300
bool tuplestore_advance(Tuplestorestate *state, bool forward)
Definition: tuplestore.c:1110
struct WindowStatePerAggData * WindowStatePerAgg
Definition: execnodes.h:1829
int64 WinGetPartitionRowCount(WindowObject winobj)
Datum startOffsetValue
Definition: execnodes.h:1858
static void finalize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate, Datum *result, bool *isnull)
void ExecInitScanTupleSlot(EState *estate, ScanState *scanstate)
Definition: execTuples.c:842
ExprState * aggfilter
Definition: execnodes.h:670
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:882
#define Anum_pg_aggregate_agginitval
Definition: pg_aggregate.h:113
static bool window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
List * args
Definition: primnodes.h:359
MemoryContext MemoryContextGetParent(MemoryContext context)
Definition: mcxt.c:403
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2962
Oid GetUserId(void)
Definition: miscinit.c:284
#define castNode(_type_, nodeptr)
Definition: nodes.h:578
void tuplestore_trim(Tuplestorestate *state)
Definition: tuplestore.c:1360
ScanState ss
Definition: execnodes.h:1833
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:523
void tuplestore_set_eflags(Tuplestorestate *state, int eflags)
Definition: tuplestore.c:359
ExprContext * ps_ExprContext
Definition: execnodes.h:881
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:203
#define FRAMEOPTION_START_VALUE
Definition: parsenodes.h:519
AttrNumber * ordColIdx
Definition: plannodes.h:807
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:509
bool execTuplesMatch(TupleTableSlot *slot1, TupleTableSlot *slot2, int numCols, AttrNumber *matchColIdx, FmgrInfo *eqfunctions, MemoryContext evalContext)
Definition: execGrouping.c:69
int64 aggregatedupto
Definition: execnodes.h:1853
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:858
return result
Definition: formatting.c:1633
bool frametail_valid
Definition: execnodes.h:1874
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1101
static void release_partition(WindowAggState *winstate)
struct WindowStatePerFuncData * WindowStatePerFunc
Definition: execnodes.h:1828
FmgrInfo * partEqfunctions
Definition: execnodes.h:1842
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition: parse_agg.c:2019
WindowFuncExprState * wfuncstate
Definition: nodeWindowAgg.c:78
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:957
EState * state
Definition: execnodes.h:849
unsigned int Oid
Definition: postgres_ext.h:31
NodeTag
Definition: nodes.h:26
Index winref
Definition: primnodes.h:361
#define FRAMEOPTION_START_UNBOUNDED_PRECEDING
Definition: parsenodes.h:508
TupleTableSlot * temp_slot_1
Definition: execnodes.h:1882
#define OidIsValid(objectId)
Definition: c.h:538
WindowStatePerFunc perfunc
Definition: execnodes.h:1840
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:521
Oid * ordOperators
Definition: plannodes.h:808
static void spool_tuples(WindowAggState *winstate, int64 pos)
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:156
void ExecAssignResultTypeFromTL(PlanState *planstate)
Definition: execUtils.c:445
#define WINDOW_SEEK_TAIL
Definition: windowapi.h:34
TupleTableSlot * first_part_slot
Definition: execnodes.h:1877
static bool advance_windowaggregate_base(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate)
#define FUNC_MAX_ARGS
ExprContext * tmpcontext
Definition: execnodes.h:1864
PlanState ps
Definition: execnodes.h:1098
struct WindowObjectData * agg_winobj
Definition: execnodes.h:1851
Node * startOffset
Definition: plannodes.h:810
bool tuplestore_in_memory(Tuplestorestate *state)
Definition: tuplestore.c:1455
int64 frameheadpos
Definition: execnodes.h:1848
FmgrInfo * flinfo
Definition: fmgr.h:79
static WindowStatePerAggData * initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, WindowStatePerAgg peraggstate)
void pfree(void *pointer)
Definition: mcxt.c:950
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
bool fn_strict
Definition: fmgr.h:61
Expr * expr
Definition: execnodes.h:83
char * get_func_name(Oid funcid)
Definition: lsyscache.c:1412
void ExecInitResultTupleSlot(EState *estate, PlanState *planstate)
Definition: execTuples.c:832
struct WindowObjectData WindowObjectData
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:127
static bool row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
static struct @121 value
#define DatumGetInt64(X)
Definition: postgres.h:613
struct WindowStatePerFuncData WindowStatePerFuncData
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define EXEC_FLAG_BACKWARD
Definition: executor.h:60
#define outerPlanState(node)
Definition: execnodes.h:893
TupleTableSlot * temp_slot_2
Definition: execnodes.h:1883
Datum WinGetFuncArgInPartition(WindowObject winobj, int argno, int relpos, int seektype, bool set_mark, bool *isnull, bool *isout)
Datum endOffsetValue
Definition: execnodes.h:1859
void * list_nth(const List *list, int n)
Definition: list.c:410
WindowStatePerAgg peragg
Definition: execnodes.h:1841
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:137
ExprState * startOffset
Definition: execnodes.h:1856
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:492
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:285
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
int64 aggregatedbase
Definition: execnodes.h:1852
Node * endOffset
Definition: plannodes.h:811
#define FRAMEOPTION_END_CURRENT_ROW
Definition: parsenodes.h:513
#define fmgr_info_set_expr(expr, finfo)
Definition: fmgr.h:104
#define FRAMEOPTION_START_VALUE_PRECEDING
Definition: parsenodes.h:514
#define DatumGetBool(X)
Definition: postgres.h:399
#define MakeExpandedObjectReadOnly(d, isnull, typlen)
#define FRAMEOPTION_END_UNBOUNDED_FOLLOWING
Definition: parsenodes.h:511
#define TupIsNull(slot)
Definition: tuptable.h:138
int partNumCols
Definition: plannodes.h:803
Oid winfnoid
Definition: primnodes.h:355
bool argnull[FUNC_MAX_ARGS]
Definition: fmgr.h:86
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
Tuplestorestate * buffer
Definition: execnodes.h:1844
MemoryContext aggcontext
Definition: execnodes.h:1862
void fmgr_info_cxt(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt)
Definition: fmgr.c:137
#define FRAMEOPTION_START_CURRENT_ROW
Definition: parsenodes.h:512
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2599
#define ereport(elevel, rest)
Definition: elog.h:122
Datum datumCopy(Datum value, bool typByVal, int typLen)
Definition: datum.c:128
Bitmapset * chgParam
Definition: execnodes.h:875
#define InvokeFunctionExecuteHook(objectId)
Definition: objectaccess.h:179
bool IsBinaryCoercible(Oid srctype, Oid targettype)
#define outerPlan(node)
Definition: plannodes.h:174
int64 WinGetCurrentPosition(WindowObject winobj)
ExpandedObjectHeader * DatumGetEOHP(Datum d)
Definition: expandeddatum.c:29
static void eval_windowaggregates(WindowAggState *winstate)
int64 spooled_rows
Definition: execnodes.h:1846
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define WINDOW_SEEK_HEAD
Definition: windowapi.h:33
bool * ecxt_aggnulls
Definition: execnodes.h:214
#define TextDatumGetCString(d)
Definition: builtins.h:92
static TupleTableSlot * ExecWindowAgg(PlanState *pstate)
#define Anum_pg_aggregate_aggminitval
Definition: pg_aggregate.h:114
WindowAggState * winstate
Definition: nodeWindowAgg.c:62
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:878
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:853
AclResult
Definition: acl.h:170
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:245
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1279
int work_mem
Definition: globals.c:113
TupleTableSlot * agg_row_slot
Definition: execnodes.h:1881
static void initialize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate)
AttrNumber * partColIdx
Definition: plannodes.h:804
FormData_pg_proc * Form_pg_proc
Definition: pg_proc.h:83
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
Plan * plan
Definition: execnodes.h:847
static void begin_partition(WindowAggState *winstate)
void DeleteExpandedObject(Datum d)
#define InvalidOid
Definition: postgres_ext.h:36
bool more_partitions
Definition: execnodes.h:1870
Datum arg[FUNC_MAX_ARGS]
Definition: fmgr.h:85
#define WindowObjectIsValid(winobj)
Definition: windowapi.h:41
FmgrInfo * ordEqfunctions
Definition: execnodes.h:1843
#define Max(x, y)
Definition: c.h:801
Oid * partOperators
Definition: plannodes.h:805
TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: execTuples.c:795
#define makeNode(_type_)
Definition: nodes.h:557
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
void ExecEndWindowAgg(WindowAggState *node)
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:199
#define WINDOW_SEEK_CURRENT
Definition: windowapi.h:32
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
int64 frametailpos
Definition: execnodes.h:1849
#define NULL
Definition: c.h:229
#define FRAMEOPTION_RANGE
Definition: parsenodes.h:505
#define Assert(condition)
Definition: c.h:676
#define lfirst(lc)
Definition: pg_list.h:106
Index winref
Definition: plannodes.h:802
#define EXEC_FLAG_MARK
Definition: executor.h:61
#define DatumIsReadWriteExpandedObject(d, isnull, typlen)
static void advance_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate)
FormData_pg_aggregate * Form_pg_aggregate
Definition: pg_aggregate.h:87
bool MemoryContextContains(MemoryContext context, void *pointer)
Definition: mcxt.c:567
void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, Oid transfn_oid, Oid invtransfn_oid, Expr **transfnexpr, Expr **invtransfnexpr)
Definition: parse_agg.c:1882
size_t Size
Definition: c.h:356
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:423
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
Definition: fmgr.h:120
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:42
static int list_length(const List *l)
Definition: pg_list.h:89
MemoryContext aggcontext
void tuplestore_end(Tuplestorestate *state)
Definition: tuplestore.c:453
void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)
Definition: lsyscache.c:2001
#define FRAMEOPTION_ROWS
Definition: parsenodes.h:506
bool tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
Definition: tuplestore.c:1135
WindowAggState * ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
Oid inputcollid
Definition: primnodes.h:358
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:202
Datum WinGetFuncArgInFrame(WindowObject winobj, int argno, int relpos, int seektype, bool set_mark, bool *isnull, bool *isout)
#define FRAMEOPTION_END_VALUE_PRECEDING
Definition: parsenodes.h:515
#define INT64_FORMAT
Definition: c.h:315
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
int64 currentpos
Definition: execnodes.h:1847
ExprState * qual
Definition: execnodes.h:865
#define DatumGetPointer(X)
Definition: postgres.h:555
Oid wintype
Definition: primnodes.h:356
int tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
Definition: tuplestore.c:383
static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1, TupleTableSlot *slot2)
bool partition_spooled
Definition: execnodes.h:1868
void ExecAssignScanTypeFromOuterPlan(ScanState *scanstate)
Definition: execUtils.c:557
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool framehead_valid
Definition: execnodes.h:1872
#define ACL_EXECUTE
Definition: parsenodes.h:79
bool winagg
Definition: primnodes.h:363
AclResult pg_proc_aclcheck(Oid proc_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4446
int i
Plan plan
Definition: plannodes.h:801
void tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
Definition: tuplestore.c:473
struct WindowStatePerAggData WindowStatePerAggData
void WinSetMarkPosition(WindowObject winobj, int64 markpos)
void ExecReScanWindowAgg(WindowAggState *node)
void * arg
MemoryContext partcontext
Definition: execnodes.h:1861
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:113
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define elog
Definition: elog.h:219
int frameOptions
Definition: plannodes.h:809
WindowFunc * wfunc
Definition: execnodes.h:668
FmgrInfo * execTuplesMatchPrepare(int numCols, Oid *eqOperators)
Definition: execGrouping.c:204
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:139
Definition: pg_list.h:45
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1738
int16 AttrNumber
Definition: attnum.h:21
#define FRAMEOPTION_END_VALUE
Definition: parsenodes.h:521
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition: executor.h:328
static void eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, Datum *result, bool *isnull)
#define ResetExprContext(econtext)
Definition: executor.h:469
Oid resolve_aggregate_transtype(Oid aggfuncid, Oid aggtranstype, Oid *inputTypes, int numArguments)
Definition: parse_agg.c:1827