99 #define INSTRUMENT_SORT_GROUP(node, groupName) \
101 if ((node)->ss.ps.instrument != NULL) \
103 if ((node)->shared_info && (node)->am_worker) \
105 Assert(IsParallelWorker()); \
106 Assert(ParallelWorkerNumber <= (node)->shared_info->num_workers); \
107 instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
108 (node)->groupName##_state); \
112 instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
113 (node)->groupName##_state); \
181 key->attno = plannode->
sort.sortColIdx[
i];
186 elog(
ERROR,
"missing equality operator for ordering operator %u",
187 plannode->
sort.sortOperators[
i]);
191 elog(
ERROR,
"missing function for operator %u", equalityOp);
199 plannode->
sort.collations[
i], NULL, NULL);
200 key->fcinfo->args[0].isnull =
false;
201 key->fcinfo->args[1].isnull =
false;
225 for (
int i = nPresortedCols - 1;
i >= 0;
i--)
239 if (isnullA || isnullB)
241 if (isnullA == isnullB)
249 key->fcinfo->args[0].value = datumA;
250 key->fcinfo->args[1].value = datumB;
253 key->fcinfo->isnull =
false;
258 if (
key->fcinfo->isnull)
259 elog(
ERROR,
"function %u returned NULL",
key->flinfo.fn_oid);
312 &(plannode->
sort.sortColIdx[nPresortedCols]),
313 &(plannode->
sort.sortOperators[nPresortedCols]),
314 &(plannode->
sort.collations[nPresortedCols]),
315 &(plannode->
sort.nullsFirst[nPresortedCols]),
418 SO_printf(
"Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
454 SO_printf(
"Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
468 #define DEFAULT_MIN_GROUP_SIZE 32
480 #define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (2 * DEFAULT_MIN_GROUP_SIZE)
561 SO1_printf(
"Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",
573 SO_printf(
"Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining > 0)\n");
593 if (fullsort_state == NULL)
613 plannode->
sort.sortColIdx,
614 plannode->
sort.sortOperators,
615 plannode->
sort.collations,
616 plannode->
sort.nullsFirst,
670 if (nTuples != minGroupSize)
701 SO_printf(
"Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
707 if (nTuples < minGroupSize)
723 if (nTuples == minGroupSize)
782 SO_printf(
"Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
837 nTuples,
Min(currentBound, nTuples));
838 nTuples =
Min(currentBound, nTuples);
841 SO1_printf(
"Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",
936 SO_printf(
"Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
981 SO_printf(
"ExecInitIncrementalSort: initializing sort node\n");
997 incrsortstate->
bounded =
false;
1068 SO_printf(
"ExecInitIncrementalSort: sort node initialized\n");
1070 return incrsortstate;
1080 SO_printf(
"ExecEndIncrementalSort: shutting down sort node\n");
1109 SO_printf(
"ExecEndIncrementalSort: sort node shutdown\n");
#define OidIsValid(objectId)
elog(ERROR, "%s: %s", p2, msg)
void ExecReScan(PlanState *node)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
TupleDesc ExecGetResultType(PlanState *planstate)
void ExecCreateScanSlotFromOuterPlan(EState *estate, ScanState *scanstate, const TupleTableSlotOps *tts_ops)
#define SO2_printf(s, p1, p2)
#define outerPlanState(node)
struct IncrementalSortInfo IncrementalSortInfo
#define EXEC_FLAG_BACKWARD
static TupleTableSlot * ExecProcNode(PlanState *node)
void fmgr_info_cxt(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt)
#define SizeForFunctionCallInfo(nargs)
#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)
#define FunctionCallInvoke(fcinfo)
Assert(fmt[strlen(fmt) - 1] !='\n')
Oid get_equality_op_for_ordering_op(Oid opno, bool *reverse)
RegProcedure get_opcode(Oid opno)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
#define CHECK_FOR_INTERRUPTS()
void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
static void switchToPresortedPrefixMode(PlanState *pstate)
IncrementalSortState * ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
static void instrumentSortedGroup(IncrementalSortGroupInfo *groupInfo, Tuplesortstate *sortState)
void ExecReScanIncrementalSort(IncrementalSortState *node)
void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
void ExecEndIncrementalSort(IncrementalSortState *node)
#define INSTRUMENT_SORT_GROUP(node, groupName)
static TupleTableSlot * ExecIncrementalSort(PlanState *pstate)
static bool isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
static void preparePresortedCols(IncrementalSortState *node)
void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE
void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
#define DEFAULT_MIN_GROUP_SIZE
#define castNode(_type_, nodeptr)
static bool DatumGetBool(Datum X)
#define ScanDirectionIsForward(direction)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
ScanDirection es_direction
int64 totalMemorySpaceUsed
IncrementalSortGroupInfo prefixsortGroupInfo
IncrementalSortGroupInfo fullsortGroupInfo
Tuplesortstate * prefixsort_state
TupleTableSlot * group_pivot
TupleTableSlot * transfer_tuple
Tuplesortstate * fullsort_state
SharedIncrementalSortInfo * shared_info
IncrementalSortExecutionStatus execution_status
PresortedKeyData * presorted_keys
IncrementalSortInfo incsort_info
int64 n_fullsort_remaining
shm_toc_estimator estimator
Instrumentation * instrument
TupleTableSlot * ps_ResultTupleSlot
ProjectionInfo * ps_ProjInfo
ExecProcNodeMtd ExecProcNode
TupleTableSlot * ss_ScanTupleSlot
TuplesortMethod sortMethod
TuplesortSpaceType spaceType
void tuplesort_performsort(Tuplesortstate *state)
void tuplesort_reset(Tuplesortstate *state)
bool tuplesort_used_bound(Tuplesortstate *state)
void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats)
void tuplesort_end(Tuplesortstate *state)
void tuplesort_set_bound(Tuplesortstate *state, int64 bound)
#define TUPLESORT_ALLOWBOUNDED
void tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)
Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, int sortopt)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)