PostgreSQL Source Code git master
nodeIncrementalSort.c File Reference
#include "postgres.h"
#include "executor/execdebug.h"
#include "executor/nodeIncrementalSort.h"
#include "miscadmin.h"
#include "utils/lsyscache.h"
#include "utils/tuplesort.h"

Macros

#define INSTRUMENT_SORT_GROUP(node, groupName)
 
#define DEFAULT_MIN_GROUP_SIZE   32
 
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE   (2 * DEFAULT_MIN_GROUP_SIZE)
 

Functions

static void instrumentSortedGroup (IncrementalSortGroupInfo *groupInfo, Tuplesortstate *sortState)
 
static void preparePresortedCols (IncrementalSortState *node)
 
static bool isCurrentGroup (IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
 
static void switchToPresortedPrefixMode (PlanState *pstate)
 
static TupleTableSlot * ExecIncrementalSort (PlanState *pstate)
 
IncrementalSortState * ExecInitIncrementalSort (IncrementalSort *node, EState *estate, int eflags)
 
void ExecEndIncrementalSort (IncrementalSortState *node)
 
void ExecReScanIncrementalSort (IncrementalSortState *node)
 
void ExecIncrementalSortEstimate (IncrementalSortState *node, ParallelContext *pcxt)
 
void ExecIncrementalSortInitializeDSM (IncrementalSortState *node, ParallelContext *pcxt)
 
void ExecIncrementalSortInitializeWorker (IncrementalSortState *node, ParallelWorkerContext *pwcxt)
 
void ExecIncrementalSortRetrieveInstrumentation (IncrementalSortState *node)
 

Macro Definition Documentation

◆ DEFAULT_MAX_FULL_SORT_GROUP_SIZE

#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE   (2 * DEFAULT_MIN_GROUP_SIZE)

Definition at line 479 of file nodeIncrementalSort.c.

◆ DEFAULT_MIN_GROUP_SIZE

#define DEFAULT_MIN_GROUP_SIZE   32

Definition at line 467 of file nodeIncrementalSort.c.

◆ INSTRUMENT_SORT_GROUP

#define INSTRUMENT_SORT_GROUP(node, groupName)
Value:
do { \
    if ((node)->ss.ps.instrument != NULL) \
    { \
        if ((node)->shared_info && (node)->am_worker) \
        { \
            Assert(IsParallelWorker()); \
            Assert(ParallelWorkerNumber <= (node)->shared_info->num_workers); \
            instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
                                  (node)->groupName##_state); \
        } \
        else \
        { \
            instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
                                  (node)->groupName##_state); \
        } \
    } \
} while (0)

Definition at line 98 of file nodeIncrementalSort.c.
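
For reference, INSTRUMENT_SORT_GROUP(node, fullsort) hand-expands as follows (derived directly from the definition above; the ## token pasting splices the group name into the field and state names):

/* worker branch: */
instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].fullsortGroupInfo,
                      (node)->fullsort_state);

/* leader / non-parallel branch: */
instrumentSortedGroup(&(node)->incsort_info.fullsortGroupInfo,
                      (node)->fullsort_state);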

Function Documentation

◆ ExecEndIncrementalSort()

void ExecEndIncrementalSort (IncrementalSortState *node)

Definition at line 1077 of file nodeIncrementalSort.c.

{
    SO_printf("ExecEndIncrementalSort: shutting down sort node\n");

    ExecDropSingleTupleTableSlot(node->group_pivot);
    ExecDropSingleTupleTableSlot(node->transfer_tuple);

    /*
     * Release tuplesort resources.
     */
    if (node->fullsort_state != NULL)
    {
        tuplesort_end(node->fullsort_state);
        node->fullsort_state = NULL;
    }
    if (node->prefixsort_state != NULL)
    {
        tuplesort_end(node->prefixsort_state);
        node->prefixsort_state = NULL;
    }

    /*
     * Shut down the subplan.
     */
    ExecEndNode(outerPlanState(node));

    SO_printf("ExecEndIncrementalSort: sort node shutdown\n");
}

References ExecDropSingleTupleTableSlot(), ExecEndNode(), IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, outerPlanState, IncrementalSortState::prefixsort_state, SO_printf, IncrementalSortState::transfer_tuple, and tuplesort_end().

Referenced by ExecEndNode().

◆ ExecIncrementalSort()

static TupleTableSlot * ExecIncrementalSort (PlanState *pstate)
static

Definition at line 495 of file nodeIncrementalSort.c.

{
    IncrementalSortState *node = castNode(IncrementalSortState, pstate);
    EState     *estate;
    ScanDirection dir;
    Tuplesortstate *read_sortstate;
    Tuplesortstate *fullsort_state;
    TupleTableSlot *slot;
    IncrementalSort *plannode = (IncrementalSort *) node->ss.ps.plan;
    PlanState  *outerNode;
    TupleDesc   tupDesc;
    int64       nTuples = 0;
    int64       minGroupSize;

    CHECK_FOR_INTERRUPTS();

    estate = node->ss.ps.state;
    dir = estate->es_direction;
    fullsort_state = node->fullsort_state;

    /*
     * If a previous iteration has sorted a batch, then we need to check to
     * see if there are any remaining tuples in that batch that we can return
     * before moving on to other execution states.
     */
    if (node->execution_status == INCSORT_READFULLSORT
        || node->execution_status == INCSORT_READPREFIXSORT)
    {
        /*
         * Return next tuple from the current sorted group set if available.
         */
        read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
            fullsort_state : node->prefixsort_state;
        slot = node->ss.ps.ps_ResultTupleSlot;

        /*
         * We have to populate the slot from the tuplesort before checking
         * outerNodeDone because it will set the slot to NULL if no more
         * tuples remain. If the tuplesort is empty, but we don't have any
         * more tuples available for sort from the outer node, then
         * outerNodeDone will have been set so we'll return that now-empty
         * slot to the caller.
         */
        if (tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
                                   false, slot, NULL) || node->outerNodeDone)

            /*
             * Note: there isn't a good test case for the node->outerNodeDone
             * check directly, but we need it for any plan where the outer
             * node will fail when trying to fetch too many tuples.
             */
            return slot;
        else if (node->n_fullsort_remaining > 0)
        {
            /*
             * When we transition to presorted prefix mode, we might have
             * accumulated at least one additional prefix key group in the
             * full sort tuplesort. The first call to
             * switchToPresortedPrefixMode() will have pulled the first one of
             * those groups out, and we've returned those tuples to the parent
             * node, but if at this point we still have tuples remaining in
             * the full sort state (i.e., n_fullsort_remaining > 0), then we
             * need to re-execute the prefix mode transition function to pull
             * out the next prefix key group.
             */
            SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",
                       node->n_fullsort_remaining);
            switchToPresortedPrefixMode(pstate);
        }
        else
        {
            /*
             * If we don't have any sorted tuples to read and we're not
             * currently transitioning into presorted prefix sort mode, then
             * it's time to start the process all over again by building a new
             * group in the full sort state.
             */
            SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining > 0)\n");
            node->execution_status = INCSORT_LOADFULLSORT;
        }
    }

    /*
     * Scan the subplan in the forward direction while creating the sorted
     * data.
     */
    estate->es_direction = ForwardScanDirection;

    outerNode = outerPlanState(node);
    tupDesc = ExecGetResultType(outerNode);

    /* Load tuples into the full sort state. */
    if (node->execution_status == INCSORT_LOADFULLSORT)
    {
        /*
         * Initialize sorting structures.
         */
        if (fullsort_state == NULL)
        {
            /*
             * Initialize presorted column support structures for
             * isCurrentGroup(). It's correct to do this along with the
             * initial initialization for the full sort state (and not for the
             * prefix sort state) since we always load the full sort state
             * first.
             */
            preparePresortedCols(node);

            /*
             * Since we optimize small prefix key groups by accumulating a
             * minimum number of tuples before sorting, we can't assume that a
             * group of tuples all have the same prefix key values. Hence we
             * set up the full sort tuplesort to sort by all requested sort
             * keys.
             */
            fullsort_state = tuplesort_begin_heap(tupDesc,
                                                  plannode->sort.numCols,
                                                  plannode->sort.sortColIdx,
                                                  plannode->sort.sortOperators,
                                                  plannode->sort.collations,
                                                  plannode->sort.nullsFirst,
                                                  work_mem,
                                                  NULL,
                                                  node->bounded ?
                                                  TUPLESORT_ALLOWBOUNDED :
                                                  TUPLESORT_NONE);
            node->fullsort_state = fullsort_state;
        }
        else
        {
            /* Reset sort for the next batch. */
            tuplesort_reset(fullsort_state);
        }

        /*
         * Calculate the remaining tuples left if bounded and configure both
         * bounded sort and the minimum group size accordingly.
         */
        if (node->bounded)
        {
            int64       currentBound = node->bound - node->bound_Done;

            /*
             * Bounded sort isn't likely to be a useful optimization for full
             * sort mode since we limit full sort mode to a relatively small
             * number of tuples and tuplesort doesn't switch over to top-n
             * heap sort anyway unless it hits (2 * bound) tuples.
             */
            if (currentBound < DEFAULT_MIN_GROUP_SIZE)
                tuplesort_set_bound(fullsort_state, currentBound);

            minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, currentBound);
        }
        else
            minGroupSize = DEFAULT_MIN_GROUP_SIZE;

        /*
         * Because we have to read the next tuple to find out that we've
         * encountered a new prefix key group, on subsequent groups we have to
         * carry over that extra tuple and add it to the new group's sort here
         * before we read any new tuples from the outer node.
         */
        if (!TupIsNull(node->group_pivot))
        {
            tuplesort_puttupleslot(fullsort_state, node->group_pivot);
            nTuples++;

            /*
             * We're in full sort mode accumulating a minimum number of tuples
             * and not checking for prefix key equality yet, so we can't
             * assume the group pivot tuple will remain the same -- unless
             * we're using a minimum group size of 1, in which case the pivot
             * is obviously still the pivot.
             */
            if (nTuples != minGroupSize)
                ExecClearTuple(node->group_pivot);
        }

        /*
         * Pull as many tuples from the outer node as possible given our
         * current operating mode.
         */
        for (;;)
        {
            slot = ExecProcNode(outerNode);

            /*
             * If the outer node can't provide us any more tuples, then we can
             * sort the current group and return those tuples.
             */
            if (TupIsNull(slot))
            {
                /*
                 * We need to know later if the outer node has completed to be
                 * able to distinguish between being done with a batch and
                 * being done with the whole node.
                 */
                node->outerNodeDone = true;

                SO1_printf("Sorting fullsort with " INT64_FORMAT " tuples\n", nTuples);
                tuplesort_performsort(fullsort_state);

                INSTRUMENT_SORT_GROUP(node, fullsort);

                SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
                node->execution_status = INCSORT_READFULLSORT;
                break;
            }

            /* Accumulate the next group of presorted tuples. */
            if (nTuples < minGroupSize)
            {
                /*
                 * If we haven't yet hit our target minimum group size, then
                 * we don't need to bother checking for inclusion in the
                 * current prefix group since at this point we'll assume that
                 * we'll full sort this batch to avoid a large number of very
                 * tiny (and thus inefficient) sorts.
                 */
                tuplesort_puttupleslot(fullsort_state, slot);
                nTuples++;

                /*
                 * If we've reached our minimum group size, then we need to
                 * store the most recent tuple as a pivot.
                 */
                if (nTuples == minGroupSize)
                    ExecCopySlot(node->group_pivot, slot);
            }
            else
            {
                /*
                 * If we've already accumulated enough tuples to reach our
                 * minimum group size, then we need to compare any additional
                 * tuples to our pivot tuple to see if we reach the end of
                 * that prefix key group. Only after we find changed prefix
                 * keys can we guarantee sort stability of the tuples we've
                 * already accumulated.
                 */
                if (isCurrentGroup(node, node->group_pivot, slot))
                {
                    /*
                     * As long as the prefix keys match the pivot tuple then
                     * load the tuple into the tuplesort.
                     */
                    tuplesort_puttupleslot(fullsort_state, slot);
                    nTuples++;
                }
                else
                {
                    /*
                     * Since the tuple we fetched isn't part of the current
                     * prefix key group we don't want to sort it as part of
                     * the current batch. Instead we use the group_pivot slot
                     * to carry it over to the next batch (even though we
                     * won't actually treat it as a group pivot).
                     */
                    ExecCopySlot(node->group_pivot, slot);

                    if (node->bounded)
                    {
                        /*
                         * If the current node has a bound, and we've already
                         * sorted n tuples, then the functional bound
                         * remaining is (original bound - n), so store the
                         * current number of processed tuples for later use
                         * configuring the sort state's bound.
                         */
                        SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
                                   node->bound_Done,
                                   Min(node->bound, node->bound_Done + nTuples));
                        node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
                    }

                    /*
                     * Once we find changed prefix keys we can complete the
                     * sort and transition modes to reading out the sorted
                     * tuples.
                     */
                    SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n",
                               nTuples);
                    tuplesort_performsort(fullsort_state);

                    INSTRUMENT_SORT_GROUP(node, fullsort);

                    SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
                    node->execution_status = INCSORT_READFULLSORT;
                    break;
                }
            }

            /*
             * Unless we've already transitioned modes to reading from the
             * full sort state, then we assume that having read at least
             * DEFAULT_MAX_FULL_SORT_GROUP_SIZE tuples means it's likely we're
             * processing a large group of tuples all having equal prefix keys
             * (but haven't yet found the final tuple in that prefix key
             * group), so we need to transition into presorted prefix mode.
             */
            if (nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE &&
                node->execution_status != INCSORT_READFULLSORT)
            {
                /*
                 * The group pivot we have stored has already been put into
                 * the tuplesort; we don't want to carry it over. Since we
                 * haven't yet found the end of the prefix key group, it might
                 * seem like we should keep this, but we don't actually know
                 * how many prefix key groups might be represented in the full
                 * sort state, so we'll let the mode transition function
                 * manage this state for us.
                 */
                ExecClearTuple(node->group_pivot);

                /*
                 * Unfortunately the tuplesort API doesn't include a way to
                 * retrieve tuples unless a sort has been performed, so we
                 * perform the sort even though we could just as easily rely
                 * on FIFO retrieval semantics when transferring them to the
                 * presorted prefix tuplesort.
                 */
                SO1_printf("Sorting fullsort tuplesort with " INT64_FORMAT " tuples\n", nTuples);
                tuplesort_performsort(fullsort_state);

                INSTRUMENT_SORT_GROUP(node, fullsort);

                /*
                 * If the full sort tuplesort happened to switch into top-n
                 * heapsort mode then we will only be able to retrieve
                 * currentBound tuples (since the tuplesort will have only
                 * retained the top-n tuples). This is safe even though we
                 * haven't yet completed fetching the current prefix key group
                 * because the tuples we've "lost" already sorted "below" the
                 * retained ones, and we're already contractually guaranteed
                 * to not need any more than the currentBound tuples.
                 */
                if (tuplesort_used_bound(node->fullsort_state))
                {
                    int64       currentBound = node->bound - node->bound_Done;

                    SO2_printf("Read " INT64_FORMAT " tuples, but setting to " INT64_FORMAT " because we used bounded sort\n",
                               nTuples, Min(currentBound, nTuples));
                    nTuples = Min(currentBound, nTuples);
                }

                SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",
                           nTuples);

                /*
                 * We might have multiple prefix key groups in the full sort
                 * state, so the mode transition function needs to know that
                 * it needs to move from the fullsort to presorted prefix
                 * sort.
                 */
                node->n_fullsort_remaining = nTuples;

                /* Transition the tuples to the presorted prefix tuplesort. */
                switchToPresortedPrefixMode(pstate);

                /*
                 * Since we know we had tuples to move to the presorted prefix
                 * tuplesort, we know that unless that transition has verified
                 * that all tuples belonged to the same prefix key group (in
                 * which case we can go straight to continuing to load tuples
                 * into that tuplesort), we should have a tuple to return
                 * here.
                 *
                 * Either way, the appropriate execution status should have
                 * been set by switchToPresortedPrefixMode(), so we can drop
                 * out of the loop here and let the appropriate path kick in.
                 */
                break;
            }
        }
    }

    if (node->execution_status == INCSORT_LOADPREFIXSORT)
    {
        /*
         * We only enter this state after the mode transition function has
         * confirmed all remaining tuples from the full sort state have the
         * same prefix and moved those tuples to the prefix sort state. That
         * function has also set a group pivot tuple (which doesn't need to be
         * carried over; it's already been put into the prefix sort state).
         */
        Assert(!TupIsNull(node->group_pivot));

        /*
         * Read tuples from the outer node and load them into the prefix sort
         * state until we encounter a tuple whose prefix keys don't match the
         * current group_pivot tuple, since we can't guarantee sort stability
         * until we have all tuples matching those prefix keys.
         */
        for (;;)
        {
            slot = ExecProcNode(outerNode);

            /*
             * If we've exhausted tuples from the outer node we're done
             * loading the prefix sort state.
             */
            if (TupIsNull(slot))
            {
                /*
                 * We need to know later if the outer node has completed to be
                 * able to distinguish between being done with a batch and
                 * being done with the whole node.
                 */
                node->outerNodeDone = true;
                break;
            }

            /*
             * If the tuple's prefix keys match our pivot tuple, we're not
             * done yet and can load it into the prefix sort state. If not, we
             * don't want to sort it as part of the current batch. Instead we
             * use the group_pivot slot to carry it over to the next batch
             * (even though we won't actually treat it as a group pivot).
             */
            if (isCurrentGroup(node, node->group_pivot, slot))
            {
                tuplesort_puttupleslot(node->prefixsort_state, slot);
                nTuples++;
            }
            else
            {
                ExecCopySlot(node->group_pivot, slot);
                break;
            }
        }

        /*
         * Perform the sort and begin returning the tuples to the parent plan
         * node.
         */
        SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
        tuplesort_performsort(node->prefixsort_state);

        INSTRUMENT_SORT_GROUP(node, prefixsort);

        SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
        node->execution_status = INCSORT_READPREFIXSORT;

        if (node->bounded)
        {
            /*
             * If the current node has a bound, and we've already sorted n
             * tuples, then the functional bound remaining is (original bound
             * - n), so store the current number of processed tuples for use
             * in configuring sorting bound.
             */
            SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
                       node->bound_Done,
                       Min(node->bound, node->bound_Done + nTuples));
            node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
        }
    }

    /* Restore to user specified direction. */
    estate->es_direction = dir;

    /*
     * Get the first or next tuple from tuplesort. Returns NULL if no more
     * tuples.
     */
    read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
        fullsort_state : node->prefixsort_state;
    slot = node->ss.ps.ps_ResultTupleSlot;
    (void) tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
                                  false, slot, NULL);
    return slot;
}

References Assert, IncrementalSortState::bound, IncrementalSortState::bound_Done, IncrementalSortState::bounded, castNode, CHECK_FOR_INTERRUPTS, DEFAULT_MAX_FULL_SORT_GROUP_SIZE, DEFAULT_MIN_GROUP_SIZE, EState::es_direction, ExecClearTuple(), ExecCopySlot(), ExecGetResultType(), ExecProcNode(), IncrementalSortState::execution_status, ForwardScanDirection, IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, INCSORT_LOADFULLSORT, INCSORT_LOADPREFIXSORT, INCSORT_READFULLSORT, INCSORT_READPREFIXSORT, INSTRUMENT_SORT_GROUP, INT64_FORMAT, isCurrentGroup(), Min, IncrementalSortState::n_fullsort_remaining, Sort::numCols, IncrementalSortState::outerNodeDone, outerPlanState, PlanState::plan, IncrementalSortState::prefixsort_state, preparePresortedCols(), ScanState::ps, PlanState::ps_ResultTupleSlot, ScanDirectionIsForward, SO1_printf, SO2_printf, SO_printf, IncrementalSort::sort, IncrementalSortState::ss, PlanState::state, switchToPresortedPrefixMode(), TupIsNull, TUPLESORT_ALLOWBOUNDED, tuplesort_begin_heap(), tuplesort_gettupleslot(), TUPLESORT_NONE, tuplesort_performsort(), tuplesort_puttupleslot(), tuplesort_reset(), tuplesort_set_bound(), tuplesort_used_bound(), and work_mem.

Referenced by ExecInitIncrementalSort().
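
To see the batching strategy without the executor plumbing, here is a minimal standalone C sketch of the INCSORT_LOADFULLSORT path over (prefix, tail) integer pairs. All names (Row, MIN_GROUP, cmp_full) are hypothetical stand-ins, not executor APIs: accumulate a minimum number of rows, extend the batch while rows match the pivot's prefix, sort the whole batch, and leave the first non-matching row for the next iteration (the executor carries it over in group_pivot).

#include <stdio.h>
#include <stdlib.h>

/* A row already sorted by "prefix" upstream; we must finish sorting by "tail". */
typedef struct Row { int prefix; int tail; } Row;

#define MIN_GROUP 2             /* stand-in for DEFAULT_MIN_GROUP_SIZE */

static int cmp_full(const void *a, const void *b)
{
    const Row *ra = a, *rb = b;
    if (ra->prefix != rb->prefix)
        return (ra->prefix > rb->prefix) - (ra->prefix < rb->prefix);
    return (ra->tail > rb->tail) - (ra->tail < rb->tail);
}

int main(void)
{
    /* Input already sorted by prefix, not by tail. */
    Row in[] = {{1, 9}, {1, 3}, {1, 7}, {2, 5}, {2, 1}, {3, 4}, {3, 2}};
    int n = sizeof(in) / sizeof(in[0]);
    int i = 0;

    while (i < n)
    {
        Row batch[16];
        int nTuples = 0;

        /* Accumulate a minimum number of rows before checking prefixes,
         * mirroring the full sort path: many tiny sorts are inefficient. */
        while (i < n && nTuples < MIN_GROUP)
            batch[nTuples++] = in[i++];

        /* The last row loaded acts as the pivot; keep loading while the
         * incoming row's prefix matches it. The first non-matching row is
         * simply left for the next iteration. */
        while (i < n && in[i].prefix == batch[nTuples - 1].prefix)
            batch[nTuples++] = in[i++];

        /* Sort the batch by all keys (prefixes may vary within the minimum
         * group), then emit it before starting the next batch. */
        qsort(batch, nTuples, sizeof(Row), cmp_full);
        for (int j = 0; j < nTuples; j++)
            printf("(%d,%d) ", batch[j].prefix, batch[j].tail);
        printf("| batch of %d\n", nTuples);
    }
    return 0;
}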

◆ ExecIncrementalSortEstimate()

void ExecIncrementalSortEstimate (IncrementalSortState *node, ParallelContext *pcxt)

Definition at line 1173 of file nodeIncrementalSort.c.

{
    Size        size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
        return;

    size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
    size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
    shm_toc_estimate_chunk(&pcxt->estimator, size);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

References add_size(), ParallelContext::estimator, PlanState::instrument, mul_size(), ParallelContext::nworkers, ScanState::ps, shm_toc_estimate_chunk, shm_toc_estimate_keys, size, and IncrementalSortState::ss.

Referenced by ExecParallelEstimate().
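
The estimated chunk is a header followed by one IncrementalSortInfo per worker. A standalone sketch of the same sizing arithmetic with stand-in structs (the real mul_size()/add_size() additionally guard against overflow, which plain * and + do not):

#include <stdio.h>
#include <stddef.h>

/* Stand-ins for the real structs: a fixed header plus a per-worker array. */
typedef struct WorkerSortInfo { long totalDiskSpaceUsed; long totalMemorySpaceUsed; } WorkerSortInfo;
typedef struct SharedSortInfo
{
    int            num_workers;
    WorkerSortInfo sinfo[];     /* flexible array member */
} SharedSortInfo;

int main(void)
{
    int    nworkers = 4;
    size_t size;

    /* Same shape as the estimate above: nworkers copies of the per-worker
     * info, plus the header up to the start of the flexible array. */
    size = nworkers * sizeof(WorkerSortInfo);
    size += offsetof(SharedSortInfo, sinfo);

    printf("DSM chunk for %d workers: %zu bytes\n", nworkers, size);
    return 0;
}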

◆ ExecIncrementalSortInitializeDSM()

void ExecIncrementalSortInitializeDSM (IncrementalSortState *node, ParallelContext *pcxt)

Definition at line 1194 of file nodeIncrementalSort.c.

{
    Size        size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
        return;

    size = offsetof(SharedIncrementalSortInfo, sinfo)
        + pcxt->nworkers * sizeof(IncrementalSortInfo);
    node->shared_info = shm_toc_allocate(pcxt->toc, size);
    /* ensure any unfilled slots will contain zeroes */
    memset(node->shared_info, 0, size);
    node->shared_info->num_workers = pcxt->nworkers;
    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
                   node->shared_info);
}

References PlanState::instrument, SharedIncrementalSortInfo::num_workers, ParallelContext::nworkers, PlanState::plan, Plan::plan_node_id, ScanState::ps, IncrementalSortState::shared_info, shm_toc_allocate(), shm_toc_insert(), size, IncrementalSortState::ss, and ParallelContext::toc.

Referenced by ExecParallelInitializeDSM().

◆ ExecIncrementalSortInitializeWorker()

void ExecIncrementalSortInitializeWorker (IncrementalSortState *node, ParallelWorkerContext *pwcxt)

Definition at line 1219 of file nodeIncrementalSort.c.

{
    node->shared_info =
        shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
    node->am_worker = true;
}

References IncrementalSortState::am_worker, PlanState::plan, Plan::plan_node_id, ScanState::ps, IncrementalSortState::shared_info, shm_toc_lookup(), IncrementalSortState::ss, and ParallelWorkerContext::toc.

Referenced by ExecParallelInitializeWorker().

◆ ExecIncrementalSortRetrieveInstrumentation()

void ExecIncrementalSortRetrieveInstrumentation (IncrementalSortState *node)

Definition at line 1233 of file nodeIncrementalSort.c.

{
    Size        size;
    SharedIncrementalSortInfo *si;

    if (node->shared_info == NULL)
        return;

    size = offsetof(SharedIncrementalSortInfo, sinfo)
        + node->shared_info->num_workers * sizeof(IncrementalSortInfo);
    si = palloc(size);
    memcpy(si, node->shared_info, size);
    node->shared_info = si;
}

References SharedIncrementalSortInfo::num_workers, palloc(), IncrementalSortState::shared_info, and size.

Referenced by ExecParallelRetrieveInstrumentation().
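
The copy exists because shared_info initially points into dynamic shared memory, which disappears when the parallel context is torn down; a single memcpy suffices since the header and its flexible array are one contiguous allocation. A standalone illustration of that copy-out pattern with stand-in types (malloc in place of palloc):

#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <string.h>

typedef struct PerWorker { long groupCount; } PerWorker;
typedef struct Shared
{
    int       num_workers;
    PerWorker sinfo[];          /* flexible array member */
} Shared;

int main(void)
{
    /* Pretend this lives in DSM and will disappear at context teardown. */
    size_t  size = offsetof(Shared, sinfo) + 2 * sizeof(PerWorker);
    Shared *dsm = calloc(1, size);
    Shared *local;

    dsm->num_workers = 2;
    dsm->sinfo[0].groupCount = 3;
    dsm->sinfo[1].groupCount = 5;

    /* Snapshot header + array in one contiguous copy, then retarget the
     * pointer at local memory, exactly as the function above does. */
    local = malloc(size);
    memcpy(local, dsm, size);
    free(dsm);                  /* DSM detaches; the local copy survives */

    printf("worker 1 sorted %ld groups\n", local->sinfo[1].groupCount);
    free(local);
    return 0;
}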

◆ ExecInitIncrementalSort()

IncrementalSortState * ExecInitIncrementalSort (IncrementalSort *node, EState *estate, int eflags)

Definition at line 976 of file nodeIncrementalSort.c.

{
    IncrementalSortState *incrsortstate;

    SO_printf("ExecInitIncrementalSort: initializing sort node\n");

    /*
     * Incremental sort can't be used with EXEC_FLAG_BACKWARD or
     * EXEC_FLAG_MARK, because the current sort state contains only one sort
     * batch rather than the full result set.
     */
    Assert((eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) == 0);

    /* Initialize state structure. */
    incrsortstate = makeNode(IncrementalSortState);
    incrsortstate->ss.ps.plan = (Plan *) node;
    incrsortstate->ss.ps.state = estate;
    incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;

    incrsortstate->execution_status = INCSORT_LOADFULLSORT;
    incrsortstate->bounded = false;
    incrsortstate->outerNodeDone = false;
    incrsortstate->bound_Done = 0;
    incrsortstate->fullsort_state = NULL;
    incrsortstate->prefixsort_state = NULL;
    incrsortstate->group_pivot = NULL;
    incrsortstate->transfer_tuple = NULL;
    incrsortstate->n_fullsort_remaining = 0;
    incrsortstate->presorted_keys = NULL;

    if (incrsortstate->ss.ps.instrument != NULL)
    {
        IncrementalSortGroupInfo *fullsortGroupInfo =
            &incrsortstate->incsort_info.fullsortGroupInfo;
        IncrementalSortGroupInfo *prefixsortGroupInfo =
            &incrsortstate->incsort_info.prefixsortGroupInfo;

        fullsortGroupInfo->groupCount = 0;
        fullsortGroupInfo->maxDiskSpaceUsed = 0;
        fullsortGroupInfo->totalDiskSpaceUsed = 0;
        fullsortGroupInfo->maxMemorySpaceUsed = 0;
        fullsortGroupInfo->totalMemorySpaceUsed = 0;
        fullsortGroupInfo->sortMethods = 0;
        prefixsortGroupInfo->groupCount = 0;
        prefixsortGroupInfo->maxDiskSpaceUsed = 0;
        prefixsortGroupInfo->totalDiskSpaceUsed = 0;
        prefixsortGroupInfo->maxMemorySpaceUsed = 0;
        prefixsortGroupInfo->totalMemorySpaceUsed = 0;
        prefixsortGroupInfo->sortMethods = 0;
    }

    /*
     * Miscellaneous initialization
     *
     * Sort nodes don't initialize their ExprContexts because they never call
     * ExecQual or ExecProject.
     */

    /*
     * Initialize child nodes.
     *
     * Incremental sort does not support backwards scans and mark/restore, so
     * we don't bother removing the flags from eflags here. We allow passing a
     * REWIND flag, because although incremental sort can't use it, the child
     * nodes may be able to do something more useful.
     */
    outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);

    /*
     * Initialize scan slot and type.
     */
    ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss, &TTSOpsMinimalTuple);

    /*
     * Initialize return slot and type. No need to initialize projection info
     * because we don't do any projections.
     */
    ExecInitResultTupleSlotTL(&incrsortstate->ss.ps, &TTSOpsMinimalTuple);
    incrsortstate->ss.ps.ps_ProjInfo = NULL;

    /*
     * Initialize standalone slots to store a tuple for pivot prefix keys and
     * for carrying over a tuple from one batch to the next.
     */
    incrsortstate->group_pivot =
        MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
                                 &TTSOpsMinimalTuple);
    incrsortstate->transfer_tuple =
        MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
                                 &TTSOpsMinimalTuple);

    SO_printf("ExecInitIncrementalSort: sort node initialized\n");

    return incrsortstate;
}

References Assert, IncrementalSortState::bound_Done, IncrementalSortState::bounded, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecCreateScanSlotFromOuterPlan(), ExecGetResultType(), ExecIncrementalSort(), ExecInitNode(), ExecInitResultTupleSlotTL(), PlanState::ExecProcNode, IncrementalSortState::execution_status, IncrementalSortState::fullsort_state, IncrementalSortInfo::fullsortGroupInfo, IncrementalSortState::group_pivot, IncrementalSortGroupInfo::groupCount, IncrementalSortState::incsort_info, INCSORT_LOADFULLSORT, PlanState::instrument, makeNode, MakeSingleTupleTableSlot(), IncrementalSortGroupInfo::maxDiskSpaceUsed, IncrementalSortGroupInfo::maxMemorySpaceUsed, IncrementalSortState::n_fullsort_remaining, IncrementalSortState::outerNodeDone, outerPlan, outerPlanState, PlanState::plan, IncrementalSortState::prefixsort_state, IncrementalSortInfo::prefixsortGroupInfo, IncrementalSortState::presorted_keys, ScanState::ps, PlanState::ps_ProjInfo, SO_printf, IncrementalSortGroupInfo::sortMethods, IncrementalSortState::ss, PlanState::state, IncrementalSortGroupInfo::totalDiskSpaceUsed, IncrementalSortGroupInfo::totalMemorySpaceUsed, IncrementalSortState::transfer_tuple, and TTSOpsMinimalTuple.

Referenced by ExecInitNode().

◆ ExecReScanIncrementalSort()

void ExecReScanIncrementalSort (IncrementalSortState *node)

Definition at line 1107 of file nodeIncrementalSort.c.

{
    PlanState  *outerPlan = outerPlanState(node);

    /*
     * Incremental sort doesn't support efficient rescan even when parameters
     * haven't changed (e.g., rewind) because unlike regular sort we don't
     * store all tuples at once for the full sort.
     *
     * So even if EXEC_FLAG_REWIND is set we just reset all of our state and
     * re-execute the sort along with the child node. Incremental sort itself
     * can't do anything smarter, but maybe the child nodes can.
     *
     * In theory if we've only filled the full sort with one batch (and
     * haven't reset it for a new batch yet) then we could efficiently rewind,
     * but that seems a narrow enough case that it's not worth handling
     * specially at this time.
     */

    /* must drop pointer to sort result tuple */
    ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);

    if (node->group_pivot != NULL)
        ExecClearTuple(node->group_pivot);
    if (node->transfer_tuple != NULL)
        ExecClearTuple(node->transfer_tuple);

    node->outerNodeDone = false;
    node->n_fullsort_remaining = 0;
    node->bound_Done = 0;

    node->execution_status = INCSORT_LOADFULLSORT;

    /*
     * If we've already set up either of the sort states, we need to reset
     * them. We could end them and null out the pointers, but there's no
     * reason to repay the setup cost, and because ExecIncrementalSort guards
     * presorted column functions by checking to see if the full sort state
     * has been initialized yet, setting the sort states to null here might
     * actually cause a leak.
     */
    if (node->fullsort_state != NULL)
        tuplesort_reset(node->fullsort_state);
    if (node->prefixsort_state != NULL)
        tuplesort_reset(node->prefixsort_state);

    /*
     * If chgParam of subnode is not null, then the plan will be re-scanned by
     * the first ExecProcNode.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}

References IncrementalSortState::bound_Done, ExecClearTuple(), ExecReScan(), IncrementalSortState::execution_status, IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, INCSORT_LOADFULLSORT, IncrementalSortState::n_fullsort_remaining, IncrementalSortState::outerNodeDone, outerPlan, outerPlanState, IncrementalSortState::prefixsort_state, ScanState::ps, PlanState::ps_ResultTupleSlot, IncrementalSortState::ss, IncrementalSortState::transfer_tuple, and tuplesort_reset().

Referenced by ExecReScan().

◆ instrumentSortedGroup()

static void instrumentSortedGroup (IncrementalSortGroupInfo *groupInfo, Tuplesortstate *sortState)
static

Definition at line 127 of file nodeIncrementalSort.c.

{
    TuplesortInstrumentation sort_instr;

    groupInfo->groupCount++;

    tuplesort_get_stats(sortState, &sort_instr);

    /* Calculate total and maximum memory and disk space used. */
    switch (sort_instr.spaceType)
    {
        case SORT_SPACE_TYPE_DISK:
            groupInfo->totalDiskSpaceUsed += sort_instr.spaceUsed;
            if (sort_instr.spaceUsed > groupInfo->maxDiskSpaceUsed)
                groupInfo->maxDiskSpaceUsed = sort_instr.spaceUsed;

            break;
        case SORT_SPACE_TYPE_MEMORY:
            groupInfo->totalMemorySpaceUsed += sort_instr.spaceUsed;
            if (sort_instr.spaceUsed > groupInfo->maxMemorySpaceUsed)
                groupInfo->maxMemorySpaceUsed = sort_instr.spaceUsed;

            break;
    }

    /* Track each sort method we've used. */
    groupInfo->sortMethods |= sort_instr.sortMethod;
}

References IncrementalSortGroupInfo::groupCount, IncrementalSortGroupInfo::maxDiskSpaceUsed, IncrementalSortGroupInfo::maxMemorySpaceUsed, SORT_SPACE_TYPE_DISK, SORT_SPACE_TYPE_MEMORY, TuplesortInstrumentation::sortMethod, IncrementalSortGroupInfo::sortMethods, TuplesortInstrumentation::spaceType, TuplesortInstrumentation::spaceUsed, IncrementalSortGroupInfo::totalDiskSpaceUsed, IncrementalSortGroupInfo::totalMemorySpaceUsed, and tuplesort_get_stats().

◆ isCurrentGroup()

static bool isCurrentGroup (IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
static

Definition at line 212 of file nodeIncrementalSort.c.

{
    int         nPresortedCols;

    nPresortedCols = castNode(IncrementalSort, node->ss.ps.plan)->nPresortedCols;

    /*
     * That the input is sorted by keys (0, ... n) implies that the tail keys
     * are more likely to change. Therefore we do our comparison starting from
     * the last pre-sorted column to optimize for early detection of
     * inequality and to minimize the number of function calls.
     */
    for (int i = nPresortedCols - 1; i >= 0; i--)
    {
        Datum       datumA,
                    datumB,
                    result;
        bool        isnullA,
                    isnullB;
        AttrNumber  attno = node->presorted_keys[i].attno;
        PresortedKeyData *key;

        datumA = slot_getattr(pivot, attno, &isnullA);
        datumB = slot_getattr(tuple, attno, &isnullB);

        /* Special case for NULL-vs-NULL, else use standard comparison */
        if (isnullA || isnullB)
        {
            if (isnullA == isnullB)
                continue;
            else
                return false;
        }

        key = &node->presorted_keys[i];

        key->fcinfo->args[0].value = datumA;
        key->fcinfo->args[1].value = datumB;

        /* just for paranoia's sake, we reset isnull each time */
        key->fcinfo->isnull = false;

        result = FunctionCallInvoke(key->fcinfo);

        /* Check for null result, since caller is clearly not expecting one */
        if (key->fcinfo->isnull)
            elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);

        if (!DatumGetBool(result))
            return false;
    }
    return true;
}

References PresortedKeyData::attno, castNode, DatumGetBool(), elog, ERROR, FunctionCallInvoke, i, sort-test::key, PlanState::plan, IncrementalSortState::presorted_keys, ScanState::ps, slot_getattr(), and IncrementalSortState::ss.

Referenced by ExecIncrementalSort(), and switchToPresortedPrefixMode().
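
A standalone sketch of the tail-first comparison order over plain integer keys (check_same_group is a hypothetical stand-in; the real code calls the cached btree equality operator per column rather than ==). Because the input is already sorted on the prefix, an earlier column can only change when every later column's group has also ended, so scanning from the last presorted column finds a mismatch soonest:

#include <stdbool.h>
#include <stdio.h>

/* Compare two rows on nPresorted leading keys, starting from the last
 * presorted key, which is the most likely to differ between groups. */
static bool check_same_group(const int *pivot, const int *tuple, int nPresorted)
{
    for (int i = nPresorted - 1; i >= 0; i--)
        if (pivot[i] != tuple[i])
            return false;       /* mismatch found early; no more compares */
    return true;
}

int main(void)
{
    int pivot[] = {2024, 7, 31};    /* e.g., a (year, month, day) prefix */
    int same[]  = {2024, 7, 31};
    int diff[]  = {2024, 7, 30};    /* differs on the last key only */

    printf("same group: %d\n", check_same_group(pivot, same, 3));
    printf("same group: %d\n", check_same_group(pivot, diff, 3));
    return 0;
}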

◆ preparePresortedCols()

static void preparePresortedCols (IncrementalSortState *node)
static

Definition at line 164 of file nodeIncrementalSort.c.

{
    IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);

    node->presorted_keys =
        (PresortedKeyData *) palloc(plannode->nPresortedCols *
                                    sizeof(PresortedKeyData));

    /* Pre-cache comparison functions for each pre-sorted key. */
    for (int i = 0; i < plannode->nPresortedCols; i++)
    {
        Oid         equalityOp,
                    equalityFunc;
        PresortedKeyData *key;

        key = &node->presorted_keys[i];
        key->attno = plannode->sort.sortColIdx[i];

        equalityOp = get_equality_op_for_ordering_op(plannode->sort.sortOperators[i],
                                                     NULL);
        if (!OidIsValid(equalityOp))
            elog(ERROR, "missing equality operator for ordering operator %u",
                 plannode->sort.sortOperators[i]);

        equalityFunc = get_opcode(equalityOp);
        if (!OidIsValid(equalityFunc))
            elog(ERROR, "missing function for operator %u", equalityOp);

        /* Lookup the comparison function */
        fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);

        /* We can initialize the callinfo just once and re-use it */
        key->fcinfo = palloc0(SizeForFunctionCallInfo(2));
        InitFunctionCallInfoData(*key->fcinfo, &key->flinfo, 2,
                                 plannode->sort.collations[i], NULL, NULL);
        key->fcinfo->args[0].isnull = false;
        key->fcinfo->args[1].isnull = false;
    }
}

References castNode, CurrentMemoryContext, elog, ERROR, fmgr_info_cxt(), get_equality_op_for_ordering_op(), get_opcode(), i, InitFunctionCallInfoData, sort-test::key, IncrementalSort::nPresortedCols, OidIsValid, palloc(), palloc0(), PlanState::plan, IncrementalSortState::presorted_keys, ScanState::ps, SizeForFunctionCallInfo, IncrementalSort::sort, and IncrementalSortState::ss.

Referenced by ExecIncrementalSort().
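
The point of this function is to pay the catalog lookups (ordering operator to equality operator to function) once per sort rather than once per row comparison. A standalone sketch of that cache-the-comparator pattern with hypothetical names (lookup_equality_fn stands in for the get_equality_op_for_ordering_op()/get_opcode()/fmgr_info_cxt() chain):

#include <stdbool.h>
#include <stdio.h>

/* Stand-in for a catalog lookup too expensive to repeat for every row; the
 * real code resolves and caches an initialized call-info per presorted
 * column, then reuses it in isCurrentGroup(). */
typedef bool (*EqualityFn) (int a, int b);

static bool int_eq(int a, int b) { return a == b; }

static EqualityFn lookup_equality_fn(void)
{
    /* imagine the operator-to-function catalog resolution here */
    return int_eq;
}

int main(void)
{
    /* Do the lookup once, up front... */
    EqualityFn eq = lookup_equality_fn();
    int pivot = 42;
    int rows[] = {42, 42, 7};

    /* ...then reuse the cached function for every comparison. */
    for (int i = 0; i < 3; i++)
        printf("row %d in group: %d\n", i, eq(pivot, rows[i]));
    return 0;
}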

◆ switchToPresortedPrefixMode()

static void switchToPresortedPrefixMode (PlanState *pstate)
static

Definition at line 286 of file nodeIncrementalSort.c.

{
    IncrementalSortState *node = castNode(IncrementalSortState, pstate);
    ScanDirection dir;
    int64       nTuples;
    TupleDesc   tupDesc;
    PlanState  *outerNode;
    IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);

    dir = node->ss.ps.state->es_direction;
    outerNode = outerPlanState(node);
    tupDesc = ExecGetResultType(outerNode);

    /* Configure the prefix sort state the first time around. */
    if (node->prefixsort_state == NULL)
    {
        Tuplesortstate *prefixsort_state;
        int         nPresortedCols = plannode->nPresortedCols;

        /*
         * Optimize the sort by assuming the prefix columns are all equal and
         * thus we only need to sort by any remaining columns.
         */
        prefixsort_state = tuplesort_begin_heap(tupDesc,
                                                plannode->sort.numCols - nPresortedCols,
                                                &(plannode->sort.sortColIdx[nPresortedCols]),
                                                &(plannode->sort.sortOperators[nPresortedCols]),
                                                &(plannode->sort.collations[nPresortedCols]),
                                                &(plannode->sort.nullsFirst[nPresortedCols]),
                                                work_mem,
                                                NULL,
                                                node->bounded ?
                                                TUPLESORT_ALLOWBOUNDED :
                                                TUPLESORT_NONE);
        node->prefixsort_state = prefixsort_state;
    }
    else
    {
        /* Next group of presorted data */
        tuplesort_reset(node->prefixsort_state);
    }

    /*
     * If the current node has a bound, then it's reasonably likely that a
     * large prefix key group will benefit from bounded sort, so configure the
     * tuplesort to allow for that optimization.
     */
    if (node->bounded)
    {
        SO1_printf("Setting bound on presorted prefix tuplesort to: " INT64_FORMAT "\n",
                   node->bound - node->bound_Done);
        tuplesort_set_bound(node->prefixsort_state,
                            node->bound - node->bound_Done);
    }

    /*
     * Copy as many tuples as we can (i.e., in the same prefix key group) from
     * the full sort state to the prefix sort state.
     */
    for (nTuples = 0; nTuples < node->n_fullsort_remaining; nTuples++)
    {
        /*
         * When we encounter multiple prefix key groups inside the full sort
         * tuplesort we have to carry over the last read tuple into the next
         * batch.
         */
        if (nTuples == 0 && !TupIsNull(node->transfer_tuple))
        {
            tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
            /* The carried over tuple is our new group pivot tuple. */
            ExecCopySlot(node->group_pivot, node->transfer_tuple);
        }
        else
        {
            tuplesort_gettupleslot(node->fullsort_state,
                                   ScanDirectionIsForward(dir),
                                   false, node->transfer_tuple, NULL);

            /*
             * If this is our first time through the loop, then we need to
             * save the first tuple we get as our new group pivot.
             */
            if (TupIsNull(node->group_pivot))
                ExecCopySlot(node->group_pivot, node->transfer_tuple);

            if (isCurrentGroup(node, node->group_pivot, node->transfer_tuple))
            {
                tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
            }
            else
            {
                /*
                 * The tuple isn't part of the current batch so we need to
                 * carry it over into the next batch of tuples we transfer out
                 * of the full sort tuplesort into the presorted prefix
                 * tuplesort. We don't actually have to do anything special to
                 * save the tuple since we've already loaded it into the
                 * node->transfer_tuple slot, and, even though that slot
                 * points to memory inside the full sort tuplesort, we can't
                 * reset that tuplesort anyway until we've fully transferred
                 * out its tuples, so this reference is safe. We do need to
                 * reset the group pivot tuple though since we've finished the
                 * current prefix key group.
                 */
                ExecClearTuple(node->group_pivot);

                /* Break out of for-loop early */
                break;
            }
        }
    }

    /*
     * Track how many tuples remain in the full sort batch so that we know if
     * we need to sort multiple prefix key groups before processing tuples
     * remaining in the large single prefix key group we think we've
     * encountered.
     */
    SO1_printf("Moving " INT64_FORMAT " tuples to presorted prefix tuplesort\n", nTuples);
    node->n_fullsort_remaining -= nTuples;
    SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT "\n", node->n_fullsort_remaining);

    if (node->n_fullsort_remaining == 0)
    {
        /*
         * We've found that all tuples remaining in the full sort batch are in
         * the same prefix key group and moved all of those tuples into the
         * presorted prefix tuplesort. We don't know that we've yet found the
         * last tuple in the current prefix key group, so save our pivot
         * comparison tuple and continue fetching tuples from the outer
         * execution node to load into the presorted prefix tuplesort.
         */
        ExecCopySlot(node->group_pivot, node->transfer_tuple);
        SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
        node->execution_status = INCSORT_LOADPREFIXSORT;

        /*
         * Make sure we clear the transfer tuple slot so that next time we
         * encounter a large prefix key group we don't incorrectly assume we
         * have a tuple carried over from the previous group.
         */
        ExecClearTuple(node->transfer_tuple);
    }
    else
    {
        /*
         * We finished a group but didn't consume all of the tuples from the
         * full sort state, so we'll sort this batch, let the outer node read
         * out all of those tuples, and then come back around to find another
         * batch.
         */
        SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);
        tuplesort_performsort(node->prefixsort_state);

        INSTRUMENT_SORT_GROUP(node, prefixsort);

        if (node->bounded)
        {
            /*
             * If the current node has a bound and we've already sorted n
             * tuples, then the functional bound remaining is (original bound
             * - n), so store the current number of processed tuples for use
             * in configuring sorting bound.
             */
            SO2_printf("Changing bound_Done from " INT64_FORMAT " to " INT64_FORMAT "\n",
                       node->bound_Done,
                       Min(node->bound, node->bound_Done + nTuples));
            node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
        }

        SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
        node->execution_status = INCSORT_READPREFIXSORT;
    }
}

References IncrementalSortState::bound, IncrementalSortState::bound_Done, IncrementalSortState::bounded, castNode, EState::es_direction, ExecClearTuple(), ExecCopySlot(), ExecGetResultType(), IncrementalSortState::execution_status, IncrementalSortState::fullsort_state, IncrementalSortState::group_pivot, INCSORT_LOADPREFIXSORT, INCSORT_READPREFIXSORT, INSTRUMENT_SORT_GROUP, INT64_FORMAT, isCurrentGroup(), Min, IncrementalSortState::n_fullsort_remaining, IncrementalSort::nPresortedCols, Sort::numCols, outerPlanState, PlanState::plan, IncrementalSortState::prefixsort_state, ScanState::ps, ScanDirectionIsForward, SO1_printf, SO2_printf, SO_printf, IncrementalSort::sort, IncrementalSortState::ss, PlanState::state, IncrementalSortState::transfer_tuple, TupIsNull, TUPLESORT_ALLOWBOUNDED, tuplesort_begin_heap(), tuplesort_gettupleslot(), TUPLESORT_NONE, tuplesort_performsort(), tuplesort_puttupleslot(), tuplesort_reset(), tuplesort_set_bound(), and work_mem.

Referenced by ExecIncrementalSort().