PostgreSQL Source Code  git master
nodeIncrementalSort.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "executor/execdebug.h"
#include "executor/nodeIncrementalSort.h"
#include "miscadmin.h"
#include "utils/lsyscache.h"
#include "utils/tuplesort.h"
Include dependency graph for nodeIncrementalSort.c:

Go to the source code of this file.

Macros

#define INSTRUMENT_SORT_GROUP(node, groupName)
 
#define DEFAULT_MIN_GROUP_SIZE   32
 
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE   (2 * DEFAULT_MIN_GROUP_SIZE)
 

Functions

static void instrumentSortedGroup (IncrementalSortGroupInfo *groupInfo, Tuplesortstate *sortState)
 
static void preparePresortedCols (IncrementalSortState *node)
 
static bool isCurrentGroup (IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
 
static void switchToPresortedPrefixMode (PlanState *pstate)
 
static TupleTableSlot * ExecIncrementalSort (PlanState *pstate)
 
IncrementalSortState * ExecInitIncrementalSort (IncrementalSort *node, EState *estate, int eflags)
 
void ExecEndIncrementalSort (IncrementalSortState *node)
 
void ExecReScanIncrementalSort (IncrementalSortState *node)
 
void ExecIncrementalSortEstimate (IncrementalSortState *node, ParallelContext *pcxt)
 
void ExecIncrementalSortInitializeDSM (IncrementalSortState *node, ParallelContext *pcxt)
 
void ExecIncrementalSortInitializeWorker (IncrementalSortState *node, ParallelWorkerContext *pwcxt)
 
void ExecIncrementalSortRetrieveInstrumentation (IncrementalSortState *node)
 

Macro Definition Documentation

◆ DEFAULT_MAX_FULL_SORT_GROUP_SIZE

#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE   (2 * DEFAULT_MIN_GROUP_SIZE)

Definition at line 480 of file nodeIncrementalSort.c.

Referenced by ExecIncrementalSort().

◆ DEFAULT_MIN_GROUP_SIZE

#define DEFAULT_MIN_GROUP_SIZE   32

Definition at line 468 of file nodeIncrementalSort.c.

Referenced by ExecIncrementalSort().

◆ INSTRUMENT_SORT_GROUP

#define INSTRUMENT_SORT_GROUP (   node,
  groupName 
)
Value:
do { \
if ((node)->ss.ps.instrument != NULL) \
{ \
if ((node)->shared_info && (node)->am_worker) \
{ \
Assert(IsParallelWorker()); \
Assert(ParallelWorkerNumber <= (node)->shared_info->num_workers); \
instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
(node)->groupName##_state); \
} \
else \
{ \
instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
(node)->groupName##_state); \
} \
} \
} while (0)
int ParallelWorkerNumber
Definition: parallel.c:112
#define IsParallelWorker()
Definition: parallel.h:61

Definition at line 99 of file nodeIncrementalSort.c.

Referenced by ExecIncrementalSort(), and switchToPresortedPrefixMode().

Function Documentation

◆ ExecEndIncrementalSort()

void ExecEndIncrementalSort (IncrementalSortState *node)

Definition at line 1076 of file nodeIncrementalSort.c.

References ExecClearTuple(), ExecDropSingleTupleTableSlot(), ExecEndNode(), IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, outerPlanState, IncrementalSortState::prefixsort_state, ScanState::ps, PlanState::ps_ResultTupleSlot, SO_printf, IncrementalSortState::ss, ScanState::ss_ScanTupleSlot, IncrementalSortState::transfer_tuple, and tuplesort_end().

Referenced by ExecEndNode().

