PostgreSQL Source Code  git master
nodeWindowAgg.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * nodeWindowAgg.c
4  * routines to handle WindowAgg nodes.
5  *
6  * A WindowAgg node evaluates "window functions" across suitable partitions
7  * of the input tuple set. Any one WindowAgg works for just a single window
8  * specification, though it can evaluate multiple window functions sharing
9  * identical window specifications. The input tuples are required to be
10  * delivered in sorted order, with the PARTITION BY columns (if any) as
11  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12  * (The planner generates a stack of WindowAggs with intervening Sort nodes
13  * as needed, if a query involves more than one window specification.)
14  *
15  * Since window functions can require access to any or all of the rows in
16  * the current partition, we accumulate rows of the partition into a
17  * tuplestore. The window functions are called using the WindowObject API
18  * so that they can access those rows as needed.
19  *
20  * We also support using plain aggregate functions as window functions.
21  * For these, the regular Agg-node environment is emulated for each partition.
22  * As required by the SQL spec, the output represents the value of the
23  * aggregate function over all rows in the current row's window frame.
24  *
25  *
26  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * IDENTIFICATION
30  * src/backend/executor/nodeWindowAgg.c
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include "postgres.h"
35 
36 #include "access/htup_details.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_aggregate.h"
39 #include "catalog/pg_proc.h"
40 #include "executor/executor.h"
41 #include "executor/nodeWindowAgg.h"
42 #include "miscadmin.h"
43 #include "nodes/nodeFuncs.h"
44 #include "optimizer/clauses.h"
45 #include "parser/parse_agg.h"
46 #include "parser/parse_coerce.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/datum.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/regproc.h"
53 #include "utils/syscache.h"
54 #include "windowapi.h"
55 
56 /*
57  * All the window function APIs are called with this object, which is passed
58  * to window functions as fcinfo->context.
59  */
/*
 * NOTE(review): this listing omits its own lines 62 and 70 inside this
 * struct (the embedded numbering jumps 61 -> 63 and 69 -> 71).  Line 70
 * presumably carried the closing brace and the typedef name; what line 62
 * held cannot be told from here.  Restore both from the upstream file before
 * relying on this definition.
 */
60 typedef struct WindowObjectData
61 {
63  WindowAggState *winstate; /* parent WindowAggState */
64  List *argstates; /* ExprState trees for fn's arguments */
65  void *localmem; /* WinGetPartitionLocalMemory's chunk */
66  int markptr; /* tuplestore mark pointer for this fn */
67  int readptr; /* tuplestore read pointer for this fn */
68  int64 markpos; /* row that markptr is positioned on */
69  int64 seekpos; /* row that readptr is positioned on */
71 
72 /*
73  * We have one WindowStatePerFunc struct for each window function and
74  * window aggregate handled by this node.
75  */
76 typedef struct WindowStatePerFuncData
77 {
78  /* Links to WindowFunc expr and state nodes this working state is for */
/*
 * NOTE(review): listing lines 79-80 are absent.  They presumably declare the
 * link members this comment introduces; a member named wfuncstate is read
 * through perfuncstate->wfuncstate by the transition routines in this file.
 */
81 
82  int numArguments; /* number of arguments */
83 
84  FmgrInfo flinfo; /* fmgr lookup data for window function */
85 
86  Oid winCollation; /* collation derived for window function */
87 
88  /*
89  * We need the len and byval info for the result of each function in order
90  * to know how to copy/delete values.
91  */
/*
 * NOTE(review): listing lines 92-93 are absent; presumably the result-type
 * length and by-value members described just above.
 */
94 
95  bool plain_agg; /* is it just a plain aggregate function? */
96  int aggno; /* if so, index of its PerAggData */
97 
98  WindowObject winobj; /* object used in window function API */
/*
 * NOTE(review): listing line 99 is absent; presumably the closing brace and
 * typedef name of this struct.
 */
100 
101 /*
102  * For plain aggregate window functions, we also have one of these.
103  */
104 typedef struct WindowStatePerAggData
105 {
106  /* Oids of transition functions */
/*
 * NOTE(review): listing line 107 is absent; presumably the Oid of the forward
 * transition function, since only the inverse and final Oids are visible.
 */
108  Oid invtransfn_oid; /* may be InvalidOid */
109  Oid finalfn_oid; /* may be InvalidOid */
110 
111  /*
112  * fmgr lookup data for transition functions --- only valid when
113  * corresponding oid is not InvalidOid. Note in particular that fn_strict
114  * flags are kept here.
115  */
/*
 * NOTE(review): listing lines 116-118 are absent.  The members transfn,
 * invtransfn and finalfn, which the routines in this file read through
 * peraggstate, are presumably declared there.
 */
119 
120  int numFinalArgs; /* number of arguments to pass to finalfn */
121 
122  /*
123  * initial value from pg_aggregate entry
124  */
/*
 * NOTE(review): listing lines 125-126 are absent; presumably initValue and
 * initValueIsNull, both read by initialize_windowaggregate.
 */
127 
128  /*
129  * cached value for current frame boundaries
130  */
/*
 * NOTE(review): listing lines 131-132 are absent; presumably resultValue and
 * resultValueIsNull, both written by initialize_windowaggregate.
 */
133 
134  /*
135  * We need the len and byval info for the agg's input, result, and
136  * transition data types in order to know how to copy/delete values.
137  */
138  int16 inputtypeLen,
139  resulttypeLen,
140  transtypeLen;
141  bool inputtypeByVal,
142  resulttypeByVal,
/*
 * NOTE(review): listing line 143 is absent, which leaves the declarator list
 * above unterminated as printed.  It presumably declares transtypeByVal (read
 * by the transition routines) and ends the statement.
 */
144 
145  int wfuncno; /* index of associated PerFuncData */
146 
147  /* Context holding transition value and possibly other subsidiary data */
148  MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
149 
150  /* Current transition value */
151  Datum transValue; /* current transition value */
/*
 * NOTE(review): listing line 152 is absent; presumably transValueIsNull,
 * which is read and written alongside transValue throughout this file.
 */
153 
154  int64 transValueCount; /* number of currently-aggregated rows */
155 
156  /* Data local to eval_windowaggregates() */
157  bool restart; /* need to restart this agg in this cycle? */
/*
 * NOTE(review): listing line 158 is absent; presumably the closing brace and
 * typedef name of this struct.
 */
159 
/*
 * Forward declarations of the file-local helpers.
 *
 * NOTE(review): this listing omits its own lines 160, 163, 166, 169, 174-175,
 * 179, 181 and 188 (the embedded numbering skips them).  As a result most
 * prototypes below appear only as trailing parameter lines, without a return
 * type, function name or first parameter, and some prototypes are missing
 * entirely.  Restore them from the upstream file before compiling.
 */
161  WindowStatePerFunc perfuncstate,
162  WindowStatePerAgg peraggstate);
164  WindowStatePerFunc perfuncstate,
165  WindowStatePerAgg peraggstate);
167  WindowStatePerFunc perfuncstate,
168  WindowStatePerAgg peraggstate);
170  WindowStatePerFunc perfuncstate,
171  WindowStatePerAgg peraggstate,
172  Datum *result, bool *isnull);
173 
176  WindowStatePerFunc perfuncstate,
177  Datum *result, bool *isnull);
178 
180 static void spool_tuples(WindowAggState *winstate, int64 pos);
182 
183 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
184  TupleTableSlot *slot);
185 static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
186 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
187 
189  WindowFunc *wfunc,
190  WindowStatePerAgg peraggstate);
191 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
192 
193 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
194  TupleTableSlot *slot2);
195 static bool window_gettupleslot(WindowObject winobj, int64 pos,
196  TupleTableSlot *slot);
197 
198 
199 /*
200  * initialize_windowaggregate
201  * parallel to initialize_aggregates in nodeAgg.c
202  */
/*
 * Puts one aggregate's working state back to its starting point:
 *   - transValue is set from initValue; when initValueIsNull is false the
 *     value is datumCopy'd while peraggstate->aggcontext is current,
 *     otherwise it is assigned directly;
 *   - transValueIsNull mirrors initValueIsNull and transValueCount is zeroed;
 *   - the cached resultValue is cleared and marked null.
 * perfuncstate is not referenced by the statements visible here.
 *
 * NOTE(review): this listing omits its own lines 204 and 216 (the embedded
 * numbering skips them).  Line 204 presumably held the function name and the
 * winstate parameter used below.  Line 216 is the statement controlled by
 * the "if" on line 215 -- presumably the reset of the private context that
 * the comment above it describes.  Restore both from the upstream file.
 */
203 static void
205  WindowStatePerFunc perfuncstate,
206  WindowStatePerAgg peraggstate)
207 {
208  MemoryContext oldContext;
209 
210  /*
211  * If we're using a private aggcontext, we may reset it here. But if the
212  * context is shared, we don't know which other aggregates may still need
213  * it, so we must leave it to the caller to reset at an appropriate time.
214  */
215  if (peraggstate->aggcontext != winstate->aggcontext)
/*
 * NOTE(review): listing line 216 belongs here.  As printed, the "if" above
 * would govern the if/else that follows, which is not what the comment
 * describes.
 */
217 
218  if (peraggstate->initValueIsNull)
219  peraggstate->transValue = peraggstate->initValue;
220  else
221  {
222  oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
223  peraggstate->transValue = datumCopy(peraggstate->initValue,
224  peraggstate->transtypeByVal,
225  peraggstate->transtypeLen);
226  MemoryContextSwitchTo(oldContext);
227  }
228  peraggstate->transValueIsNull = peraggstate->initValueIsNull;
229  peraggstate->transValueCount = 0;
230  peraggstate->resultValue = (Datum) 0;
231  peraggstate->resultValueIsNull = true;
232 }
233 
234 /*
235  * advance_windowaggregate
236  * parallel to advance_aggregates in nodeAgg.c
237  */
/*
 * Feeds the row currently visible through winstate->tmpcontext into one
 * aggregate's forward transition function.
 *
 *   winstate      owning node state (see NOTE below about its declaration)
 *   perfuncstate  supplies wfuncstate (FILTER and argument expressions),
 *                 numArguments and winCollation
 *   peraggstate   transition state, read and updated in place
 *
 * The function returns early, leaving the transition state untouched, when
 * the FILTER expression yields NULL or false, or when the transfn is strict
 * and either an argument is NULL or the running transValue is already NULL.
 * For a strict transfn whose state is still NULL with transValueCount == 0,
 * the first argument is copied in as the new state and transValueCount is
 * set to 1 without calling the transfn.  Otherwise the transfn is invoked,
 * transValueCount is incremented, and the result replaces transValue.
 *
 * Argument and FILTER evaluation run with the tmpcontext's per-tuple memory
 * context current; the caller's context is restored on every exit path.
 *
 * NOTE(review): this listing omits its own lines 239, 377, 380 and 389 (the
 * embedded numbering skips them).  Line 239 presumably held the function
 * name and the winstate parameter.  Lines 377 and 380 presumably form the
 * "if" that tests for a R/W expanded object already owned by the aggcontext,
 * as described in the comment above that stanza; line 389 presumably opens
 * the "if" that selects DeleteExpandedObject over pfree.  Restore them from
 * the upstream file before compiling.
 */
238 static void
240  WindowStatePerFunc perfuncstate,
241  WindowStatePerAgg peraggstate)
242 {
243  WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
244  int numArguments = perfuncstate->numArguments;
245  FunctionCallInfoData fcinfodata;
246  FunctionCallInfo fcinfo = &fcinfodata;
247  Datum newVal;
248  ListCell *arg;
249  int i;
250  MemoryContext oldContext;
251  ExprContext *econtext = winstate->tmpcontext;
252  ExprState *filter = wfuncstate->aggfilter;
253 
254  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
255 
256  /* Skip anything FILTERed out */
257  if (filter)
258  {
259  bool isnull;
260  Datum res = ExecEvalExpr(filter, econtext, &isnull);
261 
262  if (isnull || !DatumGetBool(res))
263  {
264  MemoryContextSwitchTo(oldContext);
265  return;
266  }
267  }
268 
269  /* We start from 1, since the 0th arg will be the transition value */
270  i = 1;
271  foreach(arg, wfuncstate->args)
272  {
273  ExprState *argstate = (ExprState *) lfirst(arg);
274 
275  fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
276  &fcinfo->argnull[i]);
277  i++;
278  }
279 
280  if (peraggstate->transfn.fn_strict)
281  {
282  /*
283  * For a strict transfn, nothing happens when there's a NULL input; we
284  * just keep the prior transValue. Note transValueCount doesn't
285  * change either.
286  */
287  for (i = 1; i <= numArguments; i++)
288  {
289  if (fcinfo->argnull[i])
290  {
291  MemoryContextSwitchTo(oldContext);
292  return;
293  }
294  }
295 
296  /*
297  * For strict transition functions with initial value NULL we use the
298  * first non-NULL input as the initial state. (We already checked
299  * that the agg's input type is binary-compatible with its transtype,
300  * so straight copy here is OK.)
301  *
302  * We must copy the datum into aggcontext if it is pass-by-ref. We do
303  * not need to pfree the old transValue, since it's NULL.
304  */
305  if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
306  {
307  MemoryContextSwitchTo(peraggstate->aggcontext);
308  peraggstate->transValue = datumCopy(fcinfo->arg[1],
309  peraggstate->transtypeByVal,
310  peraggstate->transtypeLen);
311  peraggstate->transValueIsNull = false;
312  peraggstate->transValueCount = 1;
313  MemoryContextSwitchTo(oldContext);
314  return;
315  }
316 
317  if (peraggstate->transValueIsNull)
318  {
319  /*
320  * Don't call a strict function with NULL inputs. Note it is
321  * possible to get here despite the above tests, if the transfn is
322  * strict *and* returned a NULL on a prior cycle. If that happens
323  * we will propagate the NULL all the way to the end. That can
324  * only happen if there's no inverse transition function, though,
325  * since we disallow transitions back to NULL when there is one.
326  */
327  MemoryContextSwitchTo(oldContext);
328  Assert(!OidIsValid(peraggstate->invtransfn_oid));
329  return;
330  }
331  }
332 
333  /*
334  * OK to call the transition function. Set winstate->curaggcontext while
335  * calling it, for possible use by AggCheckCallContext.
336  */
/*
 * NOTE(review): arg[1..] and argnull[1..] were filled in above, before this
 * initialization.  That ordering relies on InitFunctionCallInfoData leaving
 * the argument arrays untouched -- confirm against fmgr.h.
 */
