/*
 * INSTRUMENT_SORT_GROUP
 *		Record statistics for one sort group when the node is instrumented.
 *
 * node:      pointer to the sort node state.  It is expanded several times,
 *            so the argument must be free of side effects.
 * groupName: bare token pasted into <groupName>GroupInfo and
 *            <groupName>_state (e.g. fullsort or prefixsort).
 *
 * Does nothing unless (node)->ss.ps.instrument is set.  When the node has
 * shared_info and am_worker is true, the statistics are passed to
 * instrumentSortedGroup() for the sinfo[] slot selected by
 * ParallelWorkerNumber; otherwise the node-local incsort_info is used.
 *
 * The body is wrapped in do { ... } while (0) so the macro expands to exactly
 * one statement and is safe as the body of an unbraced if/else.
 */
#define INSTRUMENT_SORT_GROUP(node, groupName) \
	do { \
		if ((node)->ss.ps.instrument != NULL) \
		{ \
			if ((node)->shared_info && (node)->am_worker) \
			{ \
				Assert(IsParallelWorker()); \
				/* \
				 * ParallelWorkerNumber is used as an index into sinfo[], so \
				 * it must be strictly below num_workers (assuming sinfo[] \
				 * holds num_workers entries; the allocation is not visible \
				 * from here -- confirm). \
				 */ \
				Assert(ParallelWorkerNumber < (node)->shared_info->num_workers); \
				instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
									  (node)->groupName##_state); \
			} \
			else \
			{ \
				instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
									  (node)->groupName##_state); \
			} \
		} \
	} while (0)
180 key->attno = plannode->
sort.sortColIdx[
i];
185 elog(
ERROR,
"missing equality operator for ordering operator %u",
186 plannode->
sort.sortOperators[
i]);
190 elog(
ERROR,
"missing function for operator %u", equalityOp);
198 plannode->
sort.collations[
i], NULL, NULL);
199 key->fcinfo->args[0].isnull =
false;
200 key->fcinfo->args[1].isnull =
false;
224 for (
int i = nPresortedCols - 1;
i >= 0;
i--)
238 if (isnullA || isnullB)
240 if (isnullA == isnullB)
248 key->fcinfo->args[0].value = datumA;
249 key->fcinfo->args[1].value = datumB;
252 key->fcinfo->isnull =
false;
257 if (
key->fcinfo->isnull)
258 elog(
ERROR,
"function %u returned NULL",
key->flinfo.fn_oid);
311 &(plannode->
sort.sortColIdx[nPresortedCols]),
312 &(plannode->
sort.sortOperators[nPresortedCols]),
313 &(plannode->
sort.collations[nPresortedCols]),
314 &(plannode->
sort.nullsFirst[nPresortedCols]),
417 SO_printf(
"Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
453 SO_printf(
"Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
/*
 * Group-size tuning constants for the incremental sort node.
 *
 * DEFAULT_MIN_GROUP_SIZE is the base value (32 tuples, presumably the source
 * of the minGroupSize threshold that tuple counts are compared against; the
 * uses are outside this excerpt -- confirm).
 */
#define DEFAULT_MIN_GROUP_SIZE 32

/*
 * Upper limit for a full-sort group, per its name; defined as twice
 * DEFAULT_MIN_GROUP_SIZE so the two values stay in step.
 */
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (DEFAULT_MIN_GROUP_SIZE * 2)
560 SO1_printf(
"Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",
572 SO_printf(
"Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining > 0)\n");
592 if (fullsort_state == NULL)
612 plannode->
sort.sortColIdx,
613 plannode->
sort.sortOperators,
614 plannode->
sort.collations,
615 plannode->
sort.nullsFirst,
669 if (nTuples != minGroupSize)
700 SO_printf(
"Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
706 if (nTuples < minGroupSize)
722 if (nTuples == minGroupSize)
781 SO_printf(
"Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
836 nTuples,
Min(currentBound, nTuples));
837 nTuples =
Min(currentBound, nTuples);
840 SO1_printf(
"Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",
935 SO_printf(
"Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
980 SO_printf(
"ExecInitIncrementalSort: initializing sort node\n");
996 incrsortstate->
bounded =
false;
1067 SO_printf(
"ExecInitIncrementalSort: sort node initialized\n");
1069 return incrsortstate;
1079 SO_printf(
"ExecEndIncrementalSort: shutting down sort node\n");
1103 SO_printf(
"ExecEndIncrementalSort: sort node shutdown\n");
#define Assert(condition)
#define OidIsValid(objectId)
void ExecReScan(PlanState *node)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
TupleDesc ExecGetResultType(PlanState *planstate)
void ExecCreateScanSlotFromOuterPlan(EState *estate, ScanState *scanstate, const TupleTableSlotOps *tts_ops)
#define SO2_printf(s, p1, p2)
#define outerPlanState(node)
struct IncrementalSortInfo IncrementalSortInfo
#define EXEC_FLAG_BACKWARD
static TupleTableSlot * ExecProcNode(PlanState *node)
void fmgr_info_cxt(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt)
#define SizeForFunctionCallInfo(nargs)
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
#define FunctionCallInvoke(fcinfo)
Oid get_equality_op_for_ordering_op(Oid opno, bool *reverse)
RegProcedure get_opcode(Oid opno)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
#define CHECK_FOR_INTERRUPTS()
void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
static void switchToPresortedPrefixMode(PlanState *pstate)
IncrementalSortState * ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
static void instrumentSortedGroup(IncrementalSortGroupInfo *groupInfo, Tuplesortstate *sortState)
void ExecReScanIncrementalSort(IncrementalSortState *node)
void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
void ExecEndIncrementalSort(IncrementalSortState *node)
#define INSTRUMENT_SORT_GROUP(node, groupName)
static TupleTableSlot * ExecIncrementalSort(PlanState *pstate)
static bool isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
static void preparePresortedCols(IncrementalSortState *node)
void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE
void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
#define DEFAULT_MIN_GROUP_SIZE
#define castNode(_type_, nodeptr)
static bool DatumGetBool(Datum X)
#define ScanDirectionIsForward(direction)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
static pg_noinline void Size size
ScanDirection es_direction
int64 totalMemorySpaceUsed
IncrementalSortGroupInfo prefixsortGroupInfo
IncrementalSortGroupInfo fullsortGroupInfo
Tuplesortstate * prefixsort_state
TupleTableSlot * group_pivot
TupleTableSlot * transfer_tuple
Tuplesortstate * fullsort_state
SharedIncrementalSortInfo * shared_info
IncrementalSortExecutionStatus execution_status
PresortedKeyData * presorted_keys
IncrementalSortInfo incsort_info
int64 n_fullsort_remaining
shm_toc_estimator estimator
Instrumentation * instrument
TupleTableSlot * ps_ResultTupleSlot
ProjectionInfo * ps_ProjInfo
ExecProcNodeMtd ExecProcNode
TuplesortMethod sortMethod
TuplesortSpaceType spaceType
void tuplesort_performsort(Tuplesortstate *state)
void tuplesort_reset(Tuplesortstate *state)
bool tuplesort_used_bound(Tuplesortstate *state)
void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats)
void tuplesort_end(Tuplesortstate *state)
void tuplesort_set_bound(Tuplesortstate *state, int64 bound)
#define TUPLESORT_ALLOWBOUNDED
void tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, int sortopt)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)