1077 {
1078  SO_printf("ExecEndIncrementalSort: shutting down sort node\n");
1079 
1080  /* clean out the scan tuple */
1082  /* must drop pointer to sort result tuple */
1084  /* must drop standalone tuple slots from outer node */
1087 
1088  /*
1089  * Release tuplesort resources.
1090  */
1091  if (node->fullsort_state != NULL)
1092  {
1094  node->fullsort_state = NULL;
1095  }
1096  if (node->prefixsort_state != NULL)
1097  {
1099  node->prefixsort_state = NULL;
1100  }
1101 
1102  /*
1103  * Shut down the subplan.
1104  */
1105  ExecEndNode(outerPlanState(node));
1106 
1107  SO_printf("ExecEndIncrementalSort: sort node shutdown\n");
1108 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
void ExecEndNode(PlanState *node)
Definition: execProcnode.c:555
TupleTableSlot * ss_ScanTupleSlot
Definition: execnodes.h:1376
PlanState ps
Definition: execnodes.h:1373
#define SO_printf(s)
Definition: execdebug.h:92
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:1002
#define outerPlanState(node)
Definition: execnodes.h:1058
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1254
Tuplesortstate * fullsort_state
Definition: execnodes.h:2204
Tuplesortstate * prefixsort_state
Definition: execnodes.h:2205
TupleTableSlot * group_pivot
Definition: execnodes.h:2212
void tuplesort_end(Tuplesortstate *state)
Definition: tuplesort.c:1464
TupleTableSlot * transfer_tuple
Definition: execnodes.h:2213

◆ ExecIncrementalSort()

static TupleTableSlot * ExecIncrementalSort (PlanState *pstate)
static

Definition at line 496 of file nodeIncrementalSort.c.

References Assert, IncrementalSortState::bound, IncrementalSortState::bound_Done, IncrementalSortState::bounded, castNode, CHECK_FOR_INTERRUPTS, Sort::collations, DEFAULT_MAX_FULL_SORT_GROUP_SIZE, DEFAULT_MIN_GROUP_SIZE, EState::es_direction, ExecClearTuple(), ExecCopySlot(), ExecGetResultType(), ExecProcNode(), IncrementalSortState::execution_status, ForwardScanDirection, IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, INCSORT_LOADFULLSORT, INCSORT_LOADPREFIXSORT, INCSORT_READFULLSORT, INCSORT_READPREFIXSORT, INSTRUMENT_SORT_GROUP, INT64_FORMAT, isCurrentGroup(), Min, IncrementalSortState::n_fullsort_remaining, Sort::nullsFirst, Sort::numCols, IncrementalSortState::outerNodeDone, outerPlanState, PlanState::plan, IncrementalSortState::prefixsort_state, preparePresortedCols(), ScanState::ps, PlanState::ps_ResultTupleSlot, ScanDirectionIsForward, SO1_printf, SO2_printf, SO_printf, IncrementalSort::sort, Sort::sortColIdx, Sort::sortOperators, IncrementalSortState::ss, PlanState::state, switchToPresortedPrefixMode(), TupIsNull, tuplesort_begin_heap(), tuplesort_gettupleslot(), tuplesort_performsort(), tuplesort_puttupleslot(), tuplesort_reset(), tuplesort_set_bound(), tuplesort_used_bound(), and work_mem.

Referenced by ExecInitIncrementalSort().

497 {
499  EState *estate;
500  ScanDirection dir;
501  Tuplesortstate *read_sortstate;
502  Tuplesortstate *fullsort_state;
503  TupleTableSlot *slot;
504  IncrementalSort *plannode = (IncrementalSort *) node->ss.ps.plan;
505  PlanState *outerNode;
506  TupleDesc tupDesc;
507  int64 nTuples = 0;
508  int64 minGroupSize;
509 
511 
512  estate = node->ss.ps.state;
513  dir = estate->es_direction;
514  fullsort_state = node->fullsort_state;
515 
516  /*
517  * If a previous iteration has sorted a batch, then we need to check to
518  * see if there are any remaining tuples in that batch that we can return
519  * before moving on to other execution states.
520  */
523  {
524  /*
525  * Return next tuple from the current sorted group set if available.
526  */
527  read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
528  fullsort_state : node->prefixsort_state;
529  slot = node->ss.ps.ps_ResultTupleSlot;
530 
531  /*
532  * We have to populate the slot from the tuplesort before checking
533  * outerNodeDone because it will set the slot to NULL if no more
534  * tuples remain. If the tuplesort is empty, but we don't have any
535  * more tuples available for sort from the outer node, then
536  * outerNodeDone will have been set so we'll return that now-empty
537  * slot to the caller.
538  */
539  if (tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
540  false, slot, NULL) || node->outerNodeDone)
541 
542  /*
543  * Note: there isn't a good test case for the node->outerNodeDone
544  * check directly, but we need it for any plan where the outer
545  * node will fail when trying to fetch too many tuples.
546  */
547  return slot;
548  else if (node->n_fullsort_remaining > 0)
549  {
550  /*
551  * When we transition to presorted prefix mode, we might have
552  * accumulated at least one additional prefix key group in the
553  * full sort tuplesort. The first call to
554  * switchToPresortedPrefixMode() will have pulled the first one of
555  * those groups out, and we've returned those tuples to the parent
556  * node, but if at this point we still have tuples remaining in
557  * the full sort state (i.e., n_fullsort_remaining > 0), then we
558  * need to re-execute the prefix mode transition function to pull
559  * out the next prefix key group.
560  */
561  SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",
562  node->n_fullsort_remaining);
564  }
565  else
566  {
567  /*
568  * If we don't have any sorted tuples to read and we're not
569  * currently transitioning into presorted prefix sort mode, then
570  * it's time to start the process all over again by building a new
571  * group in the full sort state.
572  */
573  SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining > 0)\n");
575  }
576  }
577 
578  /*
579  * Scan the subplan in the forward direction while creating the sorted
580  * data.
581  */
583 
584  outerNode = outerPlanState(node);
585  tupDesc = ExecGetResultType(outerNode);
586 
587  /* Load tuples into the full sort state. */
589  {
590  /*
591  * Initialize sorting structures.
592  */
593  if (fullsort_state == NULL)
594  {
595  /*
596  * Initialize presorted column support structures for
597  * isCurrentGroup(). It's correct to do this along with the
598  * initial initialization for the full sort state (and not for the
599  * prefix sort state) since we always load the full sort state
600  * first.
601  */
602  preparePresortedCols(node);
603 
604  /*
605  * Since we optimize small prefix key groups by accumulating a
606  * minimum number of tuples before sorting, we can't assume that a
607  * group of tuples all have the same prefix key values. Hence we
608  * setup the full sort tuplesort to sort by all requested sort
609  * keys.
610  */
611  fullsort_state = tuplesort_begin_heap(tupDesc,
612  plannode->sort.numCols,
613  plannode->sort.sortColIdx,
614  plannode->sort.sortOperators,
615  plannode->sort.collations,
616  plannode->sort.nullsFirst,
617  work_mem,
618  NULL,
619  false);
620  node->fullsort_state = fullsort_state;
621  }
622  else
623  {
624  /* Reset sort for the next batch. */
625  tuplesort_reset(fullsort_state);
626  }
627 
628  /*
629  * Calculate the remaining tuples left if bounded and configure both
630  * bounded sort and the minimum group size accordingly.
631  */
632  if (node->bounded)
633  {
634  int64 currentBound = node->bound - node->bound_Done;
635 
636  /*
637  * Bounded sort isn't likely to be a useful optimization for full
638  * sort mode since we limit full sort mode to a relatively small
639  * number of tuples and tuplesort doesn't switch over to top-n
640  * heap sort anyway unless it hits (2 * bound) tuples.
641  */
642  if (currentBound < DEFAULT_MIN_GROUP_SIZE)
643  tuplesort_set_bound(fullsort_state, currentBound);
644 
645  minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, currentBound);
646  }
647  else
648  minGroupSize = DEFAULT_MIN_GROUP_SIZE;
649 
650  /*
651  * Because we have to read the next tuple to find out that we've
652  * encountered a new prefix key group, on subsequent groups we have to
653  * carry over that extra tuple and add it to the new group's sort here
654  * before we read any new tuples from the outer node.
655  */
656  if (!TupIsNull(node->group_pivot))
657  {
658  tuplesort_puttupleslot(fullsort_state, node->group_pivot);
659  nTuples++;
660 
661  /*
662  * We're in full sort mode accumulating a minimum number of tuples
663  * and not checking for prefix key equality yet, so we can't
664  * assume the group pivot tuple will remain the same -- unless
665  * we're using a minimum group size of 1, in which case the pivot
666  * is obviously still the pivot.
667  */
668  if (nTuples != minGroupSize)
670  }
671 
672 
673  /*
674  * Pull as many tuples from the outer node as possible given our
675  * current operating mode.
676  */
677  for (;;)
678  {
679  slot = ExecProcNode(outerNode);
680 
681  /*
682  * If the outer node can't provide us any more tuples, then we can
683  * sort the current group and return those tuples.
684  */
685  if (TupIsNull(slot))
686  {
687  /*
688  * We need to know later if the outer node has completed to be
689  * able to distinguish between being done with a batch and
690  * being done with the whole node.
691  */
692  node->outerNodeDone = true;
693 
694  SO1_printf("Sorting fullsort with " INT64_FORMAT " tuples\n", nTuples);
695  tuplesort_performsort(fullsort_state);
696 
697  INSTRUMENT_SORT_GROUP(node, fullsort);
698 
699  SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
701  break;
702  }
703 
704  /* Accumulate the next group of presorted tuples. */
705  if (nTuples < minGroupSize)
706  {
707  /*
708  * If we haven't yet hit our target minimum group size, then
709  * we don't need to bother checking for inclusion in the
710  * current prefix group since at this point we'll assume that
711  * we'll full sort this batch to avoid a large number of very
712  * tiny (and thus inefficient) sorts.
713  */
714  tuplesort_puttupleslot(fullsort_state, slot);
715  nTuples++;
716 
717  /*
718  * If we've reached our minimum group size, then we need to
719  * store the most recent tuple as a pivot.
720  */
721  if (nTuples == minGroupSize)
722  ExecCopySlot(node->group_pivot, slot);
723  }
724  else
725  {
726  /*
727  * If we've already accumulated enough tuples to reach our
728  * minimum group size, then we need to compare any additional
729  * tuples to our pivot tuple to see if we reach the end of
730  * that prefix key group. Only after we find changed prefix
731  * keys can we guarantee sort stability of the tuples we've
732  * already accumulated.
733  */
734  if (isCurrentGroup(node, node->group_pivot, slot))
735  {
736  /*
737  * As long as the prefix keys match the pivot tuple then
738  * load the tuple into the tuplesort.
739  */
740  tuplesort_puttupleslot(fullsort_state, slot);
741  nTuples++;
742  }
743  else
744  {
745  /*
746  * Since the tuple we fetched isn't part of the current
747  * prefix key group we don't want to sort it as part of
748  * the current batch. Instead we use the group_pivot slot
749  * to carry it over to the next batch (even though we
750  * won't actually treat it as a group pivot).
751  */
752  ExecCopySlot(node->group_pivot, slot);
753 
754  if (node->bounded)
755  {
756  /*
757  * If the current node has a bound, and we've already
758  * sorted n tuples, then the functional bound
759  * remaining is (original bound - n), so store the
760  * current number of processed tuples for later use
761  * configuring the sort state's bound.
762  */
763  SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
764  node->bound_Done,
765  Min(node->bound, node->bound_Done + nTuples));
766  node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
767  }
768 
769  /*
770  * Once we find changed prefix keys we can complete the
771  * sort and transition modes to reading out the sorted
772  * tuples.
773  */
774  SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n",
775  nTuples);
776  tuplesort_performsort(fullsort_state);
777 
778  INSTRUMENT_SORT_GROUP(node, fullsort);
779 
780  SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
782  break;
783  }
784  }
785 
786  /*
787  * Unless we've already transitioned modes to reading from the
788  * full sort state, then we assume that having read at least
789  * DEFAULT_MAX_FULL_SORT_GROUP_SIZE tuples means it's likely we're
790  * processing a large group of tuples all having equal prefix keys
791  * (but haven't yet found the final tuple in that prefix key
792  * group), so we need to transition into presorted prefix mode.
793  */
794  if (nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE &&
796  {
797  /*
798  * The group pivot we have stored has already been put into
799  * the tuplesort; we don't want to carry it over. Since we
800  * haven't yet found the end of the prefix key group, it might
801  * seem like we should keep this, but we don't actually know
802  * how many prefix key groups might be represented in the full
803  * sort state, so we'll let the mode transition function
804  * manage this state for us.
805  */
807 
808  /*
809  * Unfortunately the tuplesort API doesn't include a way to
810  * retrieve tuples unless a sort has been performed, so we
811  * perform the sort even though we could just as easily rely
812  * on FIFO retrieval semantics when transferring them to the
813  * presorted prefix tuplesort.
814  */
815  SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n", nTuples);
816  tuplesort_performsort(fullsort_state);
817 
818  INSTRUMENT_SORT_GROUP(node, fullsort);
819 
820  /*
821  * If the full sort tuplesort happened to switch into top-n
822  * heapsort mode then we will only be able to retrieve
823  * currentBound tuples (since the tuplesort will have only
824  * retained the top-n tuples). This is safe even though we
825  * haven't yet completed fetching the current prefix key group
826  * because the tuples we've "lost" already sorted "below" the
827  * retained ones, and we're already contractually guaranteed
828  * to not need any more than the currentBound tuples.
829  */
831  {
832  int64 currentBound = node->bound - node->bound_Done;
833 
834  SO2_printf("Read " INT64_FORMAT " tuples, but setting to " INT64_FORMAT " because we used bounded sort\n",
835  nTuples, Min(currentBound, nTuples));
836  nTuples = Min(currentBound, nTuples);
837  }
838 
839  SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",
840  nTuples);
841 
842  /*
843  * We might have multiple prefix key groups in the full sort
844  * state, so the mode transition function needs to know that
845  * it needs to move from the fullsort to presorted prefix
846  * sort.
847  */
848  node->n_fullsort_remaining = nTuples;
849 
850  /* Transition the tuples to the presorted prefix tuplesort. */
852 
853  /*
854  * Since we know we had tuples to move to the presorted prefix
855  * tuplesort, we know that unless that transition has verified
856  * that all tuples belonged to the same prefix key group (in
857  * which case we can go straight to continuing to load tuples
858  * into that tuplesort), we should have a tuple to return
859  * here.
860  *
861  * Either way, the appropriate execution status should have
862  * been set by switchToPresortedPrefixMode(), so we can drop
863  * out of the loop here and let the appropriate path kick in.
864  */
865  break;
866  }
867  }
868  }
869 
871  {
872  /*
873  * We only enter this state after the mode transition function has
874  * confirmed all remaining tuples from the full sort state have the
875  * same prefix and moved those tuples to the prefix sort state. That
876  * function has also set a group pivot tuple (which doesn't need to be
877  * carried over; it's already been put into the prefix sort state).
878  */
879  Assert(!TupIsNull(node->group_pivot));
880 
881  /*
882  * Read tuples from the outer node and load them into the prefix sort
883  * state until we encounter a tuple whose prefix keys don't match the
884  * current group_pivot tuple, since we can't guarantee sort stability
885  * until we have all tuples matching those prefix keys.
886  */
887  for (;;)
888  {
889  slot = ExecProcNode(outerNode);
890 
891  /*
892  * If we've exhausted tuples from the outer node we're done
893  * loading the prefix sort state.
894  */
895  if (TupIsNull(slot))
896  {
897  /*
898  * We need to know later if the outer node has completed to be
899  * able to distinguish between being done with a batch and
900  * being done with the whole node.
901  */
902  node->outerNodeDone = true;
903  break;
904  }
905 
906  /*
907  * If the tuple's prefix keys match our pivot tuple, we're not
908  * done yet and can load it into the prefix sort state. If not, we
909  * don't want to sort it as part of the current batch. Instead we
910  * use the group_pivot slot to carry it over to the next batch
911  * (even though we won't actually treat it as a group pivot).
912  */
913  if (isCurrentGroup(node, node->group_pivot, slot))
914  {
916  nTuples++;
917  }
918  else
919  {
920  ExecCopySlot(node->group_pivot, slot);
921  break;
922  }
923  }
924 
925  /*
926  * Perform the sort and begin returning the tuples to the parent plan
927  * node.
928  */
929  SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
931 
932  INSTRUMENT_SORT_GROUP(node, prefixsort);
933 
934  SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
936 
937  if (node->bounded)
938  {
939  /*
940  * If the current node has a bound, and we've already sorted n
941  * tuples, then the functional bound remaining is (original bound
942  * - n), so store the current number of processed tuples for use
943  * in configuring sorting bound.
944  */
945  SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
946  node->bound_Done,
947  Min(node->bound, node->bound_Done + nTuples));
948  node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
949  }
950  }
951 
952  /* Restore to user specified direction. */
953  estate->es_direction = dir;
954 
955  /*
956  * Get the first or next tuple from tuplesort. Returns NULL if no more
957  * tuples.
958  */
959  read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
960  fullsort_state : node->prefixsort_state;
961  slot = node->ss.ps.ps_ResultTupleSlot;
962  (void) tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
963  false, slot, NULL);
964  return slot;
965 }
bool tuplesort_used_bound(Tuplesortstate *state)
Definition: tuplesort.c:1380
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
void tuplesort_performsort(Tuplesortstate *state)
Definition: tuplesort.c:2040
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
#define ScanDirectionIsForward(direction)
Definition: sdir.h:55
#define castNode(_type_, nodeptr)
Definition: nodes.h:608
#define Min(x, y)
Definition: c.h:986
bool * nullsFirst
Definition: plannodes.h:814
EState * state
Definition: execnodes.h:966
#define SO1_printf(s, p)
Definition: execdebug.h:93
static void preparePresortedCols(IncrementalSortState *node)
Oid * sortOperators
Definition: plannodes.h:812
ScanDirection es_direction
Definition: execnodes.h:555
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:897
PlanState ps
Definition: execnodes.h:1373
#define SO_printf(s)
Definition: execdebug.h:92
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:1002
void tuplesort_reset(Tuplesortstate *state)
Definition: tuplesort.c:1532
#define outerPlanState(node)
Definition: execnodes.h:1058
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Definition: tuplesort.c:2408
void tuplesort_set_bound(Tuplesortstate *state, int64 bound)
Definition: tuplesort.c:1334
static bool isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
ScanDirection
Definition: sdir.h:22
#define TupIsNull(slot)
Definition: tuptable.h:292
IncrementalSortExecutionStatus execution_status
Definition: execnodes.h:2202
int numCols
Definition: plannodes.h:810
#define INSTRUMENT_SORT_GROUP(node, groupName)
#define SO2_printf(s, p1, p2)
Definition: execdebug.h:94
static TupleTableSlot * ExecProcNode(PlanState *node)
Definition: executor.h:252
int work_mem
Definition: globals.c:124
Tuplesortstate * fullsort_state
Definition: execnodes.h:2204
Plan * plan
Definition: execnodes.h:964
#define DEFAULT_MIN_GROUP_SIZE
#define Assert(condition)
Definition: c.h:804
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:490
#define INT64_FORMAT
Definition: c.h:483
static void switchToPresortedPrefixMode(PlanState *pstate)
AttrNumber * sortColIdx
Definition: plannodes.h:811
Tuplesortstate * prefixsort_state
Definition: execnodes.h:2205
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
TupleTableSlot * group_pivot
Definition: execnodes.h:2212
Oid * collations
Definition: plannodes.h:813
void tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
Definition: tuplesort.c:1684