337  InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
338  numArguments + 1,
339  perfuncstate->winCollation,
340  (void *) winstate, NULL);
341  fcinfo->arg[0] = peraggstate->transValue;
342  fcinfo->argnull[0] = peraggstate->transValueIsNull;
343  winstate->curaggcontext = peraggstate->aggcontext;
344  newVal = FunctionCallInvoke(fcinfo);
345  winstate->curaggcontext = NULL;
346 
347  /*
348  * Moving-aggregate transition functions must not return null, see
349  * advance_windowaggregate_base().
350  */
351  if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
352  ereport(ERROR,
353  (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
354  errmsg("moving-aggregate transition function must not return null")));
355 
356  /*
357  * We must track the number of rows included in transValue, since to
358  * remove the last input, advance_windowaggregate_base() mustn't call the
359  * inverse transition function, but simply reset transValue back to its
360  * initial value.
361  */
362  peraggstate->transValueCount++;
363 
364  /*
365  * If pass-by-ref datatype, must copy the new value into aggcontext and
366  * free the prior transValue. But if transfn returned a pointer to its
367  * first input, we don't need to do anything. Also, if transfn returned a
368  * pointer to a R/W expanded object that is already a child of the
369  * aggcontext, assume we can adopt that value without copying it.
370  */
371  if (!peraggstate->transtypeByVal &&
372  DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
373  {
374  if (!fcinfo->isnull)
375  {
376  MemoryContextSwitchTo(peraggstate->aggcontext);
378  false,
379  peraggstate->transtypeLen) &&
381  /* do nothing */ ;
382  else
383  newVal = datumCopy(newVal,
384  peraggstate->transtypeByVal,
385  peraggstate->transtypeLen);
386  }
387  if (!peraggstate->transValueIsNull)
388  {
390  false,
391  peraggstate->transtypeLen))
392  DeleteExpandedObject(peraggstate->transValue);
393  else
394  pfree(DatumGetPointer(peraggstate->transValue));
395  }
396  }
397 
398  MemoryContextSwitchTo(oldContext);
399  peraggstate->transValue = newVal;
400  peraggstate->transValueIsNull = fcinfo->isnull;
401 }
402 
403 /*
404  * advance_windowaggregate_base
405  * Remove the oldest tuple from an aggregation.
406  *
407  * This is very much like advance_windowaggregate, except that we will call
408  * the inverse transition function (which caller must have checked is
409  * available).
410  *
411  * Returns true if we successfully removed the current row from this
412  * aggregate, false if not (in the latter case, caller is responsible
413  * for cleaning up by restarting the aggregation).
414  */
/*
 *   winstate      owning node state (see NOTE below about its declaration)
 *   perfuncstate  supplies wfuncstate (FILTER and argument expressions),
 *                 numArguments and winCollation
 *   peraggstate   transition state, read and updated in place
 *
 * The row to remove is the one visible through winstate->tmpcontext.  The
 * only visible path that returns false is a NULL result from the inverse
 * transition function; in that case transValue and transValueCount are left
 * as they were.  A NULL transValue on entry is reported with elog(ERROR).
 *
 * NOTE(review): this listing omits its own lines 416, 497, 546, 549 and 558
 * (the embedded numbering skips them).  Line 416 presumably held the
 * function name and the winstate parameter.  Line 497 is the start of the
 * call whose arguments appear on 498-499, presumably
 * initialize_windowaggregate(winstate, ...) per the comment above it.  Lines
 * 546, 549 and 558 presumably hold the same expanded-object tests that are
 * missing from the matching stanza of advance_windowaggregate.  Restore them
 * from the upstream file before compiling.
 */
415 static bool
417  WindowStatePerFunc perfuncstate,
418  WindowStatePerAgg peraggstate)
419 {
420  WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
421  int numArguments = perfuncstate->numArguments;
422  FunctionCallInfoData fcinfodata;
423  FunctionCallInfo fcinfo = &fcinfodata;
424  Datum newVal;
425  ListCell *arg;
426  int i;
427  MemoryContext oldContext;
428  ExprContext *econtext = winstate->tmpcontext;
429  ExprState *filter = wfuncstate->aggfilter;
430 
431  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
432 
433  /* Skip anything FILTERed out */
434  if (filter)
435  {
436  bool isnull;
437  Datum res = ExecEvalExpr(filter, econtext, &isnull);
438 
439  if (isnull || !DatumGetBool(res))
440  {
441  MemoryContextSwitchTo(oldContext);
/*
 * A row rejected by FILTER is skipped by advance_windowaggregate too, so
 * there is nothing to take out and this counts as success.
 */
442  return true;
443  }
444  }
445 
446  /* We start from 1, since the 0th arg will be the transition value */
447  i = 1;
448  foreach(arg, wfuncstate->args)
449  {
450  ExprState *argstate = (ExprState *) lfirst(arg);
451 
452  fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
453  &fcinfo->argnull[i]);
454  i++;
455  }
456 
457  if (peraggstate->invtransfn.fn_strict)
458  {
459  /*
460  * For a strict (inv)transfn, nothing happens when there's a NULL
461  * input; we just keep the prior transValue. Note transValueCount
462  * doesn't change either.
463  */
/*
 * NOTE(review): this mirrors the NULL-input skip in advance_windowaggregate
 * only if transfn and invtransfn agree on strictness; that agreement is not
 * checked anywhere in the visible listing -- confirm where it is enforced.
 */
464  for (i = 1; i <= numArguments; i++)
465  {
466  if (fcinfo->argnull[i])
467  {
468  MemoryContextSwitchTo(oldContext);
469  return true;
470  }
471  }
472  }
473 
474  /* There should still be an added but not yet removed value */
475  Assert(peraggstate->transValueCount > 0);
476 
477  /*
478  * In moving-aggregate mode, the state must never be NULL, except possibly
479  * before any rows have been aggregated (which is surely not the case at
480  * this point). This restriction allows us to interpret a NULL result
481  * from the inverse function as meaning "sorry, can't do an inverse
482  * transition in this case". We already checked this in
483  * advance_windowaggregate, but just for safety, check again.
484  */
485  if (peraggstate->transValueIsNull)
486  elog(ERROR, "aggregate transition value is NULL before inverse transition");
487 
488  /*
489  * We mustn't use the inverse transition function to remove the last
490  * input. Doing so would yield a non-NULL state, whereas we should be in
491  * the initial state afterwards which may very well be NULL. So instead,
492  * we simply re-initialize the aggregate in this case.
493  */
494  if (peraggstate->transValueCount == 1)
495  {
496  MemoryContextSwitchTo(oldContext);
498  &winstate->perfunc[peraggstate->wfuncno],
499  peraggstate);
500  return true;
501  }
502 
503  /*
504  * OK to call the inverse transition function. Set
505  * winstate->curaggcontext while calling it, for possible use by
506  * AggCheckCallContext.
507  */
508  InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
509  numArguments + 1,
510  perfuncstate->winCollation,
511  (void *) winstate, NULL);
512  fcinfo->arg[0] = peraggstate->transValue;
513  fcinfo->argnull[0] = peraggstate->transValueIsNull;
514  winstate->curaggcontext = peraggstate->aggcontext;
515  newVal = FunctionCallInvoke(fcinfo);
516  winstate->curaggcontext = NULL;
517 
518  /*
519  * If the function returns NULL, report failure, forcing a restart.
520  */
521  if (fcinfo->isnull)
522  {
523  MemoryContextSwitchTo(oldContext);
524  return false;
525  }
526 
527  /* Update number of rows included in transValue */
528  peraggstate->transValueCount--;
529 
530  /*
531  * If pass-by-ref datatype, must copy the new value into aggcontext and
532  * free the prior transValue. But if invtransfn returned a pointer to its
533  * first input, we don't need to do anything. Also, if invtransfn
534  * returned a pointer to a R/W expanded object that is already a child of
535  * the aggcontext, assume we can adopt that value without copying it.
536  *
537  * Note: the checks for null values here will never fire, but it seems
538  * best to have this stanza look just like advance_windowaggregate.
539  */
540  if (!peraggstate->transtypeByVal &&
541  DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
542  {
543  if (!fcinfo->isnull)
544  {
545  MemoryContextSwitchTo(peraggstate->aggcontext);
547  false,
548  peraggstate->transtypeLen) &&
550  /* do nothing */ ;
551  else
552  newVal = datumCopy(newVal,
553  peraggstate->transtypeByVal,
554  peraggstate->transtypeLen);
555  }
556  if (!peraggstate->transValueIsNull)
557  {
559  false,
560  peraggstate->transtypeLen))
561  DeleteExpandedObject(peraggstate->transValue);
562  else
563  pfree(DatumGetPointer(peraggstate->transValue));
564  }
565  }
566 
567  MemoryContextSwitchTo(oldContext);
568  peraggstate->transValue = newVal;
569  peraggstate->transValueIsNull = fcinfo->isnull;
570 
571  return true;
572 }
573 
574 /*
575  * finalize_windowaggregate
576  * parallel to finalize_aggregate in nodeAgg.c
577  */
/*
 * Produces the aggregate's current output value from its transition state.
 *
 *   winstate      owning node state (see NOTE below about its declaration)
 *   perfuncstate  supplies winCollation for the finalfn call
 *   peraggstate   supplies transValue, the finalfn lookup data and the
 *                 result type's len/byval info
 *   result        out: the computed value
 *   isnull        out: true when the value is NULL
 *
 * With no finalfn (finalfn_oid invalid), transValue and its null flag are
 * returned as they are.  With a finalfn, transValue is passed as argument 0
 * and every further argument up to numFinalArgs is passed as NULL; a strict
 * finalfn is not called when any of those inputs is NULL, and the result is
 * NULL instead.  A non-null pass-by-reference result may then be datumCopy'd,
 * subject to the condition noted below.
 *
 * NOTE(review): this listing omits its own lines 579, 586 and 641 (the
 * embedded numbering skips them).  Line 579 presumably held the function
 * name and the winstate parameter.  Line 586 presumably assigns oldContext
 * by switching memory context; no assignment to oldContext is visible, yet
 * it is restored at the end.  Line 641 is the middle of the final "if"
 * condition, so the exact test that decides whether the result is copied
 * cannot be read from this listing.  Restore them from the upstream file.
 */
578 static void
580  WindowStatePerFunc perfuncstate,
581  WindowStatePerAgg peraggstate,
582  Datum *result, bool *isnull)
583 {
584  MemoryContext oldContext;
585 
587 
588  /*
589  * Apply the agg's finalfn if one is provided, else return transValue.
590  */
591  if (OidIsValid(peraggstate->finalfn_oid))
592  {
593  int numFinalArgs = peraggstate->numFinalArgs;
594  FunctionCallInfoData fcinfo;
595  bool anynull;
596  int i;
597 
598  InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn),
599  numFinalArgs,
600  perfuncstate->winCollation,
601  (void *) winstate, NULL);
602  fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
603  peraggstate->transValueIsNull,
604  peraggstate->transtypeLen);
605  fcinfo.argnull[0] = peraggstate->transValueIsNull;
606  anynull = peraggstate->transValueIsNull;
607 
608  /* Fill any remaining argument positions with nulls */
609  for (i = 1; i < numFinalArgs; i++)
610  {
611  fcinfo.arg[i] = (Datum) 0;
612  fcinfo.argnull[i] = true;
613  anynull = true;
614  }
615 
616  if (fcinfo.flinfo->fn_strict && anynull)
617  {
618  /* don't call a strict function with NULL inputs */
619  *result = (Datum) 0;
620  *isnull = true;
621  }
622  else
623  {
624  winstate->curaggcontext = peraggstate->aggcontext;
625  *result = FunctionCallInvoke(&fcinfo);
626  winstate->curaggcontext = NULL;
627  *isnull = fcinfo.isnull;
628  }
629  }
630  else
631  {
632  /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
633  *result = peraggstate->transValue;
634  *isnull = peraggstate->transValueIsNull;
635  }
636 
637  /*
638  * If result is pass-by-ref, make sure it is in the right context.
639  */
640  if (!peraggstate->resulttypeByVal && !*isnull &&
642  DatumGetPointer(*result)))
643  *result = datumCopy(*result,
644  peraggstate->resulttypeByVal,
645  peraggstate->resulttypeLen);
646  MemoryContextSwitchTo(oldContext);
647 }
648 
649 /*
650  * eval_windowaggregates
651  * evaluate plain aggregates being used as window functions
652  *
653  * This differs from nodeAgg.c in two ways. First, if the window's frame
654  * start position moves, we use the inverse transition function (if it exists)
655  * to remove rows from the transition value. And second, we expect to be
656  * able to call aggregate final functions repeatedly after aggregating more
657  * data onto the same transition value. This is not a behavior required by
658  * nodeAgg.c.
659  */
/*
 * Results are delivered through the node's ps_ExprContext: for every
 * aggregate i, ecxt_aggvalues[wfuncno] and ecxt_aggnulls[wfuncno] are set,
 * where wfuncno is winstate->peragg[i].wfuncno.  The function returns
 * immediately when winstate->numaggs is zero.
 *
 * The visible steps are, in order: update the frame head; reuse the cached
 * results if the frame is known not to have changed; decide which aggregates
 * must restart; remove rows that left the frame via
 * advance_windowaggregate_base; restart the flagged aggregates; accumulate
 * rows up to the first one outside the frame; finalize and cache each result.
 *
 * NOTE(review): this listing omits its own lines 661, 747-748, 870 and 883
 * (the embedded numbering skips them):
 *   - 661 presumably held the function name and its winstate parameter;
 *   - 747-748 fall inside the "frame didn't change" test, presumably the
 *     frame-end-mode check that the comment above that test describes;
 *   - 870 is the statement controlled by "if (numaggs_restart > 0)",
 *     presumably the cleanup of the shared aggcontext mentioned in the
 *     comment above it;
 *   - 883 is the start of the call whose arguments appear on 884-885,
 *     presumably initialize_windowaggregate(winstate, ...).
 * Restore these lines from the upstream file before compiling.
 */
