PostgreSQL Source Code  git master
nodeIncrementalSort.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "executor/execdebug.h"
#include "executor/nodeIncrementalSort.h"
#include "miscadmin.h"
#include "utils/lsyscache.h"
#include "utils/tuplesort.h"
Include dependency graph for nodeIncrementalSort.c:

Go to the source code of this file.

Macros

#define INSTRUMENT_SORT_GROUP(node, groupName)
 
#define DEFAULT_MIN_GROUP_SIZE   32
 
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE   (2 * DEFAULT_MIN_GROUP_SIZE)
 

Functions

static void instrumentSortedGroup (IncrementalSortGroupInfo *groupInfo, Tuplesortstate *sortState)
 
static void preparePresortedCols (IncrementalSortState *node)
 
static bool isCurrentGroup (IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
 
static void switchToPresortedPrefixMode (PlanState *pstate)
 
static TupleTableSlot * ExecIncrementalSort (PlanState *pstate)
 
IncrementalSortState * ExecInitIncrementalSort (IncrementalSort *node, EState *estate, int eflags)
 
void ExecEndIncrementalSort (IncrementalSortState *node)
 
void ExecReScanIncrementalSort (IncrementalSortState *node)
 
void ExecIncrementalSortEstimate (IncrementalSortState *node, ParallelContext *pcxt)
 
void ExecIncrementalSortInitializeDSM (IncrementalSortState *node, ParallelContext *pcxt)
 
void ExecIncrementalSortInitializeWorker (IncrementalSortState *node, ParallelWorkerContext *pwcxt)
 
void ExecIncrementalSortRetrieveInstrumentation (IncrementalSortState *node)
 

Macro Definition Documentation

◆ DEFAULT_MAX_FULL_SORT_GROUP_SIZE

#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE   (2 * DEFAULT_MIN_GROUP_SIZE)

Definition at line 496 of file nodeIncrementalSort.c.

Referenced by ExecIncrementalSort().

◆ DEFAULT_MIN_GROUP_SIZE

#define DEFAULT_MIN_GROUP_SIZE   32

Definition at line 484 of file nodeIncrementalSort.c.

Referenced by ExecIncrementalSort().
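
Taken together, these two constants drive the batching heuristic in ExecIncrementalSort(): a full-sort batch accumulates at least DEFAULT_MIN_GROUP_SIZE (32) tuples before any prefix-key comparisons are made, and if a batch grows past DEFAULT_MAX_FULL_SORT_GROUP_SIZE (64) tuples without hitting a prefix-key change, the node assumes it is inside one large group and switches to presorted prefix mode. A minimal, self-contained sketch of that threshold logic (the helper names below are illustrative, not part of nodeIncrementalSort.c):

#include <stdbool.h>
#include <stdint.h>

#define DEFAULT_MIN_GROUP_SIZE 32
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (2 * DEFAULT_MIN_GROUP_SIZE)

/* Still below the minimum batch size: keep loading without prefix checks. */
static bool still_filling_minimum(int64_t nTuples, int64_t minGroupSize)
{
    return nTuples < minGroupSize;
}

/* Batch exceeded 64 equal-prefix tuples: switch to presorted prefix mode. */
static bool should_switch_to_prefix_mode(int64_t nTuples)
{
    return nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE;
}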

◆ INSTRUMENT_SORT_GROUP

#define INSTRUMENT_SORT_GROUP (   node,
  groupName 
)
Value:
do { \
    if ((node)->ss.ps.instrument != NULL) \
    { \
        if ((node)->shared_info && (node)->am_worker) \
        { \
            Assert(IsParallelWorker()); \
            Assert(ParallelWorkerNumber <= (node)->shared_info->num_workers); \
            instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
                                  (node)->groupName##_state); \
        } \
        else \
        { \
            instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
                                  (node)->groupName##_state); \
        } \
    } \
} while (0)

Definition at line 99 of file nodeIncrementalSort.c.

Referenced by ExecIncrementalSort(), and switchToPresortedPrefixMode().
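
For reference, a call such as INSTRUMENT_SORT_GROUP(node, fullsort) token-pastes the group name into both the instrumentation member and the matching tuplesort state, so it expands to roughly the following (hand-expanded sketch, not generated output):

/* In a parallel worker (node->shared_info set and node->am_worker true): */
instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].fullsortGroupInfo,
                      (node)->fullsort_state);

/* Otherwise (leader or non-parallel execution): */
instrumentSortedGroup(&(node)->incsort_info.fullsortGroupInfo,
                      (node)->fullsort_state);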

Function Documentation

◆ ExecEndIncrementalSort()

void ExecEndIncrementalSort ( IncrementalSortState * node)

Definition at line 1092 of file nodeIncrementalSort.c.

References ExecClearTuple(), ExecDropSingleTupleTableSlot(), ExecEndNode(), IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, outerPlanState, IncrementalSortState::prefixsort_state, ScanState::ps, PlanState::ps_ResultTupleSlot, SO_printf, IncrementalSortState::ss, ScanState::ss_ScanTupleSlot, IncrementalSortState::transfer_tuple, and tuplesort_end().

Referenced by ExecEndNode().

1093 {
1094  SO_printf("ExecEndIncrementalSort: shutting down sort node\n");
1095 
1096  /* clean out the scan tuple */
1097  ExecClearTuple(node->ss.ss_ScanTupleSlot);
1098  /* must drop pointer to sort result tuple */
1099  ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
1100  /* must drop standalone tuple slots from outer node */
1101  ExecDropSingleTupleTableSlot(node->group_pivot);
1102  ExecDropSingleTupleTableSlot(node->transfer_tuple);
1103 
1104  /*
1105  * Release tuplesort resources.
1106  */
1107  if (node->fullsort_state != NULL)
1108  {
1109  tuplesort_end(node->fullsort_state);
1110  node->fullsort_state = NULL;
1111  }
1112  if (node->prefixsort_state != NULL)
1113  {
1114  tuplesort_end(node->prefixsort_state);
1115  node->prefixsort_state = NULL;
1116  }
1117 
1118  /*
1119  * Shut down the subplan.
1120  */
1121  ExecEndNode(outerPlanState(node));
1122 
1123  SO_printf("ExecEndIncrementalSort: sort node shutdown\n");
1124 }

◆ ExecIncrementalSort()

static TupleTableSlot * ExecIncrementalSort ( PlanState * pstate)
static

Definition at line 512 of file nodeIncrementalSort.c.

References Assert, IncrementalSortState::bound, IncrementalSortState::bound_Done, IncrementalSortState::bounded, castNode, CHECK_FOR_INTERRUPTS, Sort::collations, DEFAULT_MAX_FULL_SORT_GROUP_SIZE, DEFAULT_MIN_GROUP_SIZE, EState::es_direction, ExecClearTuple(), ExecCopySlot(), ExecGetResultType(), ExecProcNode(), IncrementalSortState::execution_status, ForwardScanDirection, IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, INCSORT_LOADFULLSORT, INCSORT_LOADPREFIXSORT, INCSORT_READFULLSORT, INCSORT_READPREFIXSORT, INSTRUMENT_SORT_GROUP, isCurrentGroup(), Min, IncrementalSortState::n_fullsort_remaining, Sort::nullsFirst, Sort::numCols, IncrementalSortState::outerNodeDone, outerPlanState, PlanState::plan, IncrementalSortState::prefixsort_state, preparePresortedCols(), ScanState::ps, PlanState::ps_ResultTupleSlot, ScanDirectionIsForward, SO1_printf, SO2_printf, SO_printf, IncrementalSort::sort, Sort::sortColIdx, Sort::sortOperators, IncrementalSortState::ss, PlanState::state, switchToPresortedPrefixMode(), TupIsNull, tuplesort_begin_heap(), tuplesort_gettupleslot(), tuplesort_performsort(), tuplesort_puttupleslot(), tuplesort_reset(), tuplesort_set_bound(), tuplesort_used_bound(), and work_mem.

Referenced by ExecInitIncrementalSort().

513 {
514  IncrementalSortState *node = castNode(IncrementalSortState, pstate);
515  EState *estate;
516  ScanDirection dir;
517  Tuplesortstate *read_sortstate;
518  Tuplesortstate *fullsort_state;
519  TupleTableSlot *slot;
520  IncrementalSort *plannode = (IncrementalSort *) node->ss.ps.plan;
521  PlanState *outerNode;
522  TupleDesc tupDesc;
523  int64 nTuples = 0;
524  int64 minGroupSize;
525 
526  CHECK_FOR_INTERRUPTS();
527 
528  estate = node->ss.ps.state;
529  dir = estate->es_direction;
530  fullsort_state = node->fullsort_state;
531 
532  /*
533  * If a previous iteration has sorted a batch, then we need to check to
534  * see if there are any remaining tuples in that batch that we can return
535  * before moving on to other execution states.
536  */
537  if (node->execution_status == INCSORT_READFULLSORT ||
538  node->execution_status == INCSORT_READPREFIXSORT)
539  {
540  /*
541  * Return next tuple from the current sorted group set if available.
542  */
543  read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
544  fullsort_state : node->prefixsort_state;
545  slot = node->ss.ps.ps_ResultTupleSlot;
546 
547  /*
548  * We have to populate the slot from the tuplesort before checking
549  * outerNodeDone because it will set the slot to NULL if no more
550  * tuples remain. If the tuplesort is empty, but we don't have any
551  * more tuples available for sort from the outer node, then
552  * outerNodeDone will have been set so we'll return that now-empty
553  * slot to the caller.
554  */
555  if (tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
556  false, slot, NULL) || node->outerNodeDone)
557 
558  /*
559  * Note: there isn't a good test case for the node->outerNodeDone
560  * check directly, but we need it for any plan where the outer
561  * node will fail when trying to fetch too many tuples.
562  */
563  return slot;
564  else if (node->n_fullsort_remaining > 0)
565  {
566  /*
567  * When we transition to presorted prefix mode, we might have
568  * accumulated at least one additional prefix key group in the
569  * full sort tuplesort. The first call to
570  * switchToPresortedPrefixMode() will have pulled the first one of
571  * those groups out, and we've returned those tuples to the parent
572  * node, but if at this point we still have tuples remaining in
573  * the full sort state (i.e., n_fullsort_remaining > 0), then we
574  * need to re-execute the prefix mode transition function to pull
575  * out the next prefix key group.
576  */
577  SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (%ld)\n",
578  node->n_fullsort_remaining);
579  switchToPresortedPrefixMode(pstate);
580  }
581  else
582  {
583  /*
584  * If we don't have any sorted tuples to read and we're not
585  * currently transitioning into presorted prefix sort mode, then
586  * it's time to start the process all over again by building a new
587  * group in the full sort state.
588  */
589  SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining > 0)\n");
590  node->execution_status = INCSORT_LOADFULLSORT;
591  }
592  }
593 
594  /*
595  * Scan the subplan in the forward direction while creating the sorted
596  * data.
597  */
598  estate->es_direction = ForwardScanDirection;
599 
600  outerNode = outerPlanState(node);
601  tupDesc = ExecGetResultType(outerNode);
602 
603  /* Load tuples into the full sort state. */
604  if (node->execution_status == INCSORT_LOADFULLSORT)
605  {
606  /*
607  * Initialize sorting structures.
608  */
609  if (fullsort_state == NULL)
610  {
611  /*
612  * Initialize presorted column support structures for
613  * isCurrentGroup(). It's correct to do this along with the
614  * initial initialization for the full sort state (and not for the
615  * prefix sort state) since we always load the full sort state
616  * first.
617  */
618  preparePresortedCols(node);
619 
620  /*
621  * Since we optimize small prefix key groups by accumulating a
622  * minimum number of tuples before sorting, we can't assume that a
623  * group of tuples all have the same prefix key values. Hence we
624  * setup the full sort tuplesort to sort by all requested sort
625  * keys.
626  */
627  fullsort_state = tuplesort_begin_heap(tupDesc,
628  plannode->sort.numCols,
629  plannode->sort.sortColIdx,
630  plannode->sort.sortOperators,
631  plannode->sort.collations,
632  plannode->sort.nullsFirst,
633  work_mem,
634  NULL,
635  false);
636  node->fullsort_state = fullsort_state;
637  }
638  else
639  {
640  /* Reset sort for the next batch. */
641  tuplesort_reset(fullsort_state);
642  }
643 
644  /*
645  * Calculate the remaining tuples left if bounded and configure both
646  * bounded sort and the minimum group size accordingly.
647  */
648  if (node->bounded)
649  {
650  int64 currentBound = node->bound - node->bound_Done;
651 
652  /*
653  * Bounded sort isn't likely to be a useful optimization for full
654  * sort mode since we limit full sort mode to a relatively small
655  * number of tuples and tuplesort doesn't switch over to top-n
656  * heap sort anyway unless it hits (2 * bound) tuples.
657  */
658  if (currentBound < DEFAULT_MIN_GROUP_SIZE)
659  tuplesort_set_bound(fullsort_state, currentBound);
660 
661  minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, currentBound);
662  }
663  else
664  minGroupSize = DEFAULT_MIN_GROUP_SIZE;
665 
666  /*
667  * Because we have to read the next tuple to find out that we've
668  * encountered a new prefix key group, on subsequent groups we have to
669  * carry over that extra tuple and add it to the new group's sort here
670  * before we read any new tuples from the outer node.
671  */
672  if (!TupIsNull(node->group_pivot))
673  {
674  tuplesort_puttupleslot(fullsort_state, node->group_pivot);
675  nTuples++;
676 
677  /*
678  * We're in full sort mode accumulating a minimum number of tuples
679  * and not checking for prefix key equality yet, so we can't
680  * assume the group pivot tuple will remain the same -- unless
681  * we're using a minimum group size of 1, in which case the pivot
682  * is obviously still the pivot.
683  */
684  if (nTuples != minGroupSize)
685  ExecClearTuple(node->group_pivot);
686  }
687 
688 
689  /*
690  * Pull as many tuples from the outer node as possible given our
691  * current operating mode.
692  */
693  for (;;)
694  {
695  slot = ExecProcNode(outerNode);
696 
697  /*
698  * If the outer node can't provide us any more tuples, then we can
699  * sort the current group and return those tuples.
700  */
701  if (TupIsNull(slot))
702  {
703  /*
704  * We need to know later if the outer node has completed to be
705  * able to distinguish between being done with a batch and
706  * being done with the whole node.
707  */
708  node->outerNodeDone = true;
709 
710  SO1_printf("Sorting fullsort with %ld tuples\n", nTuples);
711  tuplesort_performsort(fullsort_state);
712 
713  INSTRUMENT_SORT_GROUP(node, fullsort);
714 
715  SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
716  node->execution_status = INCSORT_READFULLSORT;
717  break;
718  }
719 
720  /* Accumulate the next group of presorted tuples. */
721  if (nTuples < minGroupSize)
722  {
723  /*
724  * If we haven't yet hit our target minimum group size, then
725  * we don't need to bother checking for inclusion in the
726  * current prefix group since at this point we'll assume that
727  * we'll full sort this batch to avoid a large number of very
728  * tiny (and thus inefficient) sorts.
729  */
730  tuplesort_puttupleslot(fullsort_state, slot);
731  nTuples++;
732 
733  /*
734  * If we've reached our minimum group size, then we need to
735  * store the most recent tuple as a pivot.
736  */
737  if (nTuples == minGroupSize)
738  ExecCopySlot(node->group_pivot, slot);
739  }
740  else
741  {
742  /*
743  * If we've already accumulated enough tuples to reach our
744  * minimum group size, then we need to compare any additional
745  * tuples to our pivot tuple to see if we reach the end of
746  * that prefix key group. Only after we find changed prefix
747  * keys can we guarantee sort stability of the tuples we've
748  * already accumulated.
749  */
750  if (isCurrentGroup(node, node->group_pivot, slot))
751  {
752  /*
753  * As long as the prefix keys match the pivot tuple then
754  * load the tuple into the tuplesort.
755  */
756  tuplesort_puttupleslot(fullsort_state, slot);
757  nTuples++;
758  }
759  else
760  {
761  /*
762  * Since the tuple we fetched isn't part of the current
763  * prefix key group we don't want to sort it as part of
764  * the current batch. Instead we use the group_pivot slot
765  * to carry it over to the next batch (even though we
766  * won't actually treat it as a group pivot).
767  */
768  ExecCopySlot(node->group_pivot, slot);
769 
770  if (node->bounded)
771  {
772  /*
773  * If the current node has a bound, and we've already
774  * sorted n tuples, then the functional bound
775  * remaining is (original bound - n), so store the
776  * current number of processed tuples for later use
777  * configuring the sort state's bound.
778  */
779  SO2_printf("Changing bound_Done from %ld to %ld\n",
780  node->bound_Done,
781  Min(node->bound, node->bound_Done + nTuples));
782  node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
783  }
784 
785  /*
786  * Once we find changed prefix keys we can complete the
787  * sort and transition modes to reading out the sorted
788  * tuples.
789  */
790  SO1_printf("Sorting fullsort tuplesort with %ld tuples\n",
791  nTuples);
792  tuplesort_performsort(fullsort_state);
793 
794  INSTRUMENT_SORT_GROUP(node, fullsort);
795 
796  SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
797  node->execution_status = INCSORT_READFULLSORT;
798  break;
799  }
800  }
801 
802  /*
803  * Unless we've already transitioned modes to reading from the
804  * full sort state, then we assume that having read at least
805  * DEFAULT_MAX_FULL_SORT_GROUP_SIZE tuples means it's likely we're
806  * processing a large group of tuples all having equal prefix keys
807  * (but haven't yet found the final tuple in that prefix key
808  * group), so we need to transition into presorted prefix mode.
809  */
810  if (nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE &&
811  node->execution_status == INCSORT_LOADFULLSORT)
812  {
813  /*
814  * The group pivot we have stored has already been put into
815  * the tuplesort; we don't want to carry it over. Since we
816  * haven't yet found the end of the prefix key group, it might
817  * seem like we should keep this, but we don't actually know
818  * how many prefix key groups might be represented in the full
819  * sort state, so we'll let the mode transition function
820  * manage this state for us.
821  */
822  ExecClearTuple(node->group_pivot);
823 
824  /*
825  * Unfortunately the tuplesort API doesn't include a way to
826  * retrieve tuples unless a sort has been performed, so we
827  * perform the sort even though we could just as easily rely
828  * on FIFO retrieval semantics when transferring them to the
829  * presorted prefix tuplesort.
830  */
831  SO1_printf("Sorting fullsort tuplesort with %ld tuples\n", nTuples);
832  tuplesort_performsort(fullsort_state);
833 
834  INSTRUMENT_SORT_GROUP(node, fullsort);
835 
836  /*
837  * If the full sort tuplesort happened to switch into top-n
838  * heapsort mode then we will only be able to retrieve
839  * currentBound tuples (since the tuplesort will have only
840  * retained the top-n tuples). This is safe even though we
841  * haven't yet completed fetching the current prefix key group
842  * because the tuples we've "lost" already sorted "below" the
843  * retained ones, and we're already contractually guaranteed
844  * to not need any more than the currentBound tuples.
845  */
846  if (tuplesort_used_bound(node->fullsort_state))
847  {
848  int64 currentBound = node->bound - node->bound_Done;
849 
850  SO2_printf("Read %ld tuples, but setting to %ld because we used bounded sort\n",
851  nTuples, Min(currentBound, nTuples));
852  nTuples = Min(currentBound, nTuples);
853  }
854 
855  SO1_printf("Setting n_fullsort_remaining to %ld and calling switchToPresortedPrefixMode()\n",
856  nTuples);
857 
858  /*
859  * We might have multiple prefix key groups in the full sort
860  * state, so the mode transition function needs to know that
861  * it needs to move from the fullsort to presorted prefix
862  * sort.
863  */
864  node->n_fullsort_remaining = nTuples;
865 
866  /* Transition the tuples to the presorted prefix tuplesort. */
867  switchToPresortedPrefixMode(pstate);
868 
869  /*
870  * Since we know we had tuples to move to the presorted prefix
871  * tuplesort, we know that unless that transition has verified
872  * that all tuples belonged to the same prefix key group (in
873  * which case we can go straight to continuing to load tuples
874  * into that tuplesort), we should have a tuple to return
875  * here.
876  *
877  * Either way, the appropriate execution status should have
878  * been set by switchToPresortedPrefixMode(), so we can drop
879  * out of the loop here and let the appropriate path kick in.
880  */
881  break;
882  }
883  }
884  }
885 
886  if (node->execution_status == INCSORT_LOADPREFIXSORT)
887  {
888  /*
889  * We only enter this state after the mode transition function has
890  * confirmed all remaining tuples from the full sort state have the
891  * same prefix and moved those tuples to the prefix sort state. That
892  * function has also set a group pivot tuple (which doesn't need to be
893  * carried over; it's already been put into the prefix sort state).
894  */
895  Assert(!TupIsNull(node->group_pivot));
896 
897  /*
898  * Read tuples from the outer node and load them into the prefix sort
899  * state until we encounter a tuple whose prefix keys don't match the
900  * current group_pivot tuple, since we can't guarantee sort stability
901  * until we have all tuples matching those prefix keys.
902  */
903  for (;;)
904  {
905  slot = ExecProcNode(outerNode);
906 
907  /*
908  * If we've exhausted tuples from the outer node we're done
909  * loading the prefix sort state.
910  */
911  if (TupIsNull(slot))
912  {
913  /*
914  * We need to know later if the outer node has completed to be
915  * able to distinguish between being done with a batch and
916  * being done with the whole node.
917  */
918  node->outerNodeDone = true;
919  break;
920  }
921 
922  /*
923  * If the tuple's prefix keys match our pivot tuple, we're not
924  * done yet and can load it into the prefix sort state. If not, we
925  * don't want to sort it as part of the current batch. Instead we
926  * use the group_pivot slot to carry it over to the next batch
927  * (even though we won't actually treat it as a group pivot).
928  */
929  if (isCurrentGroup(node, node->group_pivot, slot))
930  {
931  tuplesort_puttupleslot(node->prefixsort_state, slot);
932  nTuples++;
933  }
934  else
935  {
936  ExecCopySlot(node->group_pivot, slot);
937  break;
938  }
939  }
940 
941  /*
942  * Perform the sort and begin returning the tuples to the parent plan
943  * node.
944  */
945  SO1_printf("Sorting presorted prefix tuplesort with >= %ld tuples\n", nTuples);
946  tuplesort_performsort(node->prefixsort_state);
947 
948  INSTRUMENT_SORT_GROUP(node, prefixsort);
949 
950  SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
951  node->execution_status = INCSORT_READPREFIXSORT;
952 
953  if (node->bounded)
954  {
955  /*
956  * If the current node has a bound, and we've already sorted n
957  * tuples, then the functional bound remaining is (original bound
958  * - n), so store the current number of processed tuples for use
959  * in configuring sorting bound.
960  */
961  SO2_printf("Changing bound_Done from %ld to %ld\n",
962  node->bound_Done,
963  Min(node->bound, node->bound_Done + nTuples));
964  node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
965  }
966  }
967 
968  /* Restore to user specified direction. */
969  estate->es_direction = dir;
970 
971  /*
972  * Get the first or next tuple from tuplesort. Returns NULL if no more
973  * tuples.
974  */
975  read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
976  fullsort_state : node->prefixsort_state;
977  slot = node->ss.ps.ps_ResultTupleSlot;
978  (void) tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
979  false, slot, NULL);
980  return slot;
981 }
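
The bound bookkeeping above is easiest to follow with concrete (hypothetical) numbers: if the node's bound is 100, earlier batches already returned 30 tuples (bound_Done = 30), and the batch just sorted held 50 tuples, then bound_Done becomes Min(100, 30 + 50) = 80 and the next batch's effective bound (bound - bound_Done) is 20, which is below DEFAULT_MIN_GROUP_SIZE and so would also be pushed down to the full-sort tuplesort via tuplesort_set_bound(). A standalone sketch of just that arithmetic:

#include <stdint.h>
#include <stdio.h>

#define MIN_OF(x, y) ((x) < (y) ? (x) : (y))   /* stands in for the Min() macro */

int main(void)
{
    int64_t bound = 100;      /* hypothetical LIMIT pushed down to the sort */
    int64_t bound_Done = 30;  /* tuples already returned by earlier batches */
    int64_t nTuples = 50;     /* tuples sorted in the batch just finished */

    bound_Done = MIN_OF(bound, bound_Done + nTuples);   /* -> 80 */
    printf("remaining bound for next batch: %lld\n",
           (long long) (bound - bound_Done));           /* -> 20 */
    return 0;
}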

◆ ExecIncrementalSortEstimate()

void ExecIncrementalSortEstimate ( IncrementalSortState * node,
ParallelContext * pcxt 
)

Definition at line 1200 of file nodeIncrementalSort.c.

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, offsetof, ScanState::ps, shm_toc_estimate_chunk, shm_toc_estimate_keys, and IncrementalSortState::ss.

Referenced by ExecParallelEstimate().

1201 {
1202  Size size;
1203 
1204  /* don't need this if not instrumenting or no workers */
1205  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1206  return;
1207 
1208  size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
1209  size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
1210  shm_toc_estimate_chunk(&pcxt->estimator, size);
1211  shm_toc_estimate_keys(&pcxt->estimator, 1);
1212 }
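
The estimate is simply the flexible-array layout of SharedIncrementalSortInfo: a header plus one IncrementalSortInfo per planned worker, reserved as a single DSM chunk under one TOC key. The same arithmetic in isolation, using stand-in struct shapes (the real definitions live in execnodes.h):

#include <stddef.h>
#include <stdio.h>

/* Stand-in shapes, only to show the size calculation; not the real structs. */
typedef struct { long dummy[8]; } IncrementalSortInfoStandIn;
typedef struct
{
    int num_workers;
    IncrementalSortInfoStandIn sinfo[];   /* one entry per worker */
} SharedIncrementalSortInfoStandIn;

int main(void)
{
    int    nworkers = 4;   /* hypothetical worker count */
    size_t size = offsetof(SharedIncrementalSortInfoStandIn, sinfo)
                + nworkers * sizeof(IncrementalSortInfoStandIn);

    printf("%zu bytes requested from the DSM estimator\n", size);
    return 0;
}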

◆ ExecIncrementalSortInitializeDSM()

void ExecIncrementalSortInitializeDSM ( IncrementalSortState * node,
ParallelContext * pcxt 
)

Definition at line 1221 of file nodeIncrementalSort.c.

References PlanState::instrument, SharedIncrementalSortInfo::num_workers, ParallelContext::nworkers, offsetof, PlanState::plan, Plan::plan_node_id, ScanState::ps, IncrementalSortState::shared_info, shm_toc_allocate(), shm_toc_insert(), IncrementalSortState::ss, and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

1222 {
1223  Size size;
1224 
1225  /* don't need this if not instrumenting or no workers */
1226  if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1227  return;
1228 
1229  size = offsetof(SharedIncrementalSortInfo, sinfo)
1230  + pcxt->nworkers * sizeof(IncrementalSortInfo);
1231  node->shared_info = shm_toc_allocate(pcxt->toc, size);
1232  /* ensure any unfilled slots will contain zeroes */
1233  memset(node->shared_info, 0, size);
1234  node->shared_info->num_workers = pcxt->nworkers;
1235  shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
1236  node->shared_info);
1237 }

◆ ExecIncrementalSortInitializeWorker()

void ExecIncrementalSortInitializeWorker ( IncrementalSortState * node,
ParallelWorkerContext * pwcxt 
)

Definition at line 1246 of file nodeIncrementalSort.c.

References IncrementalSortState::am_worker, PlanState::plan, Plan::plan_node_id, ScanState::ps, IncrementalSortState::shared_info, shm_toc_lookup(), IncrementalSortState::ss, and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

1247 {
1248  node->shared_info =
1249  shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
1250  node->am_worker = true;
1251 }

◆ ExecIncrementalSortRetrieveInstrumentation()

void ExecIncrementalSortRetrieveInstrumentation ( IncrementalSortState * node)

Definition at line 1260 of file nodeIncrementalSort.c.

References SharedIncrementalSortInfo::num_workers, offsetof, palloc(), and IncrementalSortState::shared_info.

Referenced by ExecParallelRetrieveInstrumentation().

1261 {
1262  Size size;
1263  SharedIncrementalSortInfo *si;
1264 
1265  if (node->shared_info == NULL)
1266  return;
1267 
1268  size = offsetof(SharedIncrementalSortInfo, sinfo)
1269  + node->shared_info->num_workers * sizeof(IncrementalSortInfo);
1270  si = palloc(size);
1271  memcpy(si, node->shared_info, size);
1272  node->shared_info = si;
1273 }

◆ ExecInitIncrementalSort()

IncrementalSortState * ExecInitIncrementalSort ( IncrementalSort * node,
EState * estate,
int  eflags 
)

Definition at line 991 of file nodeIncrementalSort.c.

References Assert, IncrementalSortState::bound_Done, IncrementalSortState::bounded, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecCreateScanSlotFromOuterPlan(), ExecGetResultType(), ExecIncrementalSort(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, IncrementalSortState::execution_status, IncrementalSortState::fullsort_state, IncrementalSortInfo::fullsortGroupInfo, IncrementalSortState::group_pivot, IncrementalSortGroupInfo::groupCount, IncrementalSortState::incsort_info, INCSORT_LOADFULLSORT, PlanState::instrument, makeNode, MakeSingleTupleTableSlot(), IncrementalSortGroupInfo::maxDiskSpaceUsed, IncrementalSortGroupInfo::maxMemorySpaceUsed, IncrementalSortState::n_fullsort_remaining, IncrementalSortState::outerNodeDone, outerPlan, outerPlanState, PlanState::plan, IncrementalSortState::prefixsort_state, IncrementalSortInfo::prefixsortGroupInfo, IncrementalSortState::presorted_keys, ScanState::ps, PlanState::ps_ProjInfo, SO_printf, IncrementalSortGroupInfo::sortMethods, IncrementalSortState::ss, PlanState::state, IncrementalSortGroupInfo::totalDiskSpaceUsed, IncrementalSortGroupInfo::totalMemorySpaceUsed, IncrementalSortState::transfer_tuple, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

992 {
993  IncrementalSortState *incrsortstate;
994 
995  SO_printf("ExecInitIncrementalSort: initializing sort node\n");
996 
997  /*
998  * Incremental sort can't be used with EXEC_FLAG_BACKWARD or
999  * EXEC_FLAG_MARK, because the current sort state contains only one sort
1000  * batch rather than the full result set.
1001  */
1002  Assert((eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);
1003 
1004  /* Initialize state structure. */
1005  incrsortstate = makeNode(IncrementalSortState);
1006  incrsortstate->ss.ps.plan = (Plan *) node;
1007  incrsortstate->ss.ps.state = estate;
1008  incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;
1009 
1010  incrsortstate->execution_status = INCSORT_LOADFULLSORT;
1011  incrsortstate->bounded = false;
1012  incrsortstate->outerNodeDone = false;
1013  incrsortstate->bound_Done = 0;
1014  incrsortstate->fullsort_state = NULL;
1015  incrsortstate->prefixsort_state = NULL;
1016  incrsortstate->group_pivot = NULL;
1017  incrsortstate->transfer_tuple = NULL;
1018  incrsortstate->n_fullsort_remaining = 0;
1019  incrsortstate->presorted_keys = NULL;
1020 
1021  if (incrsortstate->ss.ps.instrument != NULL)
1022  {
1023  IncrementalSortGroupInfo *fullsortGroupInfo =
1024  &incrsortstate->incsort_info.fullsortGroupInfo;
1025  IncrementalSortGroupInfo *prefixsortGroupInfo =
1026  &incrsortstate->incsort_info.prefixsortGroupInfo;
1027 
1028  fullsortGroupInfo->groupCount = 0;
1029  fullsortGroupInfo->maxDiskSpaceUsed = 0;
1030  fullsortGroupInfo->totalDiskSpaceUsed = 0;
1031  fullsortGroupInfo->maxMemorySpaceUsed = 0;
1032  fullsortGroupInfo->totalMemorySpaceUsed = 0;
1033  fullsortGroupInfo->sortMethods = 0;
1034  prefixsortGroupInfo->groupCount = 0;
1035  prefixsortGroupInfo->maxDiskSpaceUsed = 0;
1036  prefixsortGroupInfo->totalDiskSpaceUsed = 0;
1037  prefixsortGroupInfo->maxMemorySpaceUsed = 0;
1038  prefixsortGroupInfo->totalMemorySpaceUsed = 0;
1039  prefixsortGroupInfo->sortMethods = 0;
1040  }
1041 
1042  /*
1043  * Miscellaneous initialization
1044  *
1045  * Sort nodes don't initialize their ExprContexts because they never call
1046  * ExecQual or ExecProject.
1047  */
1048 
1049  /*
1050  * Initialize child nodes.
1051  *
1052  * Incremental sort does not support backwards scans and mark/restore, so
1053  * we don't bother removing the flags from eflags here. We allow passing a
1054  * REWIND flag, because although incremental sort can't use it, the child
1055  * nodes may be able to do something more useful.
1056  */
1057  outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);
1058 
1059  /*
1060  * Initialize scan slot and type.
1061  */
1062  ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss, &TTSOpsMinimalTuple);
1063 
1064  /*
1065  * Initialize return slot and type. No need to initialize projection info
1066  * because we don't do any projections.
1067  */
1068  ExecInitResultTupleSlotTL(&incrsortstate->ss.ps, &TTSOpsMinimalTuple);
1069  incrsortstate->ss.ps.ps_ProjInfo = NULL;
1070 
1071  /*
1072  * Initialize standalone slots to store a tuple for pivot prefix keys and
1073  * for carrying over a tuple from one batch to the next.
1074  */
1075  incrsortstate->group_pivot =
1076  MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1077  &TTSOpsMinimalTuple);
1078  incrsortstate->transfer_tuple =
1079  MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
1080  &TTSOpsMinimalTuple);
1081 
1082  SO_printf("ExecInitIncrementalSort: sort node initialized\n");
1083 
1084  return incrsortstate;
1085 }

◆ ExecReScanIncrementalSort()

void ExecReScanIncrementalSort ( IncrementalSortState * node)

Definition at line 1127 of file nodeIncrementalSort.c.

References IncrementalSortState::bound_Done, PlanState::chgParam, ExecClearTuple(), ExecReScan(), IncrementalSortState::execution_status, IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, INCSORT_LOADFULLSORT, IncrementalSortState::n_fullsort_remaining, IncrementalSortState::outerNodeDone, outerPlan, outerPlanState, IncrementalSortState::prefixsort_state, IncrementalSortState::presorted_keys, ScanState::ps, PlanState::ps_ResultTupleSlot, IncrementalSortState::ss, IncrementalSortState::transfer_tuple, and tuplesort_reset().

Referenced by ExecReScan().

1128 {
1129  PlanState *outerPlan = outerPlanState(node);
1130 
1131  /*
1132  * Incremental sort doesn't support efficient rescan even when parameters
1133  * haven't changed (e.g., rewind) because unlike regular sort we don't
1134  * store all tuples at once for the full sort.
1135  *
1136  * So even if EXEC_FLAG_REWIND is set we just reset all of our state and
1137  * re-execute the sort along with the child node. Incremental sort itself
1138  * can't do anything smarter, but maybe the child nodes can.
1139  *
1140  * In theory if we've only filled the full sort with one batch (and
1141  * haven't reset it for a new batch yet) then we could efficiently rewind,
1142  * but that seems a narrow enough case that it's not worth handling
1143  * specially at this time.
1144  */
1145 
1146  /* must drop pointer to sort result tuple */
1147  ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
1148 
1149  if (node->group_pivot != NULL)
1150  ExecClearTuple(node->group_pivot);
1151  if (node->transfer_tuple != NULL)
1152  ExecClearTuple(node->transfer_tuple);
1153 
1154  node->outerNodeDone = false;
1155  node->n_fullsort_remaining = 0;
1156  node->bound_Done = 0;
1157  node->presorted_keys = NULL;
1158 
1159  node->execution_status = INCSORT_LOADFULLSORT;
1160 
1161  /*
1162  * If we've set up either of the sort states yet, we need to reset them.
1163  * We could end them and null out the pointers, but there's no reason to
1164  * repay the setup cost, and because ExecIncrementalSort guards presorted
1165  * column functions by checking to see if the full sort state has been
1166  * initialized yet, setting the sort states to null here might actually
1167  * cause a leak.
1168  */
1169  if (node->fullsort_state != NULL)
1170  {
1171  tuplesort_reset(node->fullsort_state);
1172  node->fullsort_state = NULL;
1173  }
1174  if (node->prefixsort_state != NULL)
1175  {
1176  tuplesort_reset(node->prefixsort_state);
1177  node->prefixsort_state = NULL;
1178  }
1179 
1180  /*
1181  * If chgParam of subnode is not null, then the plan will be re-scanned
1182  * by the first ExecProcNode.
1183  */
1184  if (outerPlan->chgParam == NULL)
1185  ExecReScan(outerPlan);
1186 }

◆ instrumentSortedGroup()

static void instrumentSortedGroup ( IncrementalSortGroupInfo * groupInfo,
Tuplesortstate * sortState 
)
static

Definition at line 128 of file nodeIncrementalSort.c.

References IncrementalSortGroupInfo::groupCount, IncrementalSortGroupInfo::maxDiskSpaceUsed, IncrementalSortGroupInfo::maxMemorySpaceUsed, SORT_SPACE_TYPE_DISK, SORT_SPACE_TYPE_MEMORY, TuplesortInstrumentation::sortMethod, IncrementalSortGroupInfo::sortMethods, TuplesortInstrumentation::spaceType, TuplesortInstrumentation::spaceUsed, IncrementalSortGroupInfo::totalDiskSpaceUsed, IncrementalSortGroupInfo::totalMemorySpaceUsed, and tuplesort_get_stats().

130 {
131  TuplesortInstrumentation sort_instr;
132 
133  groupInfo->groupCount++;
134 
135  tuplesort_get_stats(sortState, &sort_instr);
136 
137  /* Calculate total and maximum memory and disk space used. */
138  switch (sort_instr.spaceType)
139  {
140  case SORT_SPACE_TYPE_DISK:
141  groupInfo->totalDiskSpaceUsed += sort_instr.spaceUsed;
142  if (sort_instr.spaceUsed > groupInfo->maxDiskSpaceUsed)
143  groupInfo->maxDiskSpaceUsed = sort_instr.spaceUsed;
144 
145  break;
146  case SORT_SPACE_TYPE_MEMORY:
147  groupInfo->totalMemorySpaceUsed += sort_instr.spaceUsed;
148  if (sort_instr.spaceUsed > groupInfo->maxMemorySpaceUsed)
149  groupInfo->maxMemorySpaceUsed = sort_instr.spaceUsed;
150 
151  break;
152  }
153 
154  /* Track each sort method we've used. */
155  groupInfo->sortMethods |= sort_instr.sortMethod;
156 }

◆ isCurrentGroup()

static bool isCurrentGroup ( IncrementalSortState * node,
TupleTableSlot * pivot,
TupleTableSlot * tuple 
)
static

Definition at line 213 of file nodeIncrementalSort.c.

References FunctionCallInfoBaseData::args, PresortedKeyData::attno, castNode, DatumGetBool, elog, ERROR, PresortedKeyData::fcinfo, PresortedKeyData::flinfo, FmgrInfo::fn_oid, FunctionCallInvoke, i, FunctionCallInfoBaseData::isnull, sort-test::key, PlanState::plan, IncrementalSortState::presorted_keys, ScanState::ps, slot_getattr(), IncrementalSortState::ss, and NullableDatum::value.

Referenced by ExecIncrementalSort(), and switchToPresortedPrefixMode().

214 {
215  int nPresortedCols;
216 
217  nPresortedCols = castNode(IncrementalSort, node->ss.ps.plan)->nPresortedCols;
218 
219  /*
220  * That the input is sorted by keys (0, ... n) implies that the tail
221  * keys are more likely to change. Therefore we do our comparison starting
222  * from the last pre-sorted column to optimize for early detection of
223  * inequality and to minimize the number of function calls.
224  */
225  for (int i = nPresortedCols - 1; i >= 0; i--)
226  {
227  Datum datumA,
228  datumB,
229  result;
230  bool isnullA,
231  isnullB;
232  AttrNumber attno = node->presorted_keys[i].attno;
233  PresortedKeyData *key;
234 
235  datumA = slot_getattr(pivot, attno, &isnullA);
236  datumB = slot_getattr(tuple, attno, &isnullB);
237 
238  /* Special case for NULL-vs-NULL, else use standard comparison */
239  if (isnullA || isnullB)
240  {
241  if (isnullA == isnullB)
242  continue;
243  else
244  return false;
245  }
246 
247  key = &node->presorted_keys[i];
248 
249  key->fcinfo->args[0].value = datumA;
250  key->fcinfo->args[1].value = datumB;
251 
252  /* just for paranoia's sake, we reset isnull each time */
253  key->fcinfo->isnull = false;
254 
255  result = FunctionCallInvoke(key->fcinfo);
256 
257  /* Check for null result, since caller is clearly not expecting one */
258  if (key->fcinfo->isnull)
259  elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
260 
261  if (!DatumGetBool(result))
262  return false;
263  }
264  return true;
265 }
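
Because the input stream is already ordered on the presorted keys, the last presorted column is the one most likely to differ first, so scanning the keys in reverse usually detects a group change with a single comparison. A self-contained sketch of that reverse-order equality scan, using plain ints instead of Datums and operator calls (purely illustrative; the real code goes through the cached FunctionCallInfo set up by preparePresortedCols()):

#include <stdbool.h>

/* Return true if 'tuple' matches 'pivot' on all nPresorted leading keys. */
static bool
same_prefix_group(const int *pivot, const int *tuple, int nPresorted)
{
    for (int i = nPresorted - 1; i >= 0; i--)   /* last presorted key first */
        if (pivot[i] != tuple[i])
            return false;                       /* prefix changed: new group */
    return true;
}

int main(void)
{
    int pivot[] = {1, 7};
    int same[]  = {1, 7};   /* matches on both presorted keys */
    int next[]  = {1, 8};   /* differs on the last key -> starts a new group */

    int ok = same_prefix_group(pivot, same, 2) && !same_prefix_group(pivot, next, 2);
    return ok ? 0 : 1;
}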

◆ preparePresortedCols()

static void preparePresortedCols ( IncrementalSortState * node)
static

Definition at line 165 of file nodeIncrementalSort.c.

References FunctionCallInfoBaseData::args, PresortedKeyData::attno, castNode, Sort::collations, CurrentMemoryContext, elog, ERROR, PresortedKeyData::fcinfo, PresortedKeyData::flinfo, fmgr_info_cxt(), get_equality_op_for_ordering_op(), get_opcode(), i, InitFunctionCallInfoData, NullableDatum::isnull, sort-test::key, IncrementalSort::nPresortedCols, OidIsValid, palloc(), palloc0(), PlanState::plan, IncrementalSortState::presorted_keys, ScanState::ps, SizeForFunctionCallInfo, IncrementalSort::sort, Sort::sortColIdx, Sort::sortOperators, and IncrementalSortState::ss.

Referenced by ExecIncrementalSort().

166 {
167  IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
168 
169  node->presorted_keys =
170  (PresortedKeyData *) palloc(plannode->nPresortedCols *
171  sizeof(PresortedKeyData));
172 
173  /* Pre-cache comparison functions for each pre-sorted key. */
174  for (int i = 0; i < plannode->nPresortedCols; i++)
175  {
176  Oid equalityOp,
177  equalityFunc;
178  PresortedKeyData *key;
179 
180  key = &node->presorted_keys[i];
181  key->attno = plannode->sort.sortColIdx[i];
182 
183  equalityOp = get_equality_op_for_ordering_op(plannode->sort.sortOperators[i],
184  NULL);
185  if (!OidIsValid(equalityOp))
186  elog(ERROR, "missing equality operator for ordering operator %u",
187  plannode->sort.sortOperators[i]);
188 
189  equalityFunc = get_opcode(equalityOp);
190  if (!OidIsValid(equalityFunc))
191  elog(ERROR, "missing function for operator %u", equalityOp);
192 
193  /* Lookup the comparison function */
194  fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
195 
196  /* We can initialize the callinfo just once and re-use it */
197  key->fcinfo = palloc0(SizeForFunctionCallInfo(2));
198  InitFunctionCallInfoData(*key->fcinfo, &key->flinfo, 2,
199  plannode->sort.collations[i], NULL, NULL);
200  key->fcinfo->args[0].isnull = false;
201  key->fcinfo->args[1].isnull = false;
202  }
203 }

◆ switchToPresortedPrefixMode()

static void switchToPresortedPrefixMode ( PlanState * pstate)
static

Definition at line 287 of file nodeIncrementalSort.c.

References IncrementalSortState::bound, IncrementalSortState::bound_Done, IncrementalSortState::bounded, castNode, Sort::collations, EState::es_direction, ExecClearTuple(), ExecCopySlot(), ExecGetResultType(), IncrementalSortState::execution_status, IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, INCSORT_LOADPREFIXSORT, INCSORT_READPREFIXSORT, INSTRUMENT_SORT_GROUP, isCurrentGroup(), Min, IncrementalSortState::n_fullsort_remaining, IncrementalSort::nPresortedCols, Sort::nullsFirst, Sort::numCols, outerPlanState, PlanState::plan, IncrementalSortState::prefixsort_state, ScanState::ps, ScanDirectionIsForward, SO1_printf, SO2_printf, SO_printf, IncrementalSort::sort, Sort::sortColIdx, Sort::sortOperators, IncrementalSortState::ss, PlanState::state, IncrementalSortState::transfer_tuple, TupIsNull, tuplesort_begin_heap(), tuplesort_gettupleslot(), tuplesort_performsort(), tuplesort_puttupleslot(), tuplesort_reset(), tuplesort_set_bound(), and work_mem.

Referenced by ExecIncrementalSort().

288 {
289  IncrementalSortState *node = castNode(IncrementalSortState, pstate);
290  ScanDirection dir;
291  int64 nTuples = 0;
292  bool lastTuple = false;
293  bool firstTuple = true;
294  TupleDesc tupDesc;
295  PlanState *outerNode;
296  IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
297 
298  dir = node->ss.ps.state->es_direction;
299  outerNode = outerPlanState(node);
300  tupDesc = ExecGetResultType(outerNode);
301 
302  /* Configure the prefix sort state the first time around. */
303  if (node->prefixsort_state == NULL)
304  {
305  Tuplesortstate *prefixsort_state;
306  int nPresortedCols = plannode->nPresortedCols;
307 
308  /*
309  * Optimize the sort by assuming the prefix columns are all equal and
310  * thus we only need to sort by any remaining columns.
311  */
312  prefixsort_state = tuplesort_begin_heap(tupDesc,
313  plannode->sort.numCols - nPresortedCols,
314  &(plannode->sort.sortColIdx[nPresortedCols]),
315  &(plannode->sort.sortOperators[nPresortedCols]),
316  &(plannode->sort.collations[nPresortedCols]),
317  &(plannode->sort.nullsFirst[nPresortedCols]),
318  work_mem,
319  NULL,
320  false);
321  node->prefixsort_state = prefixsort_state;
322  }
323  else
324  {
325  /* Next group of presorted data */
326  tuplesort_reset(node->prefixsort_state);
327  }
328 
329  /*
330  * If the current node has a bound, then it's reasonably likely that a
331  * large prefix key group will benefit from bounded sort, so configure the
332  * tuplesort to allow for that optimization.
333  */
334  if (node->bounded)
335  {
336  SO1_printf("Setting bound on presorted prefix tuplesort to: %ld\n",
337  node->bound - node->bound_Done);
338  tuplesort_set_bound(node->prefixsort_state,
339  node->bound - node->bound_Done);
340  }
341 
342  /*
343  * Copy as many tuples as we can (i.e., in the same prefix key group) from
344  * the full sort state to the prefix sort state.
345  */
346  for (;;)
347  {
348  lastTuple = node->n_fullsort_remaining - nTuples == 1;
349 
350  /*
351  * When we encounter multiple prefix key groups inside the full sort
352  * tuplesort we have to carry over the last read tuple into the next
353  * batch.
354  */
355  if (firstTuple && !TupIsNull(node->transfer_tuple))
356  {
357  tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
358  nTuples++;
359 
360  /* The carried over tuple is our new group pivot tuple. */
361  ExecCopySlot(node->group_pivot, node->transfer_tuple);
362  }
363  else
364  {
365  tuplesort_gettupleslot(node->fullsort_state,
366  ScanDirectionIsForward(dir),
367  false, node->transfer_tuple, NULL);
368 
369  /*
370  * If this is our first time through the loop, then we need to
371  * save the first tuple we get as our new group pivot.
372  */
373  if (TupIsNull(node->group_pivot))
374  ExecCopySlot(node->group_pivot, node->transfer_tuple);
375 
376  if (isCurrentGroup(node, node->group_pivot, node->transfer_tuple))
377  {
378  tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
379  nTuples++;
380  }
381  else
382  {
383  /*
384  * The tuple isn't part of the current batch so we need to
385  * carry it over into the next batch of tuples we transfer out
386  * of the full sort tuplesort into the presorted prefix
387  * tuplesort. We don't actually have to do anything special to
388  * save the tuple since we've already loaded it into the
389  * node->transfer_tuple slot, and, even though that slot
390  * points to memory inside the full sort tuplesort, we can't
391  * reset that tuplesort anyway until we've fully transferred
392  * out its tuples, so this reference is safe. We do need to
393  * reset the group pivot tuple though since we've finished the
394  * current prefix key group.
395  */
396  ExecClearTuple(node->group_pivot);
397  break;
398  }
399  }
400 
401  firstTuple = false;
402 
403  /*
404  * If we've copied all of the tuples from the full sort state into the
405  * prefix sort state, then we don't actually know that we've yet found
406  * the last tuple in that prefix key group until we check the next
407  * tuple from the outer plan node, so we retain the current group
408  * pivot tuple prefix key group comparison.
409  */
410  if (lastTuple)
411  break;
412  }
413 
414  /*
415  * Track how many tuples remain in the full sort batch so that we know if
416  * we need to sort multiple prefix key groups before processing tuples
417  * remaining in the large single prefix key group we think we've
418  * encountered.
419  */
420  SO1_printf("Moving %ld tuples to presorted prefix tuplesort\n", nTuples);
421  node->n_fullsort_remaining -= nTuples;
422  SO1_printf("Setting n_fullsort_remaining to %ld\n", node->n_fullsort_remaining);
423 
424  if (lastTuple)
425  {
426  /*
427  * We've confirmed that all tuples remaining in the full sort batch are
428  * in the same prefix key group and moved all of those tuples into the
429  * presorted prefix tuplesort. Now we can save our pivot comparison
430  * tuple and continue fetching tuples from the outer execution node to
431  * load into the presorted prefix tuplesort.
432  */
433  ExecCopySlot(node->group_pivot, node->transfer_tuple);
434  SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
435  node->execution_status = INCSORT_LOADPREFIXSORT;
436 
437  /*
438  * Make sure we clear the transfer tuple slot so that next time we
439  * encounter a large prefix key group we don't incorrectly assume we
440  * have a tuple carried over from the previous group.
441  */
442  ExecClearTuple(node->transfer_tuple);
443  }
444  else
445  {
446  /*
447  * We finished a group but didn't consume all of the tuples from the
448  * full sort state, so we'll sort this batch, let the outer node read
449  * out all of those tuples, and then come back around to find another
450  * batch.
451  */
452  SO1_printf("Sorting presorted prefix tuplesort with %ld tuples\n", nTuples);
453  tuplesort_performsort(node->prefixsort_state);
454 
455  INSTRUMENT_SORT_GROUP(node, prefixsort);
456 
457  if (node->bounded)
458  {
459  /*
460  * If the current node has a bound and we've already sorted n
461  * tuples, then the functional bound remaining is (original bound
462  * - n), so store the current number of processed tuples for use
463  * in configuring sorting bound.
464  */
465  SO2_printf("Changing bound_Done from %ld to %ld\n",
466  node->bound_Done, Min(node->bound, node->bound_Done + nTuples));
467  node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
468  }
469 
470  SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
471  node->execution_status = INCSORT_READPREFIXSORT;
472  }
473 }
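
As a worked example with hypothetical numbers: suppose ExecIncrementalSort() loaded 70 tuples into the full sort state before deciding it had hit a large group, so n_fullsort_remaining is 70 when this function is first called. If only the first 45 of those tuples share the same prefix keys, this function moves those 45 into the prefix tuplesort, sorts them, decrements n_fullsort_remaining to 25, and sets INCSORT_READPREFIXSORT; after those tuples have been returned, ExecIncrementalSort() sees n_fullsort_remaining > 0 and calls this function again for the next prefix key group. Only once the final remaining tuple has been transferred (lastTuple is true) does it switch to INCSORT_LOADPREFIXSORT and resume pulling rows from the outer node.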