◆ ExecIncrementalSortEstimate()

void ExecIncrementalSortEstimate (IncrementalSortState *node, ParallelContext *pcxt)

Definition at line 1184 of file nodeIncrementalSort.c.

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, offsetof, ScanState::ps, shm_toc_estimate_chunk, shm_toc_estimate_keys, and IncrementalSortState::ss.

Referenced by ExecParallelEstimate().

1185 {
1186  Size size;
1187 
1188  /* don't need this if not instrumenting or no workers */
1189  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1190  return;
1191 
1192  size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
1193  size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
1194  shm_toc_estimate_chunk(&pcxt->estimator, size);
1195  shm_toc_estimate_keys(&pcxt->estimator, 1);
1196 }
Instrumentation * instrument
Definition: execnodes.h:974
shm_toc_estimator estimator
Definition: parallel.h:42
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
PlanState ps
Definition: execnodes.h:1373
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
#define offsetof(type, field)
Definition: c.h:727

◆ ExecIncrementalSortInitializeDSM()

void ExecIncrementalSortInitializeDSM (IncrementalSortState *node, ParallelContext *pcxt)

Definition at line 1205 of file nodeIncrementalSort.c.

References PlanState::instrument, SharedIncrementalSortInfo::num_workers, ParallelContext::nworkers, offsetof, PlanState::plan, Plan::plan_node_id, ScanState::ps, IncrementalSortState::shared_info, shm_toc_allocate(), shm_toc_insert(), IncrementalSortState::ss, and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