660 static void
662 {
663  WindowStatePerAgg peraggstate;
664  int wfuncno,
665  numaggs,
666  numaggs_restart,
667  i;
668  int64 aggregatedupto_nonrestarted;
669  MemoryContext oldContext;
670  ExprContext *econtext;
671  WindowObject agg_winobj;
672  TupleTableSlot *agg_row_slot;
673  TupleTableSlot *temp_slot;
674 
675  numaggs = winstate->numaggs;
676  if (numaggs == 0)
677  return; /* nothing to do */
678 
679  /* final output execution is in ps_ExprContext */
680  econtext = winstate->ss.ps.ps_ExprContext;
681  agg_winobj = winstate->agg_winobj;
682  agg_row_slot = winstate->agg_row_slot;
683  temp_slot = winstate->temp_slot_1;
684 
685  /*
686  * Currently, we support only a subset of the SQL-standard window framing
687  * rules.
688  *
689  * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
690  * a contiguous group of rows extending forward from the start of the
691  * partition, and rows only enter the frame, never exit it, as the current
692  * row advances forward. This makes it possible to use an incremental
693  * strategy for evaluating aggregates: we run the transition function for
694  * each row added to the frame, and run the final function whenever we
695  * need the current aggregate value. This is considerably more efficient
696  * than the naive approach of re-running the entire aggregate calculation
697  * for each current row. It does assume that the final function doesn't
698  * damage the running transition value, but we have the same assumption in
699  * nodeAgg.c too (when it rescans an existing hash table).
700  *
701  * If the frame start does sometimes move, we can still optimize as above
702  * whenever successive rows share the same frame head, but if the frame
703  * head moves beyond the previous head we try to remove those rows using
704  * the aggregate's inverse transition function. This function restores
705  * the aggregate's current state to what it would be if the removed row
706  * had never been aggregated in the first place. Inverse transition
707  * functions may optionally return NULL, indicating that the function was
708  * unable to remove the tuple from aggregation. If this happens, or if
709  * the aggregate doesn't have an inverse transition function at all, we
710  * must perform the aggregation all over again for all tuples within the
711  * new frame boundaries.
712  *
713  * In many common cases, multiple rows share the same frame and hence the
714  * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
715  * window, then all rows are peers and so they all have window frame equal
716  * to the whole partition.) We optimize such cases by calculating the
717  * aggregate value once when we reach the first row of a peer group, and
718  * then returning the saved value for all subsequent rows.
719  *
720  * 'aggregatedupto' keeps track of the first row that has not yet been
721  * accumulated into the aggregate transition values. Whenever we start a
722  * new peer group, we accumulate forward to the end of the peer group.
723  */
724 
725  /*
726  * First, update the frame head position.
727  *
728  * The frame head should never move backwards, and the code below wouldn't
729  * cope if it did, so for safety we complain if it does.
730  */
731  update_frameheadpos(agg_winobj, temp_slot);
732  if (winstate->frameheadpos < winstate->aggregatedbase)
733  elog(ERROR, "window frame head moved backward");
734 
735  /*
736  * If the frame didn't change compared to the previous row, we can re-use
737  * the result values that were previously saved at the bottom of this
738  * function. Since we don't know the current frame's end yet, this is not
739  * possible to check for fully. But if the frame end mode is UNBOUNDED
740  * FOLLOWING or CURRENT ROW, and the current row lies within the previous
741  * row's frame, then the two frames' ends must coincide. Note that on the
742  * first row aggregatedbase == aggregatedupto, meaning this test must
743  * fail, so we don't need to check the "there was no previous row" case
744  * explicitly here.
745  */
746  if (winstate->aggregatedbase == winstate->frameheadpos &&
749  winstate->aggregatedbase <= winstate->currentpos &&
750  winstate->aggregatedupto > winstate->currentpos)
751  {
752  for (i = 0; i < numaggs; i++)
753  {
754  peraggstate = &winstate->peragg[i];
755  wfuncno = peraggstate->wfuncno;
756  econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
757  econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
758  }
759  return;
760  }
761 
762  /*----------
763  * Initialize restart flags.
764  *
765  * We restart the aggregation:
766  * - if we're processing the first row in the partition, or
767  * - if the frame's head moved and we cannot use an inverse
768  * transition function, or
769  * - if the new frame doesn't overlap the old one
770  *
771  * Note that we don't strictly need to restart in the last case, but if
772  * we're going to remove all rows from the aggregation anyway, a restart
773  * surely is faster.
774  *----------
775  */
776  numaggs_restart = 0;
777  for (i = 0; i < numaggs; i++)
778  {
779  peraggstate = &winstate->peragg[i];
780  if (winstate->currentpos == 0 ||
781  (winstate->aggregatedbase != winstate->frameheadpos &&
782  !OidIsValid(peraggstate->invtransfn_oid)) ||
783  winstate->aggregatedupto <= winstate->frameheadpos)
784  {
785  peraggstate->restart = true;
786  numaggs_restart++;
787  }
788  else
789  peraggstate->restart = false;
790  }
791 
792  /*
793  * If we have any possibly-moving aggregates, attempt to advance
794  * aggregatedbase to match the frame's head by removing input rows that
795  * fell off the top of the frame from the aggregations. This can fail,
796  * i.e. advance_windowaggregate_base() can return false, in which case
797  * we'll restart that aggregate below.
798  */
799  while (numaggs_restart < numaggs &&
800  winstate->aggregatedbase < winstate->frameheadpos)
801  {
802  /*
803  * Fetch the next tuple of those being removed. This should never fail
804  * as we should have been here before.
805  */
806  if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
807  temp_slot))
808  elog(ERROR, "could not re-fetch previously fetched frame row");
809 
810  /* Set tuple context for evaluation of aggregate arguments */
811  winstate->tmpcontext->ecxt_outertuple = temp_slot;
812 
813  /*
814  * Perform the inverse transition for each aggregate function in the
815  * window, unless it has already been marked as needing a restart.
816  */
817  for (i = 0; i < numaggs; i++)
818  {
819  bool ok;
820 
821  peraggstate = &winstate->peragg[i];
822  if (peraggstate->restart)
823  continue;
824 
825  wfuncno = peraggstate->wfuncno;
826  ok = advance_windowaggregate_base(winstate,
827  &winstate->perfunc[wfuncno],
828  peraggstate);
829  if (!ok)
830  {
831  /* Inverse transition function has failed, must restart */
832  peraggstate->restart = true;
833  numaggs_restart++;
834  }
835  }
836 
837  /* Reset per-input-tuple context after each tuple */
838  ResetExprContext(winstate->tmpcontext);
839 
840  /* And advance the aggregated-row state */
841  winstate->aggregatedbase++;
842  ExecClearTuple(temp_slot);
843  }
844 
845  /*
846  * If we successfully advanced the base rows of all the aggregates,
847  * aggregatedbase now equals frameheadpos; but if we failed for any, we
848  * must forcibly update aggregatedbase.
849  */
850  winstate->aggregatedbase = winstate->frameheadpos;
851 
852  /*
853  * If we created a mark pointer for aggregates, keep it pushed up to frame
854  * head, so that tuplestore can discard unnecessary rows.
855  */
856  if (agg_winobj->markptr >= 0)
857  WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
858 
859  /*
860  * Now restart the aggregates that require it.
861  *
862  * We assume that aggregates using the shared context always restart if
863  * *any* aggregate restarts, and we may thus clean up the shared
864  * aggcontext if that is the case. Private aggcontexts are reset by
865  * initialize_windowaggregate() if their owning aggregate restarts. If we
866  * aren't restarting an aggregate, we need to free any previously saved
867  * result for it, else we'll leak memory.
868  */
869  if (numaggs_restart > 0)
/*
 * NOTE(review): listing line 870 belongs here.  As printed, the "if" above
 * would govern the whole for loop that follows, so non-restart cycles would
 * skip freeing the saved results -- not what the comment above describes.
 */