1206 {
1207  Size size;
1208 
1209  /* don't need this if not instrumenting or no workers */
1210  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1211  return;
1212 
1213  size = offsetof(SharedIncrementalSortInfo, sinfo)
1214  + pcxt->nworkers * sizeof(IncrementalSortInfo);
1215  node->shared_info = shm_toc_allocate(pcxt->toc, size);
1216  /* ensure any unfilled slots will contain zeroes */
1217  memset(node->shared_info, 0, size);
1218  node->shared_info->num_workers = pcxt->nworkers;
1219  shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
1220  node->shared_info);
1221 }
Instrumentation * instrument
Definition: execnodes.h:974
int plan_node_id
Definition: plannodes.h:140
PlanState ps
Definition: execnodes.h:1373
Plan * plan
Definition: execnodes.h:964
struct IncrementalSortInfo IncrementalSortInfo
size_t Size
Definition: c.h:540
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
SharedIncrementalSortInfo * shared_info
Definition: execnodes.h:2215
#define offsetof(type, field)
Definition: c.h:727
shm_toc * toc
Definition: parallel.h:45

◆ ExecIncrementalSortInitializeWorker()

void ExecIncrementalSortInitializeWorker (IncrementalSortState *node, ParallelWorkerContext *pwcxt)

Definition at line 1230 of file nodeIncrementalSort.c.

References IncrementalSortState::am_worker, PlanState::plan, Plan::plan_node_id, ScanState::ps, IncrementalSortState::shared_info, shm_toc_lookup(), IncrementalSortState::ss, and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

1231 {
1232  node->shared_info =
1233  shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
1234  node->am_worker = true;
1235 }
int plan_node_id
Definition: plannodes.h:140
PlanState ps
Definition: execnodes.h:1373
Plan * plan
Definition: execnodes.h:964
SharedIncrementalSortInfo * shared_info
Definition: execnodes.h:2215
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232

◆ ExecIncrementalSortRetrieveInstrumentation()

void ExecIncrementalSortRetrieveInstrumentation (IncrementalSortState *node)

Definition at line 1244 of file nodeIncrementalSort.c.

References SharedIncrementalSortInfo::num_workers, offsetof, palloc(), and IncrementalSortState::shared_info.

Referenced by ExecParallelRetrieveInstrumentation().

1245 {
1246  Size size;
1248 
1249  if (node->shared_info == NULL)
1250  return;
1251 
1252  size = offsetof(SharedIncrementalSortInfo, sinfo)
1253  + node->shared_info->num_workers * sizeof(IncrementalSortInfo);
1254  si = palloc(size);
1255  memcpy(si, node->shared_info, size);
1256  node->shared_info = si;
1257 }
struct IncrementalSortInfo IncrementalSortInfo
size_t Size
Definition: c.h:540
void * palloc(Size size)
Definition: mcxt.c:1062
SharedIncrementalSortInfo * shared_info
Definition: execnodes.h:2215
#define offsetof(type, field)
Definition: c.h:727

◆ ExecInitIncrementalSort()

IncrementalSortState* ExecInitIncrementalSort ( IncrementalSort node,
EState estate,
int  eflags 
)

Definition at line 975 of file nodeIncrementalSort.c.

References Assert, IncrementalSortState::bound_Done, IncrementalSortState::bounded, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecCreateScanSlotFromOuterPlan(), ExecGetResultType(), ExecIncrementalSort(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, IncrementalSortState::execution_status, IncrementalSortState::fullsort_state, IncrementalSortInfo::fullsortGroupInfo, IncrementalSortState::group_pivot, IncrementalSortGroupInfo::groupCount, IncrementalSortState::incsort_info, INCSORT_LOADFULLSORT, PlanState::instrument, makeNode, MakeSingleTupleTableSlot(), IncrementalSortGroupInfo::maxDiskSpaceUsed, IncrementalSortGroupInfo::maxMemorySpaceUsed, IncrementalSortState::n_fullsort_remaining, IncrementalSortState::outerNodeDone, outerPlan, outerPlanState, PlanState::plan, IncrementalSortState::prefixsort_state, IncrementalSortInfo::prefixsortGroupInfo, IncrementalSortState::presorted_keys, ScanState::ps, PlanState::ps_ProjInfo, SO_printf, IncrementalSortGroupInfo::sortMethods, IncrementalSortState::ss, PlanState::state, IncrementalSortGroupInfo::totalDiskSpaceUsed, IncrementalSortGroupInfo::totalMemorySpaceUsed, IncrementalSortState::transfer_tuple, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

976 {
977  IncrementalSortState *incrsortstate;
978 
979  SO_printf("ExecInitIncrementalSort: initializing sort node\n");
980 
981  /*
982  * Incremental sort can't be used with EXEC_FLAG_BACKWARD or
983  * EXEC_FLAG_MARK, because the current sort state contains only one sort
984  * batch rather than the full result set.
985  */
986  Assert((eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);
987 
988  /* Initialize state structure. */
989  incrsortstate = makeNode(IncrementalSortState);
990  incrsortstate->ss.ps.plan = (Plan *) node;
991  incrsortstate->ss.ps.state = estate;
992  incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;
993 
994  incrsortstate->execution_status = INCSORT_LOADFULLSORT;
995  incrsortstate->bounded = false;
996  incrsortstate->outerNodeDone = false;
997  incrsortstate->bound_Done = 0;
998  incrsortstate->fullsort_state = NULL;
999  incrsortstate->prefixsort_state = NULL;
1000  incrsortstate->group_pivot = NULL;
1001  incrsortstate->transfer_tuple = NULL;
1002  incrsortstate->n_fullsort_remaining = 0;
1003  incrsortstate->presorted_keys = NULL;
1004 
1005  if (incrsortstate->ss.ps.instrument != NULL)
1006  {
1007  IncrementalSortGroupInfo *fullsortGroupInfo =
1008  &incrsortstate->incsort_info.fullsortGroupInfo;
1009  IncrementalSortGroupInfo *prefixsortGroupInfo =
1010  &incrsortstate->incsort_info.prefixsortGroupInfo;
1011 
1012  fullsortGroupInfo->groupCount = 0;
1013  fullsortGroupInfo->maxDiskSpaceUsed = 0;
1014  fullsortGroupInfo->totalDiskSpaceUsed = 0;
1015  fullsortGroupInfo->maxMemorySpaceUsed = 0;
1016  fullsortGroupInfo->totalMemorySpaceUsed = 0;
1017  fullsortGroupInfo->sortMethods = 0;
1018  prefixsortGroupInfo->groupCount = 0;
1019  prefixsortGroupInfo->maxDiskSpaceUsed = 0;
1020  prefixsortGroupInfo->totalDiskSpaceUsed = 0;
1021  prefixsortGroupInfo->maxMemorySpaceUsed = 0;
1022  prefixsortGroupInfo->totalMemorySpaceUsed = 0;
1023  prefixsortGroupInfo->sortMethods = 0;
1024  }
1025 
1026  /*
1027  * Miscellaneous initialization
1028  *
1029  * Sort nodes don't initialize their ExprContexts because they never call
1030  * ExecQual or ExecProject.
1031  */
1032 
1033  /*
1034  * Initialize child nodes.
1035  *
1036  * Incremental sort does not support backwards scans and mark/restore, so
1037  * we don't bother removing the flags from eflags here. We allow passing a
1038  * REWIND flag, because although incremental sort can't use it, the child
1039  * nodes may be able to do something more useful.
1040  */
1041  outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);
1042 
1043  /*
1044  * Initialize scan slot and type.
1045  */
1046  ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss, &TTSOpsMinimalTuple);
1047 
1048  /*
1049  * Initialize return slot and type. No need to initialize projection info
1050  * because we don't do any projections.
1051  */
1052  ExecInitResultTupleSlotTL(&incrsortstate->ss.ps, &TTSOpsMinimalTuple);
1053  incrsortstate->ss.ps.ps_ProjInfo = NULL;
1054 
1055  /*
1056  * Initialize standalone slots to store a tuple for pivot prefix keys and
1057  * for carrying over a tuple from one batch to the next.
1058  */
1059  incrsortstate->group_pivot =
1060  MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1061  &TTSOpsMinimalTuple);
1062  incrsortstate->transfer_tuple =
1063  MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1064  &TTSOpsMinimalTuple);
1065 
1066  SO_printf("ExecInitIncrementalSort: sort node initialized\n");
1067 
1068  return incrsortstate;
1069 }
ProjectionInfo * ps_ProjInfo
Definition: execnodes.h:1004
Instrumentation * instrument
Definition: execnodes.h:974
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1238
IncrementalSortGroupInfo prefixsortGroupInfo
Definition: execnodes.h:2170
PresortedKeyData * presorted_keys
Definition: execnodes.h:2207
EState * state
Definition: execnodes.h:966
PlanState ps
Definition: execnodes.h:1373
#define SO_printf(s)
Definition: execdebug.h:92
IncrementalSortInfo incsort_info
Definition: execnodes.h:2209
#define EXEC_FLAG_BACKWARD
Definition: executor.h:58
#define outerPlanState(node)
Definition: execnodes.h:1058
IncrementalSortExecutionStatus execution_status
Definition: execnodes.h:2202
static TupleTableSlot * ExecIncrementalSort(PlanState *pstate)
#define outerPlan(node)
Definition: plannodes.h:171
ExecProcNodeMtd ExecProcNode
Definition: execnodes.h:970
IncrementalSortGroupInfo fullsortGroupInfo
Definition: execnodes.h:2169
Tuplesortstate * fullsort_state
Definition: execnodes.h:2204
Plan * plan
Definition: execnodes.h:964
#define makeNode(_type_)
Definition: nodes.h:587
#define Assert(condition)
Definition: c.h:804
#define EXEC_FLAG_MARK
Definition: executor.h:59
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1799
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:490
Tuplesortstate * prefixsort_state
Definition: execnodes.h:2205
TupleTableSlot * group_pivot
Definition: execnodes.h:2212
void ExecCreateScanSlotFromOuterPlan(EState *estate, ScanState *scanstate, const TupleTableSlotOps *tts_ops)
Definition: execUtils.c:682
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
Definition: execProcnode.c:141
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
TupleTableSlot * transfer_tuple
Definition: execnodes.h:2213

◆ ExecReScanIncrementalSort()

void ExecReScanIncrementalSort ( IncrementalSortState node)

Definition at line 1111 of file nodeIncrementalSort.c.

References IncrementalSortState::bound_Done, PlanState::chgParam, ExecClearTuple(), ExecReScan(), IncrementalSortState::execution_status, IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, INCSORT_LOADFULLSORT, IncrementalSortState::n_fullsort_remaining, IncrementalSortState::outerNodeDone, outerPlan, outerPlanState, IncrementalSortState::prefixsort_state, IncrementalSortState::presorted_keys, ScanState::ps, PlanState::ps_ResultTupleSlot, IncrementalSortState::ss, IncrementalSortState::transfer_tuple, and tuplesort_reset().

Referenced by ExecReScan().