871  for (i = 0; i < numaggs; i++)
872  {
873  peraggstate = &winstate->peragg[i];
874 
875  /* Aggregates using the shared ctx must restart if *any* agg does */
876  Assert(peraggstate->aggcontext != winstate->aggcontext ||
877  numaggs_restart == 0 ||
878  peraggstate->restart);
879 
880  if (peraggstate->restart)
881  {
882  wfuncno = peraggstate->wfuncno;
884  &winstate->perfunc[wfuncno],
885  peraggstate);
886  }
887  else if (!peraggstate->resultValueIsNull)
888  {
889  if (!peraggstate->resulttypeByVal)
890  pfree(DatumGetPointer(peraggstate->resultValue));
891  peraggstate->resultValue = (Datum) 0;
892  peraggstate->resultValueIsNull = true;
893  }
894  }
895 
896  /*
897  * Non-restarted aggregates now contain the rows between aggregatedbase
898  * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
899  * contain no rows. If there are any restarted aggregates, we must thus
900  * begin aggregating anew at frameheadpos, otherwise we may simply
901  * continue at aggregatedupto. We must remember the old value of
902  * aggregatedupto to know how long to skip advancing non-restarted
903  * aggregates. If we modify aggregatedupto, we must also clear
904  * agg_row_slot, per the loop invariant below.
905  */
906  aggregatedupto_nonrestarted = winstate->aggregatedupto;
907  if (numaggs_restart > 0 &&
908  winstate->aggregatedupto != winstate->frameheadpos)
909  {
910  winstate->aggregatedupto = winstate->frameheadpos;
911  ExecClearTuple(agg_row_slot);
912  }
913 
914  /*
915  * Advance until we reach a row not in frame (or end of partition).
916  *
917  * Note the loop invariant: agg_row_slot is either empty or holds the row
918  * at position aggregatedupto. We advance aggregatedupto after processing
919  * a row.
920  */
921  for (;;)
922  {
923  /* Fetch next row if we didn't already */
924  if (TupIsNull(agg_row_slot))
925  {
926  if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
927  agg_row_slot))
928  break; /* must be end of partition */
929  }
930 
931  /* Exit loop (for now) if not in frame */
932  if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
933  break;
934 
935  /* Set tuple context for evaluation of aggregate arguments */
936  winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
937 
938  /* Accumulate row into the aggregates */
939  for (i = 0; i < numaggs; i++)
940  {
941  peraggstate = &winstate->peragg[i];
942 
943  /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
944  if (!peraggstate->restart &&
945  winstate->aggregatedupto < aggregatedupto_nonrestarted)
946  continue;
947 
948  wfuncno = peraggstate->wfuncno;
949  advance_windowaggregate(winstate,
950  &winstate->perfunc[wfuncno],
951  peraggstate);
952  }
953 
954  /* Reset per-input-tuple context after each tuple */
955  ResetExprContext(winstate->tmpcontext);
956 
957  /* And advance the aggregated-row state */
958  winstate->aggregatedupto++;
959  ExecClearTuple(agg_row_slot);
960  }
961 
962  /* The frame's end is not supposed to move backwards, ever */
963  Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
964 
965  /*
966  * finalize aggregates and fill result/isnull fields.
967  */
968  for (i = 0; i < numaggs; i++)
969  {
970  Datum *result;
971  bool *isnull;
972 
973  peraggstate = &winstate->peragg[i];
974  wfuncno = peraggstate->wfuncno;
975  result = &econtext->ecxt_aggvalues[wfuncno];
976  isnull = &econtext->ecxt_aggnulls[wfuncno];
977  finalize_windowaggregate(winstate,
978  &winstate->perfunc[wfuncno],
979  peraggstate,
980  result, isnull);
981 
982  /*
983  * save the result in case next row shares the same frame.
984  *
985  * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
986  * advance that the next row can't possibly share the same frame. Is
987  * it worth detecting that and skipping this code?
988  */
/*
 * A pass-by-reference result is copied while peraggstate->aggcontext is
 * current; on a later call that does not restart this aggregate, the copy is
 * released by the pfree in the restart stanza earlier in this function.
 */
989  if (!peraggstate->resulttypeByVal && !*isnull)
990  {
991  oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
992  peraggstate->resultValue =
993  datumCopy(*result,
994  peraggstate->resulttypeByVal,
995  peraggstate->resulttypeLen);
996  MemoryContextSwitchTo(oldContext);
997  }
998  else
999  {
1000  peraggstate->resultValue = *result;
1001  }
1002  peraggstate->resultValueIsNull = *isnull;
1003  }
1004 }
1005 
1006 /*
1007  * eval_windowfunction
1008  *
1009  * Arguments of window functions are not evaluated here, because a window
1010  * function can need random access to arbitrary rows in the partition.
1011  * The window function uses the special WinGetFuncArgInPartition and
1012  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1013  * it wants.
1014  */
1015 static void
1017  Datum *result, bool *isnull)
1018 {
1019  FunctionCallInfoData fcinfo;
1020  MemoryContext oldContext;
1021 
1023 
1024  /*
1025  * We don't pass any normal arguments to a window function, but we do pass
1026  * it the number of arguments, in order to permit window function
1027  * implementations to support varying numbers of arguments. The real info
1028  * goes through the WindowObject, which is passed via fcinfo->context.
1029  */
1030  InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
1031  perfuncstate->numArguments,
1032  perfuncstate->winCollation,
1033  (void *) perfuncstate->winobj, NULL);
1034  /* Just in case, make all the regular argument slots be null */
1035  memset(fcinfo.argnull, true, perfuncstate->numArguments);
1036  /* Window functions don't have a current aggregate context, either */
1037  winstate->curaggcontext = NULL;
1038 
1039  *result = FunctionCallInvoke(&fcinfo);
1040  *isnull = fcinfo.isnull;
1041 
1042  /*
1043  * Make sure pass-by-ref data is allocated in the appropriate context. (We
1044  * need this in case the function returns a pointer into some short-lived
1045  * tuple, as is entirely possible.)
1046  */
1047  if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
1049  DatumGetPointer(*result)))
1050  *result = datumCopy(*result,
1051  perfuncstate->resulttypeByVal,
1052  perfuncstate->resulttypeLen);
1053 
1054  MemoryContextSwitchTo(oldContext);
1055 }
1056 
1057 /*
1058  * begin_partition
1059  * Start buffering rows of the next partition.
1060  */
1061 static void
1063 {
1064  PlanState *outerPlan = outerPlanState(winstate);
1065  int numfuncs = winstate->numfuncs;
1066  int i;
1067 
1068  winstate->partition_spooled = false;
1069  winstate->framehead_valid = false;
1070  winstate->frametail_valid = false;
1071  winstate->spooled_rows = 0;
1072  winstate->currentpos = 0;
1073  winstate->frameheadpos = 0;
1074  winstate->frametailpos = -1;
1075  ExecClearTuple(winstate->agg_row_slot);
1076 
1077  /*
1078  * If this is the very first partition, we need to fetch the first input
1079  * row to store in first_part_slot.
1080  */
1081  if (TupIsNull(winstate->first_part_slot))
1082  {
1083  TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1084 
1085  if (!TupIsNull(outerslot))
1086  ExecCopySlot(winstate->first_part_slot, outerslot);
1087  else
1088  {
1089  /* outer plan is empty, so we have nothing to do */
1090  winstate->partition_spooled = true;
1091  winstate->more_partitions = false;
1092  return;
1093  }
1094  }
1095 
1096  /* Create new tuplestore for this partition */
1097  winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1098 
1099  /*
1100  * Set up read pointers for the tuplestore. The current pointer doesn't
1101  * need BACKWARD capability, but the per-window-function read pointers do,
1102  * and the aggregate pointer does if frame start is movable.
1103  */
1104  winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1105 
1106  /* reset default REWIND capability bit for current ptr */
1107  tuplestore_set_eflags(winstate->buffer, 0);
1108 
1109  /* create read pointers for aggregates, if needed */
1110  if (winstate->numaggs > 0)
1111  {
1112  WindowObject agg_winobj = winstate->agg_winobj;
1113  int readptr_flags = 0;
1114 
1115  /* If the frame head is potentially movable ... */
1117  {
1118  /* ... create a mark pointer to track the frame head */
1119  agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1120  /* and the read pointer will need BACKWARD capability */
1121  readptr_flags |= EXEC_FLAG_BACKWARD;
1122  }
1123 
1124  agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1125  readptr_flags);
1126  agg_winobj->markpos = -1;
1127  agg_winobj->seekpos = -1;
1128 
1129  /* Also reset the row counters for aggregates */
1130  winstate->aggregatedbase = 0;
1131  winstate->aggregatedupto = 0;
1132  }
1133 
1134  /* create mark and read pointers for each real window function */
1135  for (i = 0; i < numfuncs; i++)
1136  {
1137  WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1138 
1139  if (!perfuncstate->plain_agg)
1140  {
1141  WindowObject winobj = perfuncstate->winobj;
1142 
1143  winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1144  0);
1145  winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1147  winobj->markpos = -1;
1148  winobj->seekpos = -1;
1149  }
1150  }
1151 
1152  /*
1153  * Store the first tuple into the tuplestore (it's always available now;
1154  * we either read it above, or saved it at the end of previous partition)
1155  */
1156  tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1157  winstate->spooled_rows++;
1158 }
1159 
1160 /*
1161  * Read tuples from the outer node, up to and including position 'pos', and
1162  * store them into the tuplestore. If pos is -1, reads the whole partition.
1163  */
1164 static void
1166 {
1167  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1169  TupleTableSlot *outerslot;
1170  MemoryContext oldcontext;
1171 
1172  if (!winstate->buffer)
1173  return; /* just a safety check */
1174  if (winstate->partition_spooled)
1175  return; /* whole partition done already */
1176 
1177  /*
1178  * If the tuplestore has spilled to disk, alternate reading and writing
1179  * becomes quite expensive due to frequent buffer flushes. It's cheaper
1180  * to force the entire partition to get spooled in one go.
1181  *
1182  * XXX this is a horrid kluge --- it'd be better to fix the performance
1183  * problem inside tuplestore. FIXME
1184  */
1185  if (!tuplestore_in_memory(winstate->buffer))
1186  pos = -1;
1187 
1188  outerPlan = outerPlanState(winstate);
1189 
1190  /* Must be in query context to call outerplan */
1192 
1193  while (winstate->spooled_rows <= pos || pos == -1)
1194  {
1195  outerslot = ExecProcNode(outerPlan);
1196  if (TupIsNull(outerslot))
1197  {
1198  /* reached the end of the last partition */
1199  winstate->partition_spooled = true;
1200  winstate->more_partitions = false;
1201  break;
1202  }
1203 
1204  if (node->partNumCols > 0)
1205  {
1206  /* Check if this tuple still belongs to the current partition */
1207  if (!execTuplesMatch(winstate->first_part_slot,
1208  outerslot,
1209  node->partNumCols, node->partColIdx,
1210  winstate->partEqfunctions,
1211  winstate->tmpcontext->ecxt_per_tuple_memory))
1212  {
1213  /*
1214  * end of partition; copy the tuple for the next cycle.
1215  */
1216  ExecCopySlot(winstate->first_part_slot, outerslot);
1217  winstate->partition_spooled = true;
1218  winstate->more_partitions = true;
1219  break;
1220  }
1221  }
1222 
1223  /* Still in partition, so save it into the tuplestore */
1224  tuplestore_puttupleslot(winstate->buffer, outerslot);
1225  winstate->spooled_rows++;
1226  }
1227 
1228  MemoryContextSwitchTo(oldcontext);
1229 }
1230 
1231 /*
1232  * release_partition
1233  * clear information kept within a partition, including
1234  * tuplestore and aggregate results.
1235  */
1236 static void
1238 {
1239  int i;
1240 
1241  for (i = 0; i < winstate->numfuncs; i++)
1242  {
1243  WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1244 
1245  /* Release any partition-local state of this window function */
1246  if (perfuncstate->winobj)
1247  perfuncstate->winobj->localmem = NULL;
1248  }
1249 
1250  /*
1251  * Release all partition-local memory (in particular, any partition-local
1252  * state that we might have trashed our pointers to in the above loop, and
1253  * any aggregate temp data). We don't rely on retail pfree because some
1254  * aggregates might have allocated data we don't have direct pointers to.
1255  */
1258  for (i = 0; i < winstate->numaggs; i++)
1259  {
1260  if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1262  }
1263 
1264  if (winstate->buffer)
1265  tuplestore_end(winstate->buffer);
1266  winstate->buffer = NULL;
1267  winstate->partition_spooled = false;
1268 }
1269 
1270 /*
1271  * row_is_in_frame
1272  * Determine whether a row is in the current row's window frame according
1273  * to our window framing rule
1274  *
1275  * The caller must have already determined that the row is in the partition
1276  * and fetched it into a slot. This function just encapsulates the framing
1277  * rules.
1278  */
1279 static bool
1281 {
1282  int frameOptions = winstate->frameOptions;
1283 
1284  Assert(pos >= 0); /* else caller error */
1285 
1286  /* First, check frame starting conditions */
1287  if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1288  {
1289  if (frameOptions & FRAMEOPTION_ROWS)
1290  {
1291  /* rows before current row are out of frame */
1292  if (pos < winstate->currentpos)
1293  return false;
1294  }
1295  else if (frameOptions & FRAMEOPTION_RANGE)
1296  {
1297  /* preceding row that is not peer is out of frame */
1298  if (pos < winstate->currentpos &&
1299  !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1300  return false;
1301  }
1302  else
1303  Assert(false);
1304  }
1305  else if (frameOptions & FRAMEOPTION_START_VALUE)
1306  {
1307  if (frameOptions & FRAMEOPTION_ROWS)
1308  {
1309  int64 offset = DatumGetInt64(winstate->startOffsetValue);
1310 
1311  /* rows before current row + offset are out of frame */
1312  if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1313  offset = -offset;
1314 
1315  if (pos < winstate->currentpos + offset)
1316  return false;
1317  }
1318  else if (frameOptions & FRAMEOPTION_RANGE)
1319  {
1320  /* parser should have rejected this */
1321  elog(ERROR, "window frame with value offset is not implemented");
1322  }
1323  else
1324  Assert(false);
1325  }
1326 
1327  /* Okay so far, now check frame ending conditions */
1328  if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1329  {
1330  if (frameOptions & FRAMEOPTION_ROWS)
1331  {
1332  /* rows after current row are out of frame */
1333  if (pos > winstate->currentpos)
1334  return false;
1335  }
1336  else if (frameOptions & FRAMEOPTION_RANGE)
1337  {
1338  /* following row that is not peer is out of frame */
1339  if (pos > winstate->currentpos &&
1340  !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1341  return false;
1342  }
1343  else
1344  Assert(false);
1345  }
1346  else if (frameOptions & FRAMEOPTION_END_VALUE)
1347  {
1348  if (frameOptions & FRAMEOPTION_ROWS)
1349  {
1350  int64 offset = DatumGetInt64(winstate->endOffsetValue);
1351 
1352  /* rows after current row + offset are out of frame */
1353  if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1354  offset = -offset;
1355 
1356  if (pos > winstate->currentpos + offset)
1357  return false;
1358  }
1359  else if (frameOptions & FRAMEOPTION_RANGE)
1360  {
1361  /* parser should have rejected this */
1362  elog(ERROR, "window frame with value offset is not implemented");
1363  }
1364  else
1365  Assert(false);
1366  }
1367 
1368  /* If we get here, it's in frame */
1369  return true;
1370 }
1371 
1372 /*
1373  * update_frameheadpos
1374  * make frameheadpos valid for the current row
1375  *
1376  * Uses the winobj's read pointer for any required fetches; hence, if the
1377  * frame mode is one that requires row comparisons, the winobj's mark must
1378  * not be past the currently known frame head. Also uses the specified slot
1379  * for any required fetches.
1380  */
1381 static void
1383 {
1384  WindowAggState *winstate = winobj->winstate;
1385  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1386  int frameOptions = winstate->frameOptions;
1387 
1388  if (winstate->framehead_valid)
1389  return; /* already known for current row */
1390 
1391  if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1392  {
1393  /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1394  winstate->frameheadpos = 0;
1395  winstate->framehead_valid = true;
1396  }
1397  else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1398  {
1399  if (frameOptions & FRAMEOPTION_ROWS)
1400  {
1401  /* In ROWS mode, frame head is the same as current */
1402  winstate->frameheadpos = winstate->currentpos;
1403  winstate->framehead_valid = true;
1404  }
1405  else if (frameOptions & FRAMEOPTION_RANGE)
1406  {
1407  int64 fhprev;
1408 
1409  /* If no ORDER BY, all rows are peers with each other */
1410  if (node->ordNumCols == 0)
1411  {
1412  winstate->frameheadpos = 0;
1413  winstate->framehead_valid = true;
1414  return;
1415  }
1416 
1417  /*
1418  * In RANGE START_CURRENT mode, frame head is the first row that
1419  * is a peer of current row. We search backwards from current,
1420  * which could be a bit inefficient if peer sets are large. Might
1421  * be better to have a separate read pointer that moves forward
1422  * tracking the frame head.
1423  */
1424  fhprev = winstate->currentpos - 1;
1425  for (;;)
1426  {
1427  /* assume the frame head can't go backwards */
1428  if (fhprev < winstate->frameheadpos)
1429  break;
1430  if (!window_gettupleslot(winobj, fhprev, slot))
1431  break; /* start of partition */
1432  if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1433  break; /* not peer of current row */
1434  fhprev--;
1435  }
1436  winstate->frameheadpos = fhprev + 1;
1437  winstate->framehead_valid = true;
1438  }
1439  else
1440  Assert(false);
1441  }
1442  else if (frameOptions & FRAMEOPTION_START_VALUE)
1443  {
1444  if (frameOptions & FRAMEOPTION_ROWS)
1445  {
1446  /* In ROWS mode, bound is physically n before/after current */
1447  int64 offset = DatumGetInt64(winstate->startOffsetValue);
1448 
1449  if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1450  offset = -offset;
1451 
1452  winstate->frameheadpos = winstate->currentpos + offset;
1453  /* frame head can't go before first row */
1454  if (winstate->frameheadpos < 0)
1455  winstate->frameheadpos = 0;
1456  else if (winstate->frameheadpos > winstate->currentpos)
1457  {
1458  /* make sure frameheadpos is not past end of partition */
1459  spool_tuples(winstate, winstate->frameheadpos - 1);
1460  if (winstate->frameheadpos > winstate->spooled_rows)
1461  winstate->frameheadpos = winstate->spooled_rows;
1462  }
1463  winstate->framehead_valid = true;
1464  }
1465  else if (frameOptions & FRAMEOPTION_RANGE)
1466  {
1467  /* parser should have rejected this */
1468  elog(ERROR, "window frame with value offset is not implemented");
1469  }
1470  else
1471  Assert(false);
1472  }
1473  else
1474  Assert(false);
1475 }
1476 
1477 /*
1478  * update_frametailpos
1479  * make frametailpos valid for the current row
1480  *
1481  * Uses the winobj's read pointer for any required fetches; hence, if the
1482  * frame mode is one that requires row comparisons, the winobj's mark must
1483  * not be past the currently known frame tail. Also uses the specified slot
1484  * for any required fetches.
1485  */
1486 static void
1488 {
1489  WindowAggState *winstate = winobj->winstate;
1490  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1491  int frameOptions = winstate->frameOptions;
1492 
1493  if (winstate->frametail_valid)
1494  return; /* already known for current row */
1495 
1496  if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1497  {
1498  /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1499  spool_tuples(winstate, -1);
1500  winstate->frametailpos = winstate->spooled_rows - 1;
1501  winstate->frametail_valid = true;
1502  }
1503  else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1504  {
1505  if (frameOptions & FRAMEOPTION_ROWS)
1506  {
1507  /* In ROWS mode, exactly the rows up to current are in frame */
1508  winstate->frametailpos = winstate->currentpos;
1509  winstate->frametail_valid = true;
1510  }
1511  else if (frameOptions & FRAMEOPTION_RANGE)
1512  {
1513  int64 ftnext;
1514 
1515  /* If no ORDER BY, all rows are peers with each other */
1516  if (node->ordNumCols == 0)
1517  {
1518  spool_tuples(winstate, -1);
1519  winstate->frametailpos = winstate->spooled_rows - 1;
1520  winstate->frametail_valid = true;
1521  return;
1522  }
1523 
1524  /*
1525  * Else we have to search for the first non-peer of the current
1526  * row. We assume the current value of frametailpos is a lower
1527  * bound on the possible frame tail location, ie, frame tail never
1528  * goes backward, and that currentpos is also a lower bound, ie,
1529  * frame end always >= current row.
1530  */
1531  ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
1532  for (;;)
1533  {
1534  if (!window_gettupleslot(winobj, ftnext, slot))
1535  break; /* end of partition */
1536  if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1537  break; /* not peer of current row */
1538  ftnext++;
1539  }
1540  winstate->frametailpos = ftnext - 1;
1541  winstate->frametail_valid = true;
1542  }
1543  else
1544  Assert(false);
1545  }
1546  else if (frameOptions & FRAMEOPTION_END_VALUE)
1547  {
1548  if (frameOptions & FRAMEOPTION_ROWS)
1549  {
1550  /* In ROWS mode, bound is physically n before/after current */
1551  int64 offset = DatumGetInt64(winstate->endOffsetValue);
1552 
1553  if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1554  offset = -offset;
1555 
1556  winstate->frametailpos = winstate->currentpos + offset;
1557  /* smallest allowable value of frametailpos is -1 */
1558  if (winstate->frametailpos < 0)
1559  winstate->frametailpos = -1;
1560  else if (winstate->frametailpos > winstate->currentpos)
1561  {
1562  /* make sure frametailpos is not past last row of partition */
1563  spool_tuples(winstate, winstate->frametailpos);
1564  if (winstate->frametailpos >= winstate->spooled_rows)
1565  winstate->frametailpos = winstate->spooled_rows - 1;
1566  }
1567  winstate->frametail_valid = true;
1568  }
1569  else if (frameOptions & FRAMEOPTION_RANGE)
1570  {
1571  /* parser should have rejected this */
1572  elog(ERROR, "window frame with value offset is not implemented");
1573  }
1574  else
1575  Assert(false);
1576  }
1577  else
1578  Assert(false);
1579 }
1580 
1581 
1582 /* -----------------
1583  * ExecWindowAgg
1584  *
1585  * ExecWindowAgg receives tuples from its outer subplan and
1586  * stores them into a tuplestore, then processes window functions.
1587  * This node doesn't reduce nor qualify any row so the number of
1588  * returned rows is exactly the same as its outer subplan's result.
1589  * -----------------
1590  */
1591 static TupleTableSlot *
1593 {
1595  ExprContext *econtext;
1596  int i;
1597  int numfuncs;
1598 
1600 
1601  if (winstate->all_done)
1602  return NULL;
1603 
1604  /*
1605  * Compute frame offset values, if any, during first call.
1606  */
1607  if (winstate->all_first)
1608  {
1609  int frameOptions = winstate->frameOptions;
1610  ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
1611  Datum value;
1612  bool isnull;
1613  int16 len;
1614  bool byval;
1615 
1616  if (frameOptions & FRAMEOPTION_START_VALUE)
1617  {
1618  Assert(winstate->startOffset != NULL);
1619  value = ExecEvalExprSwitchContext(winstate->startOffset,
1620  econtext,
1621  &isnull);
1622  if (isnull)
1623  ereport(ERROR,
1624  (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1625  errmsg("frame starting offset must not be null")));
1626  /* copy value into query-lifespan context */
1627  get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
1628  &len, &byval);
1629  winstate->startOffsetValue = datumCopy(value, byval, len);
1630  if (frameOptions & FRAMEOPTION_ROWS)
1631  {
1632  /* value is known to be int8 */
1633  int64 offset = DatumGetInt64(value);
1634 
1635  if (offset < 0)
1636  ereport(ERROR,
1637  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1638  errmsg("frame starting offset must not be negative")));
1639  }
1640  }
1641  if (frameOptions & FRAMEOPTION_END_VALUE)
1642  {
1643  Assert(winstate->endOffset != NULL);
1644  value = ExecEvalExprSwitchContext(winstate->endOffset,
1645  econtext,
1646  &isnull);
1647  if (isnull)
1648  ereport(ERROR,
1649  (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1650  errmsg("frame ending offset must not be null")));
1651  /* copy value into query-lifespan context */
1652  get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
1653  &len, &byval);
1654  winstate->endOffsetValue = datumCopy(value, byval, len);
1655  if (frameOptions & FRAMEOPTION_ROWS)
1656  {
1657  /* value is known to be int8 */
1658  int64 offset = DatumGetInt64(value);
1659 
1660  if (offset < 0)
1661  ereport(ERROR,
1662  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1663  errmsg("frame ending offset must not be negative")));
1664  }
1665  }
1666  winstate->all_first = false;
1667  }
1668 
1669  if (winstate->buffer == NULL)
1670  {
1671  /* Initialize for first partition and set current row = 0 */
1672  begin_partition(winstate);
1673  /* If there are no input rows, we'll detect that and exit below */
1674  }
1675  else
1676  {
1677  /* Advance current row within partition */
1678  winstate->currentpos++;
1679  /* This might mean that the frame moves, too */
1680  winstate->framehead_valid = false;
1681  winstate->frametail_valid = false;
1682  }
1683 
1684  /*
1685  * Spool all tuples up to and including the current row, if we haven't
1686  * already
1687  */
1688  spool_tuples(winstate, winstate->currentpos);
1689 
1690  /* Move to the next partition if we reached the end of this partition */
1691  if (winstate->partition_spooled &&
1692  winstate->currentpos >= winstate->spooled_rows)
1693  {
1694  release_partition(winstate);
1695 
1696  if (winstate->more_partitions)
1697  {
1698  begin_partition(winstate);
1699  Assert(winstate->spooled_rows > 0);
1700  }
1701  else
1702  {
1703  winstate->all_done = true;
1704  return NULL;
1705  }
1706  }
1707 
1708  /* final output execution is in ps_ExprContext */
1709  econtext = winstate->ss.ps.ps_ExprContext;
1710 
1711  /* Clear the per-output-tuple context for current row */
1712  ResetExprContext(econtext);
1713 
1714  /*
1715  * Read the current row from the tuplestore, and save in ScanTupleSlot.
1716  * (We can't rely on the outerplan's output slot because we may have to
1717  * read beyond the current row. Also, we have to actually copy the row
1718  * out of the tuplestore, since window function evaluation might cause the
1719  * tuplestore to dump its state to disk.)
1720  *
1721  * Current row must be in the tuplestore, since we spooled it above.
1722  */
1723  tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1724  if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1725  winstate->ss.ss_ScanTupleSlot))
1726  elog(ERROR, "unexpected end of tuplestore");
1727 
1728  /*
1729  * Evaluate true window functions
1730  */
1731  numfuncs = winstate->numfuncs;
1732  for (i = 0; i < numfuncs; i++)
1733  {
1734  WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1735 
1736  if (perfuncstate->plain_agg)
1737  continue;
1738  eval_windowfunction(winstate, perfuncstate,
1739  &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1740  &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1741  }
1742 
1743  /*
1744  * Evaluate aggregates
1745  */
1746  if (winstate->numaggs > 0)
1747  eval_windowaggregates(winstate);
1748 
1749  /*
1750  * Truncate any no-longer-needed rows from the tuplestore.
1751  */
1752  tuplestore_trim(winstate->buffer);
1753 
1754  /*
1755  * Form and return a projection tuple using the windowfunc results and the
1756  * current row. Setting ecxt_outertuple arranges that any Vars will be
1757  * evaluated with respect to that row.
1758  */
1759  econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1760 
1761  return ExecProject(winstate->ss.ps.ps_ProjInfo);
1762 }
1763 
1764 /* -----------------
1765  * ExecInitWindowAgg
1766  *
1767  * Creates the run-time information for the WindowAgg node produced by the
1768  * planner and initializes its outer subtree
1769  * -----------------
1770  */
1772 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1773 {
1775  Plan *outerPlan;
1776  ExprContext *econtext;
1777  ExprContext *tmpcontext;
1778  WindowStatePerFunc perfunc;
1779  WindowStatePerAgg peragg;
1780  int numfuncs,
1781  wfuncno,
1782  numaggs,
1783  aggno;
1784  ListCell *l;
1785 
1786  /* check for unsupported flags */
1787  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1788 
1789  /*
1790  * create state structure
1791  */
1792  winstate = makeNode(WindowAggState);
1793  winstate->ss.ps.plan = (Plan *) node;
1794  winstate->ss.ps.state = estate;
1795  winstate->ss.ps.ExecProcNode = ExecWindowAgg;
1796 
1797  /*
1798  * Create expression contexts. We need two, one for per-input-tuple
1799  * processing and one for per-output-tuple processing. We cheat a little
1800  * by using ExecAssignExprContext() to build both.
1801  */
1802  ExecAssignExprContext(estate, &winstate->ss.ps);
1803  tmpcontext = winstate->ss.ps.ps_ExprContext;
1804  winstate->tmpcontext = tmpcontext;
1805  ExecAssignExprContext(estate, &winstate->ss.ps);
1806 
1807  /* Create long-lived context for storage of partition-local memory etc */
1808  winstate->partcontext =
1810  "WindowAgg Partition",
1812 
1813  /*
1814  * Create mid-lived context for aggregate trans values etc.
1815  *
1816  * Note that moving aggregates each use their own private context, not
1817  * this one.
1818  */
1819  winstate->aggcontext =
1821  "WindowAgg Aggregates",
1823 
1824  /*
1825  * tuple table initialization
1826  */
1827  ExecInitScanTupleSlot(estate, &winstate->ss);
1828  ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1829  winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1830  winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1831  winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1832  winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1833 
1834  /*
1835  * WindowAgg nodes never have quals, since they can only occur at the
1836  * logical top level of a query (ie, after any WHERE or HAVING filters)
1837  */
1838  Assert(node->plan.qual == NIL);
1839  winstate->ss.ps.qual = NULL;
1840 
1841  /*
1842  * initialize child nodes
1843  */
1844  outerPlan = outerPlan(node);
1845  outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1846 
1847  /*
1848  * initialize source tuple type (which is also the tuple type that we'll
1849  * store in the tuplestore and use in all our working slots).
1850  */
1851  ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1852 
1861 
1862  /*
1863  * Initialize result tuple type and projection info.
1864  */
1865  ExecAssignResultTypeFromTL(&winstate->ss.ps);
1866  ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1867 
1868  /* Set up data for comparing tuples */
1869  if (node->partNumCols > 0)
1871  node->partOperators);
1872  if (node->ordNumCols > 0)
1874  node->ordOperators);
1875 
1876  /*
1877  * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1878  */
1879  numfuncs = winstate->numfuncs;
1880  numaggs = winstate->numaggs;
1881  econtext = winstate->ss.ps.ps_ExprContext;
1882  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1883  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1884 
1885  /*
1886  * allocate per-wfunc/per-agg state information.
1887  */
1888  perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1889  peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1890  winstate->perfunc = perfunc;
1891  winstate->peragg = peragg;
1892 
1893  wfuncno = -1;
1894  aggno = -1;
1895  foreach(l, winstate->funcs)
1896  {
1897  WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1898  WindowFunc *wfunc = wfuncstate->wfunc;
1899  WindowStatePerFunc perfuncstate;
1900  AclResult aclresult;
1901  int i;
1902 
1903  if (wfunc->winref != node->winref) /* planner screwed up? */
1904  elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1905  wfunc->winref, node->winref);
1906 
1907  /* Look for a previous duplicate window function */
1908  for (i = 0; i <= wfuncno; i++)
1909  {
1910  if (equal(wfunc, perfunc[i].wfunc) &&
1911  !contain_volatile_functions((Node *) wfunc))
1912  break;
1913  }
1914  if (i <= wfuncno)
1915  {
1916  /* Found a match to an existing entry, so just mark it */
1917  wfuncstate->wfuncno = i;
1918  continue;
1919  }
1920 
1921  /* Nope, so assign a new PerAgg record */
1922  perfuncstate = &perfunc[++wfuncno];
1923 
1924  /* Mark WindowFunc state node with assigned index in the result array */
1925  wfuncstate->wfuncno = wfuncno;
1926 
1927  /* Check permission to call window function */
1928  aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1929  ACL_EXECUTE);
1930  if (aclresult != ACLCHECK_OK)
1931  aclcheck_error(aclresult, ACL_KIND_PROC,
1932  get_func_name(wfunc->winfnoid));
1934 
1935  /* Fill in the perfuncstate data */
1936  perfuncstate->wfuncstate = wfuncstate;
1937  perfuncstate->wfunc = wfunc;
1938  perfuncstate->numArguments = list_length(wfuncstate->args);
1939 
1940  fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1941  econtext->ecxt_per_query_memory);
1942  fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
1943 
1944  perfuncstate->winCollation = wfunc->inputcollid;
1945 
1946  get_typlenbyval(wfunc->wintype,
1947  &perfuncstate->resulttypeLen,
1948  &perfuncstate->resulttypeByVal);
1949 
1950  /*
1951  * If it's really just a plain aggregate function, we'll emulate the
1952  * Agg environment for it.
1953  */
1954  perfuncstate->plain_agg = wfunc->winagg;
1955  if (wfunc->winagg)
1956  {
1957  WindowStatePerAgg peraggstate;
1958 
1959  perfuncstate->aggno = ++aggno;
1960  peraggstate = &winstate->peragg[aggno];
1961  initialize_peragg(winstate, wfunc, peraggstate);
1962  peraggstate->wfuncno = wfuncno;
1963  }
1964  else
1965  {
1967 
1968  winobj->winstate = winstate;
1969  winobj->argstates = wfuncstate->args;
1970  winobj->localmem = NULL;
1971  perfuncstate->winobj = winobj;
1972  }
1973  }
1974 
1975  /* Update numfuncs, numaggs to match number of unique functions found */
1976  winstate->numfuncs = wfuncno + 1;
1977  winstate->numaggs = aggno + 1;
1978 
1979  /* Set up WindowObject for aggregates, if needed */
1980  if (winstate->numaggs > 0)
1981  {
1982  WindowObject agg_winobj = makeNode(WindowObjectData);
1983 
1984  agg_winobj->winstate = winstate;
1985  agg_winobj->argstates = NIL;
1986  agg_winobj->localmem = NULL;
1987  /* make sure markptr = -1 to invalidate. It may not get used */
1988  agg_winobj->markptr = -1;
1989  agg_winobj->readptr = -1;
1990  winstate->agg_winobj = agg_winobj;
1991  }
1992 
1993  /* copy frame options to state node for easy access */
1994  winstate->frameOptions = node->frameOptions;
1995 
1996  /* initialize frame bound offset expressions */
1997  winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
1998  (PlanState *) winstate);
1999  winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2000  (PlanState *) winstate);
2001 
2002  winstate->all_first = true;
2003  winstate->partition_spooled = false;
2004  winstate->more_partitions = false;
2005 
2006  return winstate;
2007 }
2008 
2009 /* -----------------
2010  * ExecEndWindowAgg
2011  * -----------------
2012  */
2013 void
2015 {
2017  int i;
2018 
2019  release_partition(node);
2020 
2024  ExecClearTuple(node->temp_slot_1);
2025  ExecClearTuple(node->temp_slot_2);
2026 
2027  /*
2028  * Free both the expr contexts.
2029  */
2030  ExecFreeExprContext(&node->ss.ps);
2031  node->ss.ps.ps_ExprContext = node->tmpcontext;
2032  ExecFreeExprContext(&node->ss.ps);
2033 
2034  for (i = 0; i < node->numaggs; i++)
2035  {
2036  if (node->peragg[i].aggcontext != node->aggcontext)
2038  }
2041 
2042  pfree(node->perfunc);
2043  pfree(node->peragg);
2044 
2045  outerPlan = outerPlanState(node);
2046  ExecEndNode(outerPlan);
2047 }
2048 
2049 /* -----------------
2050  * ExecReScanWindowAgg
2051  * -----------------
2052  */
2053 void
2055 {
2057  ExprContext *econtext = node->ss.ps.ps_ExprContext;
2058 
2059  node->all_done = false;
2060  node->all_first = true;
2061 
2062  /* release tuplestore et al */
2063  release_partition(node);
2064 
2065  /* release all temp tuples, but especially first_part_slot */
2069  ExecClearTuple(node->temp_slot_1);
2070  ExecClearTuple(node->temp_slot_2);
2071 
2072  /* Forget current wfunc values */
2073  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2074  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2075 
2076  /*
2077  * if chgParam of subnode is not null then plan will be re-scanned by
2078  * first ExecProcNode.
2079  */
2080  if (outerPlan->chgParam == NULL)
2081  ExecReScan(outerPlan);
2082 }
2083 
2084 /*
2085  * initialize_peragg
2086  *
2087  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2088  */
2089 static WindowStatePerAggData *
2091  WindowStatePerAgg peraggstate)
2092 {
2093  Oid inputTypes[FUNC_MAX_ARGS];
2094  int numArguments;
2095  HeapTuple aggTuple;
2096  Form_pg_aggregate aggform;
2097  Oid aggtranstype;
2098  AttrNumber initvalAttNo;
2099  AclResult aclresult;
2100  bool use_ma_code;
2101  Oid transfn_oid,
2102  invtransfn_oid,
2103  finalfn_oid;
2104  bool finalextra;
2105  char finalmodify;
2106  Expr *transfnexpr,
2107  *invtransfnexpr,
2108  *finalfnexpr;
2109  Datum textInitVal;
2110  int i;
2111  ListCell *lc;
2112 
2113  numArguments = list_length(wfunc->args);
2114 
2115  i = 0;
2116  foreach(lc, wfunc->args)
2117  {
2118  inputTypes[i++] = exprType((Node *) lfirst(lc));
2119  }
2120 
2121  aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2122  if (!HeapTupleIsValid(aggTuple))
2123  elog(ERROR, "cache lookup failed for aggregate %u",
2124  wfunc->winfnoid);
2125  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2126 
2127  /*
2128  * Figure out whether we want to use the moving-aggregate implementation,
2129  * and collect the right set of fields from the pg_attribute entry.
2130  *
2131  * It's possible that an aggregate would supply a safe moving-aggregate
2132  * implementation and an unsafe normal one, in which case our hand is
2133  * forced. Otherwise, if the frame head can't move, we don't need
2134  * moving-aggregate code. Even if we'd like to use it, don't do so if the
2135  * aggregate's arguments (and FILTER clause if any) contain any calls to
2136  * volatile functions. Otherwise, the difference between restarting and
2137  * not restarting the aggregation would be user-visible.
2138  */
2139  if (!OidIsValid(aggform->aggminvtransfn))
2140  use_ma_code = false; /* sine qua non */
2141  else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
2142  aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
2143  use_ma_code = true; /* decision forced by safety */
2145  use_ma_code = false; /* non-moving frame head */
2146  else if (contain_volatile_functions((Node *) wfunc))
2147  use_ma_code = false; /* avoid possible behavioral change */
2148  else
2149  use_ma_code = true; /* yes, let's use it */
2150  if (use_ma_code)
2151  {
2152  peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2153  peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2154  peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2155  finalextra = aggform->aggmfinalextra;
2156  finalmodify = aggform->aggmfinalmodify;
2157  aggtranstype = aggform->aggmtranstype;
2158  initvalAttNo = Anum_pg_aggregate_aggminitval;
2159  }
2160  else
2161  {
2162  peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2163  peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2164  peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2165  finalextra = aggform->aggfinalextra;
2166  finalmodify = aggform->aggfinalmodify;
2167  aggtranstype = aggform->aggtranstype;
2168  initvalAttNo = Anum_pg_aggregate_agginitval;
2169  }
2170 
2171  /*
2172  * ExecInitWindowAgg already checked permission to call aggregate function
2173  * ... but we still need to check the component functions
2174  */
2175 
2176  /* Check that aggregate owner has permission to call component fns */
2177  {
2178  HeapTuple procTuple;
2179  Oid aggOwner;
2180 
2181  procTuple = SearchSysCache1(PROCOID,
2182  ObjectIdGetDatum(wfunc->winfnoid));
2183  if (!HeapTupleIsValid(procTuple))
2184  elog(ERROR, "cache lookup failed for function %u",
2185  wfunc->winfnoid);
2186  aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2187  ReleaseSysCache(procTuple);
2188 
2189  aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2190  ACL_EXECUTE);
2191  if (aclresult != ACLCHECK_OK)
2192  aclcheck_error(aclresult, ACL_KIND_PROC,
2193  get_func_name(transfn_oid));
2194  InvokeFunctionExecuteHook(transfn_oid);
2195 
2196  if (OidIsValid(invtransfn_oid))
2197  {
2198  aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
2199  ACL_EXECUTE);
2200  if (aclresult != ACLCHECK_OK)
2201  aclcheck_error(aclresult, ACL_KIND_PROC,
2202  get_func_name(invtransfn_oid));
2203  InvokeFunctionExecuteHook(invtransfn_oid);
2204  }
2205 
2206  if (OidIsValid(finalfn_oid))
2207  {
2208  aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2209  ACL_EXECUTE);
2210  if (aclresult != ACLCHECK_OK)
2211  aclcheck_error(aclresult, ACL_KIND_PROC,
2212  get_func_name(finalfn_oid));
2213  InvokeFunctionExecuteHook(finalfn_oid);
2214  }
2215  }
2216 
2217  /*
2218  * If the selected finalfn isn't read-only, we can't run this aggregate as
2219  * a window function. This is a user-facing error, so we take a bit more
2220  * care with the error message than elsewhere in this function.
2221  */
2222  if (finalmodify != AGGMODIFY_READ_ONLY)
2223  ereport(ERROR,
2224  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2225  errmsg("aggregate function %s does not support use as a window function",
2226  format_procedure(wfunc->winfnoid))));
2227 
2228  /* Detect how many arguments to pass to the finalfn */
2229  if (finalextra)
2230  peraggstate->numFinalArgs = numArguments + 1;
2231  else
2232  peraggstate->numFinalArgs = 1;
2233 
2234  /* resolve actual type of transition state, if polymorphic */
2235  aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2236  aggtranstype,
2237  inputTypes,
2238  numArguments);
2239 
2240  /* build expression trees using actual argument & result types */
2241  build_aggregate_transfn_expr(inputTypes,
2242  numArguments,
2243  0, /* no ordered-set window functions yet */
2244  false, /* no variadic window functions yet */
2245  aggtranstype,
2246  wfunc->inputcollid,
2247  transfn_oid,
2248  invtransfn_oid,
2249  &transfnexpr,
2250  &invtransfnexpr);
2251 
2252  /* set up infrastructure for calling the transfn(s) and finalfn */
2253  fmgr_info(transfn_oid, &peraggstate->transfn);
2254  fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2255 
2256  if (OidIsValid(invtransfn_oid))
2257  {
2258  fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2259  fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2260  }
2261 
2262  if (OidIsValid(finalfn_oid))
2263  {
2264  build_aggregate_finalfn_expr(inputTypes,
2265  peraggstate->numFinalArgs,
2266  aggtranstype,
2267  wfunc->wintype,
2268  wfunc->inputcollid,
2269  finalfn_oid,
2270  &finalfnexpr);
2271  fmgr_info(finalfn_oid, &peraggstate->finalfn);
2272  fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
2273  }
2274 
2275  /* get info about relevant datatypes */
2276  get_typlenbyval(wfunc->wintype,
2277  &peraggstate->resulttypeLen,
2278  &peraggstate->resulttypeByVal);
2279  get_typlenbyval(aggtranstype,
2280  &peraggstate->transtypeLen,
2281  &peraggstate->transtypeByVal);
2282 
2283  /*
2284  * initval is potentially null, so don't try to access it as a struct
2285  * field. Must do it the hard way with SysCacheGetAttr.
2286  */
2287  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
2288  &peraggstate->initValueIsNull);
2289 
2290  if (peraggstate->initValueIsNull)
2291  peraggstate->initValue = (Datum) 0;
2292  else
2293  peraggstate->initValue = GetAggInitVal(textInitVal,
2294  aggtranstype);
2295 
2296  /*
2297  * If the transfn is strict and the initval is NULL, make sure input type
2298  * and transtype are the same (or at least binary-compatible), so that
2299  * it's OK to use the first input value as the initial transValue. This
2300  * should have been checked at agg definition time, but we must check
2301  * again in case the transfn's strictness property has been changed.
2302  */
2303  if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
2304  {
2305  if (numArguments < 1 ||
2306  !IsBinaryCoercible(inputTypes[0], aggtranstype))
2307  ereport(ERROR,
2308  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2309  errmsg("aggregate %u needs to have compatible input type and transition type",
2310  wfunc->winfnoid)));
2311  }
2312 
2313  /*
2314  * Insist that forward and inverse transition functions have the same
2315  * strictness setting. Allowing them to differ would require handling
2316  * more special cases in advance_windowaggregate and
2317  * advance_windowaggregate_base, for no discernible benefit. This should
2318  * have been checked at agg definition time, but we must check again in
2319  * case either function's strictness property has been changed.
2320  */
2321  if (OidIsValid(invtransfn_oid) &&
2322  peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
2323  ereport(ERROR,
2324  (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2325  errmsg("strictness of aggregate's forward and inverse transition functions must match")));
2326 
2327  /*
2328  * Moving aggregates use their own aggcontext.
2329  *
2330  * This is necessary because they might restart at different times, so we
2331  * might never be able to reset the shared context otherwise. We can't
2332  * make it the aggregates' responsibility to clean up after themselves,
2333  * because strict aggregates must be restarted whenever we remove their
2334  * last non-NULL input, which the aggregate won't be aware is happening.
2335  * Also, just pfree()ing the transValue upon restarting wouldn't help,
2336  * since we'd miss any indirectly referenced data. We could, in theory,
2337  * make the memory allocation rules for moving aggregates different than
2338  * they have historically been for plain aggregates, but that seems grotty
2339  * and likely to lead to memory leaks.
2340  */
2341  if (OidIsValid(invtransfn_oid))
2342  peraggstate->aggcontext =
2344  "WindowAgg Per Aggregate",
2346  else
2347  peraggstate->aggcontext = winstate->aggcontext;
2348 
2349  ReleaseSysCache(aggTuple);
2350 
2351  return peraggstate;
2352 }
2353 
2354 static Datum
2355 GetAggInitVal(Datum textInitVal, Oid transtype)
2356 {
2357  Oid typinput,
2358  typioparam;
2359  char *strInitVal;
2360  Datum initVal;
2361 
2362  getTypeInputInfo(transtype, &typinput, &typioparam);
2363  strInitVal = TextDatumGetCString(textInitVal);
2364  initVal = OidInputFunctionCall(typinput, strInitVal,
2365  typioparam, -1);
2366  pfree(strInitVal);
2367  return initVal;
2368 }
2369 
2370 /*
2371  * are_peers
2372  * compare two rows to see if they are equal according to the ORDER BY clause
2373  *
2374  * NB: this does not consider the window frame mode.
2375  */
2376 static bool
2378  TupleTableSlot *slot2)
2379 {
2380  WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2381 
2382  /* If no ORDER BY, all rows are peers with each other */
2383  if (node->ordNumCols == 0)
2384  return true;
2385 
2386  return execTuplesMatch(slot1, slot2,
2387  node->ordNumCols, node->ordColIdx,
2388  winstate->ordEqfunctions,
2389  winstate->tmpcontext->ecxt_per_tuple_memory);
2390 }
2391 
2392 /*
2393  * window_gettupleslot
2394  * Fetch the pos'th tuple of the current partition into the slot,
2395  * using the winobj's read pointer
2396  *
2397  * Returns true if successful, false if no such row
2398  */
2399 static bool
2401 {
2402  WindowAggState *winstate = winobj->winstate;
2403  MemoryContext oldcontext;
2404 
2405  /* often called repeatedly in a row */
2407 
2408  /* Don't allow passing -1 to spool_tuples here */
2409  if (pos < 0)
2410  return false;
2411 
2412  /* If necessary, fetch the tuple into the spool */
2413  spool_tuples(winstate, pos);
2414 
2415  if (pos >= winstate->spooled_rows)
2416  return false;
2417 
2418  if (pos < winobj->markpos)
2419  elog(ERROR, "cannot fetch row before WindowObject's mark position");
2420 
2422 
2423  tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2424 
2425  /*
2426  * Advance or rewind until we are within one tuple of the one we want.
2427  */
2428  if (winobj->seekpos < pos - 1)
2429  {
2430  if (!tuplestore_skiptuples(winstate->buffer,
2431  pos - 1 - winobj->seekpos,
2432  true))
2433  elog(ERROR, "unexpected end of tuplestore");
2434  winobj->seekpos = pos - 1;
2435  }
2436  else if (winobj->seekpos > pos + 1)
2437  {
2438  if (!tuplestore_skiptuples(winstate->buffer,
2439  winobj->seekpos - (pos + 1),
2440  false))
2441  elog(ERROR, "unexpected end of tuplestore");
2442  winobj->seekpos = pos + 1;
2443  }
2444  else if (winobj->seekpos == pos)
2445  {
2446  /*
2447  * There's no API to refetch the tuple at the current position. We
2448  * have to move one tuple forward, and then one backward. (We don't
2449  * do it the other way because we might try to fetch the row before
2450  * our mark, which isn't allowed.) XXX this case could stand to be
2451  * optimized.
2452  */
2453  tuplestore_advance(winstate->buffer, true);
2454  winobj->seekpos++;
2455  }
2456 
2457  /*
2458  * Now we should be on the tuple immediately before or after the one we
2459  * want, so just fetch forwards or backwards as appropriate.
2460  */
2461  if (winobj->seekpos > pos)
2462  {
2463  if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2464  elog(ERROR, "unexpected end of tuplestore");
2465  winobj->seekpos--;
2466  }
2467  else
2468  {
2469  if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2470  elog(ERROR, "unexpected end of tuplestore");
2471  winobj->seekpos++;
2472  }
2473 
2474  Assert(winobj->seekpos == pos);
2475 
2476  MemoryContextSwitchTo(oldcontext);
2477 
2478  return true;
2479 }
2480 
2481 
2482 /***********************************************************************
2483  * API exposed to window functions
2484  ***********************************************************************/
2485 
2486 
2487 /*
2488  * WinGetPartitionLocalMemory
2489  * Get working memory that lives till end of partition processing
2490  *
2491  * On first call within a given partition, this allocates and zeroes the
2492  * requested amount of space. Subsequent calls just return the same chunk.
2493  *
2494  * Memory obtained this way is normally used to hold state that should be
2495  * automatically reset for each new partition. If a window function wants
2496  * to hold state across the whole query, fcinfo->fn_extra can be used in the
2497  * usual way for that.
2498  */
2499 void *
2501 {
2502  Assert(WindowObjectIsValid(winobj));
2503  if (winobj->localmem == NULL)
2504  winobj->localmem =
2506  return winobj->localmem;
2507 }
2508 
2509 /*
2510  * WinGetCurrentPosition
2511  * Return the current row's position (counting from 0) within the current
2512  * partition.
2513  */
2514 int64
2516 {
2517  Assert(WindowObjectIsValid(winobj));
2518  return winobj->winstate->currentpos;
2519 }
2520 
2521 /*
2522  * WinGetPartitionRowCount
2523  * Return total number of rows contained in the current partition.
2524  *
2525  * Note: this is a relatively expensive operation because it forces the
2526  * whole partition to be "spooled" into the tuplestore at once. Once
2527  * executed, however, additional calls within the same partition are cheap.
2528  */
2529 int64
2531 {
2532  Assert(WindowObjectIsValid(winobj));
2533  spool_tuples(winobj->winstate, -1);
2534  return winobj->winstate->spooled_rows;
2535 }
2536 
2537 /*
2538  * WinSetMarkPosition
2539  * Set the "mark" position for the window object, which is the oldest row
2540  * number (counting from 0) it is allowed to fetch during all subsequent
2541  * operations within the current partition.
2542  *
2543  * Window functions do not have to call this, but are encouraged to move the
2544  * mark forward when possible to keep the tuplestore size down and prevent
2545  * having to spill rows to disk.
2546  */
2547 void
2549 {
2551 
2552  Assert(WindowObjectIsValid(winobj));
2553  winstate = winobj->winstate;
2554 
2555  if (markpos < winobj->markpos)
2556  elog(ERROR, "cannot move WindowObject's mark position backward");
2557  tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
2558  if (markpos > winobj->markpos)
2559  {
2560  tuplestore_skiptuples(winstate->buffer,
2561  markpos - winobj->markpos,
2562  true);
2563  winobj->markpos = markpos;
2564  }
2565  tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2566  if (markpos > winobj->seekpos)
2567  {
2568  tuplestore_skiptuples(winstate->buffer,
2569  markpos - winobj->seekpos,
2570  true);
2571  winobj->seekpos = markpos;
2572  }
2573 }
2574 
2575 /*
2576  * WinRowsArePeers
2577  * Compare two rows (specified by absolute position in window) to see
2578  * if they are equal according to the ORDER BY clause.
2579  *
2580  * NB: this does not consider the window frame mode.
2581  */
2582 bool
2583 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
2584 {
2586  WindowAgg *node;
2587  TupleTableSlot *slot1;
2588  TupleTableSlot *slot2;
2589  bool res;
2590 
2591  Assert(WindowObjectIsValid(winobj));
2592  winstate = winobj->winstate;
2593  node = (WindowAgg *) winstate->ss.ps.plan;
2594 
2595  /* If no ORDER BY, all rows are peers; don't bother to fetch them */
2596  if (node->ordNumCols == 0)
2597  return true;
2598 
2599  slot1 = winstate->temp_slot_1;
2600  slot2 = winstate->temp_slot_2;
2601 
2602  if (!window_gettupleslot(winobj, pos1, slot1))
2603  elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2604  pos1);
2605  if (!window_gettupleslot(winobj, pos2, slot2))
2606  elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2607  pos2);
2608 
2609  res = are_peers(winstate, slot1, slot2);
2610 
2611  ExecClearTuple(slot1);
2612  ExecClearTuple(slot2);
2613 
2614  return res;
2615 }
2616 
2617 /*
2618  * WinGetFuncArgInPartition
2619  * Evaluate a window function's argument expression on a specified
2620  * row of the partition. The row is identified in lseek(2) style,
2621  * i.e. relative to the current, first, or last row.
2622  *
2623  * argno: argument number to evaluate (counted from 0)
2624  * relpos: signed rowcount offset from the seek position
2625  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2626  * set_mark: If the row is found and set_mark is true, the mark is moved to
2627  * the row as a side-effect.
2628  * isnull: output argument, receives isnull status of result
2629  * isout: output argument, set to indicate whether target row position
2630  * is out of partition (can pass NULL if caller doesn't care about this)
2631  *
2632  * Specifying a nonexistent row is not an error, it just causes a null result
2633  * (plus setting *isout true, if isout isn't NULL).
2634  */
2635 Datum
2637  int relpos, int seektype, bool set_mark,
2638  bool *isnull, bool *isout)
2639 {
2641  ExprContext *econtext;
2642  TupleTableSlot *slot;
2643  bool gottuple;
2644  int64 abs_pos;
2645 
2646  Assert(WindowObjectIsValid(winobj));
2647  winstate = winobj->winstate;
2648  econtext = winstate->ss.ps.ps_ExprContext;
2649  slot = winstate->temp_slot_1;
2650 
2651  switch (seektype)
2652  {
2653  case WINDOW_SEEK_CURRENT:
2654  abs_pos = winstate->currentpos + relpos;
2655  break;
2656  case WINDOW_SEEK_HEAD:
2657  abs_pos = relpos;
2658  break;
2659  case WINDOW_SEEK_TAIL:
2660  spool_tuples(winstate, -1);
2661  abs_pos = winstate->spooled_rows - 1 + relpos;
2662  break;
2663  default:
2664  elog(ERROR, "unrecognized window seek type: %d", seektype);
2665  abs_pos = 0; /* keep compiler quiet */
2666  break;
2667  }
2668 
2669  gottuple = window_gettupleslot(winobj, abs_pos, slot);
2670 
2671  if (!gottuple)
2672  {
2673  if (isout)
2674  *isout = true;
2675  *isnull = true;
2676  return (Datum) 0;
2677  }
2678  else
2679  {
2680  if (isout)
2681  *isout = false;
2682  if (set_mark)
2683  {
2684  int frameOptions = winstate->frameOptions;
2685  int64 mark_pos = abs_pos;
2686 
2687  /*
2688  * In RANGE mode with a moving frame head, we must not let the
2689  * mark advance past frameheadpos, since that row has to be
2690  * fetchable during future update_frameheadpos calls.
2691  *
2692  * XXX it is very ugly to pollute window functions' marks with
2693  * this consideration; it could for instance mask a logic bug that
2694  * lets a window function fetch rows before what it had claimed
2695  * was its mark. Perhaps use a separate mark for frame head
2696  * probes?
2697  */
2698  if ((frameOptions & FRAMEOPTION_RANGE) &&
2699  !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2700  {
2701  update_frameheadpos(winobj, winstate->temp_slot_2);
2702  if (mark_pos > winstate->frameheadpos)
2703  mark_pos = winstate->frameheadpos;
2704  }
2705  WinSetMarkPosition(winobj, mark_pos);
2706  }
2707  econtext->ecxt_outertuple = slot;
2708  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2709  econtext, isnull);
2710  }
2711 }
2712 
2713 /*
2714  * WinGetFuncArgInFrame
2715  * Evaluate a window function's argument expression on a specified
2716  * row of the window frame. The row is identified in lseek(2) style,
2717  * i.e. relative to the current, first, or last row.
2718  *
2719  * argno: argument number to evaluate (counted from 0)
2720  * relpos: signed rowcount offset from the seek position
2721  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2722  * set_mark: If the row is found and set_mark is true, the mark is moved to
2723  * the row as a side-effect.
2724  * isnull: output argument, receives isnull status of result
2725  * isout: output argument, set to indicate whether target row position
2726  * is out of frame (can pass NULL if caller doesn't care about this)
2727  *
2728  * Specifying a nonexistent row is not an error, it just causes a null result
2729  * (plus setting *isout true, if isout isn't NULL).
2730  */
2731 Datum
2733  int relpos, int seektype, bool set_mark,
2734  bool *isnull, bool *isout)
2735 {
2737  ExprContext *econtext;
2738  TupleTableSlot *slot;
2739  bool gottuple;
2740  int64 abs_pos;
2741 
2742  Assert(WindowObjectIsValid(winobj));
2743  winstate = winobj->winstate;
2744  econtext = winstate->ss.ps.ps_ExprContext;
2745  slot = winstate->temp_slot_1;
2746 
2747  switch (seektype)
2748  {
2749  case WINDOW_SEEK_CURRENT:
2750  abs_pos = winstate->currentpos + relpos;
2751  break;
2752  case WINDOW_SEEK_HEAD:
2753  update_frameheadpos(winobj, slot);
2754  abs_pos = winstate->frameheadpos + relpos;
2755  break;
2756  case WINDOW_SEEK_TAIL:
2757  update_frametailpos(winobj, slot);
2758  abs_pos = winstate->frametailpos + relpos;
2759  break;
2760  default:
2761  elog(ERROR, "unrecognized window seek type: %d", seektype);
2762  abs_pos = 0; /* keep compiler quiet */
2763  break;
2764  }
2765 
2766  gottuple = window_gettupleslot(winobj, abs_pos, slot);
2767  if (gottuple)
2768  gottuple = row_is_in_frame(winstate, abs_pos, slot);
2769 
2770  if (!gottuple)
2771  {
2772  if (isout)
2773  *isout = true;
2774  *isnull = true;
2775  return (Datum) 0;
2776  }
2777  else
2778  {
2779  if (isout)
2780  *isout = false;
2781  if (set_mark)
2782  {
2783  int frameOptions = winstate->frameOptions;
2784  int64 mark_pos = abs_pos;
2785 
2786  /*
2787  * In RANGE mode with a moving frame head, we must not let the
2788  * mark advance past frameheadpos, since that row has to be
2789  * fetchable during future update_frameheadpos calls.
2790  *
2791  * XXX it is very ugly to pollute window functions' marks with
2792  * this consideration; it could for instance mask a logic bug that
2793  * lets a window function fetch rows before what it had claimed
2794  * was its mark. Perhaps use a separate mark for frame head
2795  * probes?
2796  */
2797  if ((frameOptions & FRAMEOPTION_RANGE) &&
2798  !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2799  {
2800  update_frameheadpos(winobj, winstate->temp_slot_2);
2801  if (mark_pos > winstate->frameheadpos)
2802  mark_pos = winstate->frameheadpos;
2803  }
2804  WinSetMarkPosition(winobj, mark_pos);
2805  }
2806  econtext->ecxt_outertuple = slot;
2807  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2808  econtext, isnull);
2809  }
2810 }
2811 
2812 /*
2813  * WinGetFuncArgCurrent
2814  * Evaluate a window function's argument expression on the current row.
2815  *
2816  * argno: argument number to evaluate (counted from 0)
2817  * isnull: output argument, receives isnull status of result
2818  *
2819  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
2820  * WinGetFuncArgInFrame targeting the current row, because it will succeed
2821  * even if the WindowObject's mark has been set beyond the current row.
2822  * This should generally be used for "ordinary" arguments of a window
2823  * function, such as the offset argument of lead() or lag().
2824  */
2825 Datum
2826 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
2827 {
2829  ExprContext *econtext;
2830 
2831  Assert(WindowObjectIsValid(winobj));
2832  winstate = winobj->winstate;
2833 
2834  econtext = winstate->ss.ps.ps_ExprContext;
2835 
2836  econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2837  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2838  econtext, isnull);
2839 }
signed short int16
Definition: c.h:293
int ordNumCols
Definition: plannodes.h:807
void tuplestore_puttupleslot(Tuplestorestate *state, TupleTableSlot *slot)
Definition: tuplestore.c:708
#define NIL
Definition: pg_list.h:69
Datum WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
Definition: fmgr.h:56
List * qual
Definition: plannodes.h:145
bool WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
MemoryContext curaggcontext
Definition: execnodes.h:1897
ExprState * endOffset
Definition: execnodes.h:1891
#define AGGMODIFY_READ_ONLY
Definition: pg_aggregate.h:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:198
void * WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
Datum * ecxt_aggvalues
Definition: execnodes.h:214
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:292
bool tuplestore_advance(Tuplestorestate *state, bool forward)
Definition: tuplestore.c:1110
struct WindowStatePerAggData * WindowStatePerAgg
Definition: execnodes.h:1863
int64 WinGetPartitionRowCount(WindowObject winobj)
Datum startOffsetValue
Definition: execnodes.h:1892
static void finalize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate, Datum *result, bool *isnull)
void ExecInitScanTupleSlot(EState *estate, ScanState *scanstate)
Definition: execTuples.c:842
ExprState * aggfilter
Definition: execnodes.h:673
#define GETSTRUCT(TUP)
Definition: htup_details.h:661
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:885
#define Anum_pg_aggregate_agginitval
Definition: pg_aggregate.h:117
static bool window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
List * args
Definition: primnodes.h:359
MemoryContext MemoryContextGetParent(MemoryContext context)
Definition: mcxt.c:402
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:2984
Oid GetUserId(void)
Definition: miscinit.c:284
#define castNode(_type_, nodeptr)
Definition: nodes.h:581
void tuplestore_trim(Tuplestorestate *state)
Definition: tuplestore.c:1360
ScanState ss
Definition: execnodes.h:1867
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:539
void tuplestore_set_eflags(Tuplestorestate *state, int eflags)
Definition: tuplestore.c:359
ExprContext * ps_ExprContext
Definition: execnodes.h:884
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:204
#define FRAMEOPTION_START_VALUE
Definition: parsenodes.h:519
AttrNumber * ordColIdx
Definition: plannodes.h:808
void ExecReScan(PlanState *node)
Definition: execAmi.c:76
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:512
bool execTuplesMatch(TupleTableSlot *slot1, TupleTableSlot *slot2, int numCols, AttrNumber *matchColIdx, FmgrInfo *eqfunctions, MemoryContext evalContext)
Definition: execGrouping.c:69
int64 aggregatedupto
Definition: execnodes.h:1887
int errcode(int sqlerrcode)
Definition: elog.c:575
#define MemSet(start, val, len)
Definition: c.h:863
bool frametail_valid
Definition: execnodes.h:1908
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1116
static void release_partition(WindowAggState *winstate)
struct WindowStatePerFuncData * WindowStatePerFunc
Definition: execnodes.h:1862
FmgrInfo * partEqfunctions
Definition: execnodes.h:1876
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
Definition: parse_agg.c:2030
WindowFuncExprState * wfuncstate
Definition: nodeWindowAgg.c:79
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:957
EState * state
Definition: execnodes.h:852
unsigned int Oid
Definition: postgres_ext.h:31
NodeTag
Definition: nodes.h:26
Index winref
Definition: primnodes.h:361
#define FRAMEOPTION_START_UNBOUNDED_PRECEDING
Definition: parsenodes.h:508
TupleTableSlot * temp_slot_1
Definition: execnodes.h:1916
#define OidIsValid(objectId)
Definition: c.h:586
WindowStatePerFunc perfunc
Definition: execnodes.h:1874
void ExecFreeExprContext(PlanState *planstate)
Definition: execUtils.c:603
Oid * ordOperators
Definition: plannodes.h:809
static void spool_tuples(WindowAggState *winstate, int64 pos)
void ExecAssignResultTypeFromTL(PlanState *planstate)
Definition: execUtils.c:448
#define WINDOW_SEEK_TAIL
Definition: windowapi.h:34
TupleTableSlot * first_part_slot
Definition: execnodes.h:1911
static bool advance_windowaggregate_base(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate)
#define FUNC_MAX_ARGS
ExprContext * tmpcontext
Definition: execnodes.h:1898
PlanState ps
Definition: execnodes.h:1113
struct WindowObjectData * agg_winobj
Definition: execnodes.h:1885
Node * startOffset
Definition: plannodes.h:811
bool tuplestore_in_memory(Tuplestorestate *state)
Definition: tuplestore.c:1455
int64 frameheadpos
Definition: execnodes.h:1882
FmgrInfo * flinfo
Definition: fmgr.h:79
static WindowStatePerAggData * initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, WindowStatePerAgg peraggstate)
void pfree(void *pointer)
Definition: mcxt.c:936
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
bool fn_strict
Definition: fmgr.h:61
Expr * expr
Definition: execnodes.h:84
char * get_func_name(Oid funcid)
Definition: lsyscache.c:1412
void ExecInitResultTupleSlot(EState *estate, PlanState *planstate)
Definition: execTuples.c:832
struct WindowObjectData WindowObjectData
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:122
static bool row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
static struct @121 value
#define DatumGetInt64(X)
Definition: postgres.h:613
struct WindowStatePerFuncData WindowStatePerFuncData
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define EXEC_FLAG_BACKWARD
Definition: executor.h:60
#define outerPlanState(node)
Definition: execnodes.h:896
TupleTableSlot * temp_slot_2
Definition: execnodes.h:1917
Datum WinGetFuncArgInPartition(WindowObject winobj, int argno, int relpos, int seektype, bool set_mark, bool *isnull, bool *isout)
Datum endOffsetValue
Definition: execnodes.h:1893
void * list_nth(const List *list, int n)
Definition: list.c:410
WindowStatePerAgg peragg
Definition: execnodes.h:1875
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:137
ExprState * startOffset
Definition: execnodes.h:1890
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
Definition: execUtils.c:495
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:277
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3457
int64 aggregatedbase
Definition: execnodes.h:1886
Node * endOffset
Definition: plannodes.h:812
#define FRAMEOPTION_END_CURRENT_ROW
Definition: parsenodes.h:513
#define fmgr_info_set_expr(expr, finfo)
Definition: fmgr.h:104
#define FRAMEOPTION_START_VALUE_PRECEDING
Definition: parsenodes.h:514
#define DatumGetBool(X)
Definition: postgres.h:399
#define MakeExpandedObjectReadOnly(d, isnull, typlen)
#define FRAMEOPTION_END_UNBOUNDED_FOLLOWING
Definition: parsenodes.h:511
#define TupIsNull(slot)
Definition: tuptable.h:138
int partNumCols
Definition: plannodes.h:804
Oid winfnoid
Definition: primnodes.h:355
bool argnull[FUNC_MAX_ARGS]
Definition: fmgr.h:86
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
Tuplestorestate * buffer
Definition: execnodes.h:1878
MemoryContext aggcontext
Definition: execnodes.h:1896
void fmgr_info_cxt(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt)
Definition: fmgr.c:132
#define FRAMEOPTION_START_CURRENT_ROW
Definition: parsenodes.h:512
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2632
#define ereport(elevel, rest)
Definition: elog.h:122
Datum datumCopy(Datum value, bool typByVal, int typLen)
Definition: datum.c:128
Bitmapset * chgParam
Definition: execnodes.h:878
#define InvokeFunctionExecuteHook(objectId)
Definition: objectaccess.h:179
bool IsBinaryCoercible(Oid srctype, Oid targettype)
#define outerPlan(node)
Definition: plannodes.h:174
int64 WinGetCurrentPosition(WindowObject winobj)
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:165
ExpandedObjectHeader * DatumGetEOHP(Datum d)
Definition: expandeddatum.c:29
static void eval_windowaggregates(WindowAggState *winstate)
int64 spooled_rows
Definition: execnodes.h:1880
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1112
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
#define WINDOW_SEEK_HEAD
Definition: windowapi.h:33
bool * ecxt_aggnulls
Definition: execnodes.h:215
#define TextDatumGetCString(d)
Definition: builtins.h:92
static TupleTableSlot * ExecWindowAgg(PlanState *pstate)
#define Anum_pg_aggregate_aggminitval
Definition: pg_aggregate.h:118
WindowAggState * winstate
Definition: nodeWindowAgg.c:63
void * palloc0(Size size)
Definition: mcxt.c:864
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:856
AclResult
Definition: acl.h:178
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1160
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:237
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1368
int work_mem
Definition: globals.c:113
TupleTableSlot * agg_row_slot
Definition: execnodes.h:1915
static void initialize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate)
AttrNumber * partColIdx
Definition: plannodes.h:805
FormData_pg_proc * Form_pg_proc
Definition: pg_proc.h:83
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:728
static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
Plan * plan
Definition: execnodes.h:850
static void begin_partition(WindowAggState *winstate)
void DeleteExpandedObject(Datum d)
#define InvalidOid
Definition: postgres_ext.h:36
bool more_partitions
Definition: execnodes.h:1904
char * format_procedure(Oid procedure_oid)
Definition: regproc.c:323
Datum arg[FUNC_MAX_ARGS]
Definition: fmgr.h:85
#define WindowObjectIsValid(winobj)
Definition: windowapi.h:41
FmgrInfo * ordEqfunctions
Definition: execnodes.h:1877
#define Max(x, y)
Definition: c.h:806
Oid * partOperators
Definition: plannodes.h:806
TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: execTuples.c:795
#define makeNode(_type_)
Definition: nodes.h:560
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
void ExecEndWindowAgg(WindowAggState *node)
TupleTableSlot * ecxt_outertuple
Definition: execnodes.h:200
#define WINDOW_SEEK_CURRENT
Definition: windowapi.h:32
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
int64 frametailpos
Definition: execnodes.h:1883
#define FRAMEOPTION_RANGE
Definition: parsenodes.h:505
#define Assert(condition)
Definition: c.h:680
#define lfirst(lc)
Definition: pg_list.h:106
Index winref
Definition: plannodes.h:803
#define EXEC_FLAG_MARK
Definition: executor.h:61
#define DatumIsReadWriteExpandedObject(d, isnull, typlen)
static void advance_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate)
FormData_pg_aggregate * Form_pg_aggregate
Definition: pg_aggregate.h:89
bool MemoryContextContains(MemoryContext context, void *pointer)
Definition: mcxt.c:566
void build_aggregate_transfn_expr(Oid *agg_input_types, int agg_num_inputs, int agg_num_direct_inputs, bool agg_variadic, Oid agg_state_type, Oid agg_input_collation, Oid transfn_oid, Oid invtransfn_oid, Expr **transfnexpr, Expr **invtransfnexpr)
Definition: parse_agg.c:1893
size_t Size
Definition: c.h:414
void ExecAssignExprContext(EState *estate, PlanState *planstate)
Definition: execUtils.c:426
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
Definition: fmgr.h:120
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:42
static int list_length(const List *l)
Definition: pg_list.h:89
MemoryContext aggcontext
void tuplestore_end(Tuplestorestate *state)
Definition: tuplestore.c:453
void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)
Definition: lsyscache.c:2020
#define FRAMEOPTION_ROWS
Definition: parsenodes.h:506
bool tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
Definition: tuplestore.c:1135
WindowAggState * ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:203
Datum WinGetFuncArgInFrame(WindowObject winobj, int argno, int relpos, int seektype, bool set_mark, bool *isnull, bool *isout)
#define FRAMEOPTION_END_VALUE_PRECEDING
Definition: parsenodes.h:515
#define INT64_FORMAT
Definition: c.h:348
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
int64 currentpos
Definition: execnodes.h:1881
ExprState * qual
Definition: execnodes.h:868
#define DatumGetPointer(X)
Definition: postgres.h:555
int tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
Definition: tuplestore.c:383
static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1, TupleTableSlot *slot2)
bool partition_spooled
Definition: execnodes.h:1902
void ExecAssignScanTypeFromOuterPlan(ScanState *scanstate)
Definition: execUtils.c:639
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool framehead_valid
Definition: execnodes.h:1906
#define ACL_EXECUTE
Definition: parsenodes.h:79
AclResult pg_proc_aclcheck(Oid proc_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4504
int i
Plan plan
Definition: plannodes.h:802
void tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
Definition: tuplestore.c:473
struct WindowStatePerAggData WindowStatePerAggData
void WinSetMarkPosition(WindowObject winobj, int64 markpos)
void ExecReScanWindowAgg(WindowAggState *node)
void * arg
MemoryContext partcontext
Definition: execnodes.h:1895
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:113
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define elog
Definition: elog.h:219
int frameOptions
Definition: plannodes.h:810
WindowFunc * wfunc
Definition: execnodes.h:671
FmgrInfo * execTuplesMatchPrepare(int numCols, Oid *eqOperators)
Definition: execGrouping.c:204
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:139
Definition: pg_list.h:45
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1733
int16 AttrNumber
Definition: attnum.h:21
#define FRAMEOPTION_END_VALUE
Definition: parsenodes.h:521
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
Definition: executor.h:320
static void eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, Datum *result, bool *isnull)
#define ResetExprContext(econtext)
Definition: executor.h:462
Oid resolve_aggregate_transtype(Oid aggfuncid, Oid aggtranstype, Oid *inputTypes, int numArguments)
Definition: parse_agg.c:1838