1112 {
1113  PlanState *outerPlan = outerPlanState(node);
1114 
1115  /*
1116  * Incremental sort doesn't support efficient rescan even when parameters
1117  * haven't changed (e.g., rewind) because unlike regular sort we don't
1118  * store all tuples at once for the full sort.
1119  *
1120  * So even if EXEC_FLAG_REWIND is set we just reset all of our state and
1121  * re-execute the sort along with the child node. Incremental sort itself
1122  * can't do anything smarter, but maybe the child nodes can.
1123  *
1124  * In theory if we've only filled the full sort with one batch (and
1125  * haven't reset it for a new batch yet) then we could efficiently rewind,
1126  * but that seems a narrow enough case that it's not worth handling
1127  * specially at this time.
1128  */
1129 
1130  /* must drop pointer to sort result tuple */
1131  ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
1132 
1133  if (node->group_pivot != NULL)
1134  ExecClearTuple(node->group_pivot);
1135  if (node->transfer_tuple != NULL)
1136  ExecClearTuple(node->transfer_tuple);
1137 
1138  node->outerNodeDone = false;
1139  node->n_fullsort_remaining = 0;
1140  node->bound_Done = 0;
1141  node->presorted_keys = NULL;
1142 
1143  node->execution_status = INCSORT_LOADFULLSORT;
1144 
1145  /*
1146  * If we've already set up either of the sort states, we need to reset them.
1147  * We could end them and null out the pointers, but there's no reason to
1148  * repay the setup cost, and because ExecIncrementalSort guards presorted
1149  * column functions by checking to see if the full sort state has been
1150  * initialized yet, setting the sort states to null here might actually
1151  * cause a leak.
1152  */
1153  if (node->fullsort_state != NULL)
1154  {
1155  tuplesort_reset(node->fullsort_state);
1156  node->fullsort_state = NULL;
1157  }
1158  if (node->prefixsort_state != NULL)
1159  {
1160  tuplesort_reset(node->prefixsort_state);
1161  node->prefixsort_state = NULL;
1162  }
1163 
1164  /*
1165  * If chgParam of subnode is not null, then the plan will be re-scanned
1166  * by the first ExecProcNode.
1167  */
1168  if (outerPlan->chgParam == NULL)
1169  ExecReScan(outerPlan);
1170 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
void ExecReScan(PlanState *node)
Definition: execAmi.c:78
PresortedKeyData * presorted_keys
Definition: execnodes.h:2207
PlanState ps
Definition: execnodes.h:1373
TupleTableSlot * ps_ResultTupleSlot
Definition: execnodes.h:1002
void tuplesort_reset(Tuplesortstate *state)
Definition: tuplesort.c:1532
#define outerPlanState(node)
Definition: execnodes.h:1058
IncrementalSortExecutionStatus execution_status
Definition: execnodes.h:2202
Bitmapset * chgParam
Definition: execnodes.h:996
#define outerPlan(node)
Definition: plannodes.h:171
Tuplesortstate * fullsort_state
Definition: execnodes.h:2204
Tuplesortstate * prefixsort_state
Definition: execnodes.h:2205
TupleTableSlot * group_pivot
Definition: execnodes.h:2212
TupleTableSlot * transfer_tuple
Definition: execnodes.h:2213

◆ instrumentSortedGroup()

static void instrumentSortedGroup ( IncrementalSortGroupInfo groupInfo,
Tuplesortstate sortState 
)
static

Definition at line 128 of file nodeIncrementalSort.c.

References IncrementalSortGroupInfo::groupCount, IncrementalSortGroupInfo::maxDiskSpaceUsed, IncrementalSortGroupInfo::maxMemorySpaceUsed, SORT_SPACE_TYPE_DISK, SORT_SPACE_TYPE_MEMORY, TuplesortInstrumentation::sortMethod, IncrementalSortGroupInfo::sortMethods, TuplesortInstrumentation::spaceType, TuplesortInstrumentation::spaceUsed, IncrementalSortGroupInfo::totalDiskSpaceUsed, IncrementalSortGroupInfo::totalMemorySpaceUsed, and tuplesort_get_stats().

130 {
131  TuplesortInstrumentation sort_instr;
132 
133  groupInfo->groupCount++;
134 
135  tuplesort_get_stats(sortState, &sort_instr);
136 
137  /* Calculate total and maximum memory and disk space used. */
138  switch (sort_instr.spaceType)
139  {
140  case SORT_SPACE_TYPE_DISK:
141  groupInfo->totalDiskSpaceUsed += sort_instr.spaceUsed;
142  if (sort_instr.spaceUsed > groupInfo->maxDiskSpaceUsed)
143  groupInfo->maxDiskSpaceUsed = sort_instr.spaceUsed;
144 
145  break;
146  case SORT_SPACE_TYPE_MEMORY:
147  groupInfo->totalMemorySpaceUsed += sort_instr.spaceUsed;
148  if (sort_instr.spaceUsed > groupInfo->maxMemorySpaceUsed)
149  groupInfo->maxMemorySpaceUsed = sort_instr.spaceUsed;
150 
151  break;
152  }
153 
154  /* Track each sort method we've used. */
155  groupInfo->sortMethods |= sort_instr.sortMethod;
156 }
void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats)
Definition: tuplesort.c:3378
TuplesortMethod sortMethod
Definition: tuplesort.h:91
TuplesortSpaceType spaceType
Definition: tuplesort.h:92

◆ isCurrentGroup()

static bool isCurrentGroup ( IncrementalSortState node,
TupleTableSlot pivot,
TupleTableSlot tuple 
)
static

Definition at line 213 of file nodeIncrementalSort.c.

References FunctionCallInfoBaseData::args, PresortedKeyData::attno, castNode, DatumGetBool, elog, ERROR, PresortedKeyData::fcinfo, PresortedKeyData::flinfo, FmgrInfo::fn_oid, FunctionCallInvoke, i, FunctionCallInfoBaseData::isnull, sort-test::key, PlanState::plan, IncrementalSortState::presorted_keys, ScanState::ps, slot_getattr(), IncrementalSortState::ss, and NullableDatum::value.

Referenced by ExecIncrementalSort(), and switchToPresortedPrefixMode().

214 {
215  int nPresortedCols;
216 
217  nPresortedCols = castNode(IncrementalSort, node->ss.ps.plan)->nPresortedCols;
218 
219  /*
220  * That the input is sorted by keys (0, ... n) implies that the tail
221  * keys are more likely to change. Therefore we do our comparison starting
222  * from the last pre-sorted column to optimize for early detection of
223  * inequality and minimizing the number of function calls.
224  */
225  for (int i = nPresortedCols - 1; i >= 0; i--)
226  {
227  Datum datumA,
228  datumB,
229  result;
230  bool isnullA,
231  isnullB;
232  AttrNumber attno = node->presorted_keys[i].attno;
233  PresortedKeyData *key;
234 
235  datumA = slot_getattr(pivot, attno, &isnullA);
236  datumB = slot_getattr(tuple, attno, &isnullB);
237 
238  /* Special case for NULL-vs-NULL, else use standard comparison */
239  if (isnullA || isnullB)
240  {
241  if (isnullA == isnullB)
242  continue;
243  else
244  return false;
245  }
246 
247  key = &node->presorted_keys[i];
248 
249  key->fcinfo->args[0].value = datumA;
250  key->fcinfo->args[1].value = datumB;
251 
252  /* just for paranoia's sake, we reset isnull each time */
253  key->fcinfo->isnull = false;
254 
255  result = FunctionCallInvoke(key->fcinfo);
256 
257  /* Check for null result, since caller is clearly not expecting one */
258  if (key->fcinfo->isnull)
259  elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
260 
261  if (!DatumGetBool(result))
262  return false;
263  }
264  return true;
265 }
OffsetNumber attno
Definition: execnodes.h:2122
#define castNode(_type_, nodeptr)
Definition: nodes.h:608
PresortedKeyData * presorted_keys
Definition: execnodes.h:2207
PlanState ps
Definition: execnodes.h:1373
#define ERROR
Definition: elog.h:46
NullableDatum args[FLEXIBLE_ARRAY_MEMBER]
Definition: fmgr.h:95
#define FunctionCallInvoke(fcinfo)
Definition: fmgr.h:172
#define DatumGetBool(X)
Definition: postgres.h:437
FmgrInfo flinfo
Definition: execnodes.h:2120
Datum value
Definition: postgres.h:422
uintptr_t Datum
Definition: postgres.h:411
FunctionCallInfo fcinfo
Definition: execnodes.h:2121
Plan * plan
Definition: execnodes.h:964
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:381
Oid fn_oid
Definition: fmgr.h:59
#define elog(elevel,...)
Definition: elog.h:232
int i
int16 AttrNumber
Definition: attnum.h:21

◆ preparePresortedCols()

static void preparePresortedCols ( IncrementalSortState node)
static

Definition at line 165 of file nodeIncrementalSort.c.

References FunctionCallInfoBaseData::args, PresortedKeyData::attno, castNode, Sort::collations, CurrentMemoryContext, elog, ERROR, PresortedKeyData::fcinfo, PresortedKeyData::flinfo, fmgr_info_cxt(), get_equality_op_for_ordering_op(), get_opcode(), i, InitFunctionCallInfoData, NullableDatum::isnull, sort-test::key, IncrementalSort::nPresortedCols, OidIsValid, palloc(), palloc0(), PlanState::plan, IncrementalSortState::presorted_keys, ScanState::ps, SizeForFunctionCallInfo, IncrementalSort::sort, Sort::sortColIdx, Sort::sortOperators, and IncrementalSortState::ss.

Referenced by ExecIncrementalSort().

166 {
167  IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
168 
169  node->presorted_keys =
170  (PresortedKeyData *) palloc(plannode->nPresortedCols *
171  sizeof(PresortedKeyData));
172 
173  /* Pre-cache comparison functions for each pre-sorted key. */
174  for (int i = 0; i < plannode->nPresortedCols; i++)
175  {
176  Oid equalityOp,
177  equalityFunc;
178  PresortedKeyData *key;
179 
180  key = &node->presorted_keys[i];
181  key->attno = plannode->sort.sortColIdx[i];
182 
183  equalityOp = get_equality_op_for_ordering_op(plannode->sort.sortOperators[i],
184  NULL);
185  if (!OidIsValid(equalityOp))
186  elog(ERROR, "missing equality operator for ordering operator %u",
187  plannode->sort.sortOperators[i]);
188 
189  equalityFunc = get_opcode(equalityOp);
190  if (!OidIsValid(equalityFunc))
191  elog(ERROR, "missing function for operator %u", equalityOp);
192 
193  /* Lookup the comparison function */
194  fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
195 
196  /* We can initialize the callinfo just once and re-use it */
197  key->fcinfo = (FunctionCallInfo) palloc0(SizeForFunctionCallInfo(2));
198  InitFunctionCallInfoData(*key->fcinfo, &key->flinfo, 2,
199  plannode->sort.collations[i], NULL, NULL);
200  key->fcinfo->args[0].isnull = false;
201  key->fcinfo->args[1].isnull = false;
202  }
203 }
OffsetNumber attno
Definition: execnodes.h:2122
#define castNode(_type_, nodeptr)
Definition: nodes.h:608
#define SizeForFunctionCallInfo(nargs)
Definition: fmgr.h:102
Oid get_equality_op_for_ordering_op(Oid opno, bool *reverse)
Definition: lsyscache.c:265
PresortedKeyData * presorted_keys
Definition: execnodes.h:2207
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:710
Oid * sortOperators
Definition: plannodes.h:812
PlanState ps
Definition: execnodes.h:1373
#define ERROR
Definition: elog.h:46
NullableDatum args[FLEXIBLE_ARRAY_MEMBER]
Definition: fmgr.h:95
FmgrInfo flinfo
Definition: execnodes.h:2120
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void fmgr_info_cxt(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt)
Definition: fmgr.c:136
void * palloc0(Size size)
Definition: mcxt.c:1093
FunctionCallInfo fcinfo
Definition: execnodes.h:2121
Plan * plan
Definition: execnodes.h:964
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1256
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
Definition: fmgr.h:150
AttrNumber * sortColIdx
Definition: plannodes.h:811
void * palloc(Size size)
Definition: mcxt.c:1062
#define elog(elevel,...)
Definition: elog.h:232
int i
Oid * collations
Definition: plannodes.h:813

◆ switchToPresortedPrefixMode()

static void switchToPresortedPrefixMode ( PlanState pstate)
static

Definition at line 287 of file nodeIncrementalSort.c.

References IncrementalSortState::bound, IncrementalSortState::bound_Done, IncrementalSortState::bounded, castNode, Sort::collations, EState::es_direction, ExecClearTuple(), ExecCopySlot(), ExecGetResultType(), IncrementalSortState::execution_status, IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, INCSORT_LOADPREFIXSORT, INCSORT_READPREFIXSORT, INSTRUMENT_SORT_GROUP, INT64_FORMAT, isCurrentGroup(), Min, IncrementalSortState::n_fullsort_remaining, IncrementalSort::nPresortedCols, Sort::nullsFirst, Sort::numCols, outerPlanState, PlanState::plan, IncrementalSortState::prefixsort_state, ScanState::ps, ScanDirectionIsForward, SO1_printf, SO2_printf, SO_printf, IncrementalSort::sort, Sort::sortColIdx, Sort::sortOperators, IncrementalSortState::ss, PlanState::state, IncrementalSortState::transfer_tuple, TupIsNull, tuplesort_begin_heap(), tuplesort_gettupleslot(), tuplesort_performsort(), tuplesort_puttupleslot(), tuplesort_reset(), tuplesort_set_bound(), and work_mem.

Referenced by ExecIncrementalSort().

288 {
289  IncrementalSortState *node = castNode(IncrementalSortState, pstate);
290  ScanDirection dir;
291  int64 nTuples;
292  TupleDesc tupDesc;
293  PlanState *outerNode;
294  IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
295 
296  dir = node->ss.ps.state->es_direction;
297  outerNode = outerPlanState(node);
298  tupDesc = ExecGetResultType(outerNode);
299 
300  /* Configure the prefix sort state the first time around. */
301  if (node->prefixsort_state == NULL)
302  {
303  Tuplesortstate *prefixsort_state;
304  int nPresortedCols = plannode->nPresortedCols;
305 
306  /*
307  * Optimize the sort by assuming the prefix columns are all equal and
308  * thus we only need to sort by any remaining columns.
309  */
310  prefixsort_state = tuplesort_begin_heap(tupDesc,
311  plannode->sort.numCols - nPresortedCols,
312  &(plannode->sort.sortColIdx[nPresortedCols]),
313  &(plannode->sort.sortOperators[nPresortedCols]),
314  &(plannode->sort.collations[nPresortedCols]),
315  &(plannode->sort.nullsFirst[nPresortedCols]),
316  work_mem,
317  NULL,
318  false);
319  node->prefixsort_state = prefixsort_state;
320  }
321  else
322  {
323  /* Next group of presorted data */
324  tuplesort_reset(node->prefixsort_state);
325  }
326 
327  /*
328  * If the current node has a bound, then it's reasonably likely that a
329  * large prefix key group will benefit from bounded sort, so configure the
330  * tuplesort to allow for that optimization.
331  */
332  if (node->bounded)
333  {
334  SO1_printf("Setting bound on presorted prefix tuplesort to: " INT64_FORMAT "\n",
335  node->bound - node->bound_Done);
336  tuplesort_set_bound(node->prefixsort_state,
337  node->bound - node->bound_Done);
338  }
339 
340  /*
341  * Copy as many tuples as we can (i.e., in the same prefix key group) from
342  * the full sort state to the prefix sort state.
343  */
344  for (nTuples = 0; nTuples < node->n_fullsort_remaining; nTuples++)
345  {
346  /*
347  * When we encounter multiple prefix key groups inside the full sort
348  * tuplesort we have to carry over the last read tuple into the next
349  * batch.
350  */
351  if (nTuples == 0 && !TupIsNull(node->transfer_tuple))
352  {
353  tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
354  /* The carried over tuple is our new group pivot tuple. */
355  ExecCopySlot(node->group_pivot, node->transfer_tuple);
356  }
357  else
358  {
359  tuplesort_gettupleslot(node->fullsort_state,
360  ScanDirectionIsForward(dir),
361  false, node->transfer_tuple, NULL);
362 
363  /*
364  * If this is our first time through the loop, then we need to
365  * save the first tuple we get as our new group pivot.
366  */
367  if (TupIsNull(node->group_pivot))
368  ExecCopySlot(node->group_pivot, node->transfer_tuple);
369 
370  if (isCurrentGroup(node, node->group_pivot, node->transfer_tuple))
371  {
372  tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
373  }
374  else
375  {
376  /*
377  * The tuple isn't part of the current batch so we need to
378  * carry it over into the next batch of tuples we transfer out
379  * of the full sort tuplesort into the presorted prefix
380  * tuplesort. We don't actually have to do anything special to
381  * save the tuple since we've already loaded it into the
382  * node->transfer_tuple slot, and, even though that slot
383  * points to memory inside the full sort tuplesort, we can't
384  * reset that tuplesort anyway until we've fully transferred
385  * out its tuples, so this reference is safe. We do need to
386  * reset the group pivot tuple though since we've finished the
387  * current prefix key group.
388  */
389  ExecClearTuple(node->group_pivot);
390 
391  /* Break out of for-loop early */
392  break;
393  }
394  }
395  }
396 
397  /*
398  * Track how many tuples remain in the full sort batch so that we know if
399  * we need to sort multiple prefix key groups before processing tuples
400  * remaining in the large single prefix key group we think we've
401  * encountered.
402  */
403  SO1_printf("Moving " INT64_FORMAT " tuples to presorted prefix tuplesort\n", nTuples);
404  node->n_fullsort_remaining -= nTuples;
405  SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT "\n", node->n_fullsort_remaining);
406 
407  if (node->n_fullsort_remaining == 0)
408  {
409  /*
410  * We've found that all tuples remaining in the full sort batch are in
411  * the same prefix key group and moved all of those tuples into the
412  * presorted prefix tuplesort. We don't know that we've yet found the
413  * last tuple in the current prefix key group, so save our pivot
414  * comparison tuple and continue fetching tuples from the outer
415  * execution node to load into the presorted prefix tuplesort.
416  */
417  ExecCopySlot(node->group_pivot, node->transfer_tuple);
418  SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
419  node->execution_status = INCSORT_LOADPREFIXSORT;
420 
421  /*
422  * Make sure we clear the transfer tuple slot so that next time we
423  * encounter a large prefix key group we don't incorrectly assume we
424  * have a tuple carried over from the previous group.
425  */
426  ExecClearTuple(node->transfer_tuple);
427  }
428  else
429  {
430  /*
431  * We finished a group but didn't consume all of the tuples from the
432  * full sort state, so we'll sort this batch, let the outer node read
433  * out all of those tuples, and then come back around to find another
434  * batch.
435  */
436  SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
437  tuplesort_performsort(node->prefixsort_state);
438 
439  INSTRUMENT_SORT_GROUP(node, prefixsort);
440 
441  if (node->bounded)
442  {
443  /*
444  * If the current node has a bound and we've already sorted n
445  * tuples, then the functional bound remaining is (original bound
446  * - n), so store the current number of processed tuples for use
447  * in configuring sorting bound.
448  */
449  SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
450  Min(node->bound, node->bound_Done + nTuples), node->bound_Done);
451  node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
452  }
453 
454  SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
455  node->execution_status = INCSORT_READPREFIXSORT;
456  }
457 }
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
void tuplesort_performsort(Tuplesortstate *state)
Definition: tuplesort.c:2040
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
#define ScanDirectionIsForward(direction)
Definition: sdir.h:55
#define castNode(_type_, nodeptr)
Definition: nodes.h:608
#define Min(x, y)
Definition: c.h:986
bool * nullsFirst
Definition: plannodes.h:814
EState * state
Definition: execnodes.h:966
#define SO1_printf(s, p)
Definition: execdebug.h:93
Oid * sortOperators
Definition: plannodes.h:812
ScanDirection es_direction
Definition: execnodes.h:555
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, bool randomAccess)
Definition: tuplesort.c:897
PlanState ps
Definition: execnodes.h:1373
#define SO_printf(s)
Definition: execdebug.h:92
void tuplesort_reset(Tuplesortstate *state)
Definition: tuplesort.c:1532
#define outerPlanState(node)
Definition: execnodes.h:1058
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Definition: tuplesort.c:2408
void tuplesort_set_bound(Tuplesortstate *state, int64 bound)
Definition: tuplesort.c:1334
static bool isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
ScanDirection
Definition: sdir.h:22
#define TupIsNull(slot)
Definition: tuptable.h:292
IncrementalSortExecutionStatus execution_status
Definition: execnodes.h:2202
int numCols
Definition: plannodes.h:810
#define INSTRUMENT_SORT_GROUP(node, groupName)
#define SO2_printf(s, p1, p2)
Definition: execdebug.h:94
int work_mem
Definition: globals.c:124
Tuplesortstate * fullsort_state
Definition: execnodes.h:2204
Plan * plan
Definition: execnodes.h:964
TupleDesc ExecGetResultType(PlanState *planstate)
Definition: execUtils.c:490
#define INT64_FORMAT
Definition: c.h:483
AttrNumber * sortColIdx
Definition: plannodes.h:811
Tuplesortstate * prefixsort_state
Definition: execnodes.h:2205
TupleTableSlot * group_pivot
Definition: execnodes.h:2212
Oid * collations
Definition: plannodes.h:813
TupleTableSlot * transfer_tuple
Definition: execnodes.h:2213
void tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
Definition: tuplesort.c:1684