126 #define CLUSTER_SORT 3
129 #define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \
130 (state)->worker >= 0 ? 1 : 2)
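/*
 * PARALLEL_SORT() classifies the sort from the state alone: 0 when there is
 * no shared state (serial sort), 1 when this backend is a parallel worker
 * (worker number >= 0), and 2 when it is the parallel leader.
 */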
139 #define INITIAL_MEMTUPSIZE Max(1024, \
140 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
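/*
 * Initial size of the memtuples[] array.  The expression above picks at
 * least 1024 slots and, presumably, enough of them that the allocation
 * exceeds ALLOCSET_SEPARATE_THRESHOLD, so the array gets its own allocation
 * block and memory accounting for it stays accurate.
 */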
147 #ifdef DEBUG_BOUNDED_SORT
148 bool optimize_bounded_sort = true;
200 #define SLAB_SLOT_SIZE 1024
236 #define TAPE_BUFFER_OVERHEAD BLCKSZ
237 #define MERGE_BUFFER_SIZE (BLCKSZ * 32)
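/*
 * Rough meaning of the constants above: SLAB_SLOT_SIZE is the fixed size of
 * a slot in the merge-phase slab allocator, TAPE_BUFFER_OVERHEAD is the
 * per-logical-tape buffer cost assumed when budgeting memory, and
 * MERGE_BUFFER_SIZE is the target read-buffer size per input tape during a
 * merge pass.
 */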
531 #define IS_SLAB_SLOT(state, tuple) \
532 ((char *) (tuple) >= (state)->slabMemoryBegin && \
533 (char *) (tuple) < (state)->slabMemoryEnd)
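/*
 * IS_SLAB_SLOT() tests whether a tuple pointer falls inside the contiguous
 * slab arena delimited by slabMemoryBegin/slabMemoryEnd.
 */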
539 #define RELEASE_SLAB_SLOT(state, tuple) \
541 SlabSlot *buf = (SlabSlot *) tuple; \
543 if (IS_SLAB_SLOT((state), buf)) \
545 buf->nextfree = (state)->slabFreeHead; \
546 (state)->slabFreeHead = buf; \
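/*
 * RELEASE_SLAB_SLOT() pushes a slot back onto the singly linked free list
 * headed by slabFreeHead when the tuple came from the slab arena; tuples
 * allocated outside the slab are presumably just pfree'd instead.
 */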
551 #define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
552 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
553 #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
554 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
555 #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
556 #define USEMEM(state,amt) ((state)->availMem -= (amt))
557 #define FREEMEM(state,amt) ((state)->availMem += (amt))
558 #define SERIAL(state) ((state)->shared == NULL)
559 #define WORKER(state) ((state)->shared && (state)->worker != -1)
560 #define LEADER(state) ((state)->shared && (state)->worker == -1)
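/*
 * LACKMEM/USEMEM/FREEMEM do the workMem bookkeeping against availMem, while
 * SERIAL/WORKER/LEADER classify the process's role: serial when no shared
 * state exists, worker when shared state exists and a worker number was
 * assigned, leader when shared state exists and worker == -1.
 */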
612 #define LogicalTapeReadExact(tape, ptr, len) \
614 if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
615 elog(ERROR, "unexpected end of data"); \
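/*
 * LogicalTapeReadExact() is a convenience wrapper: read exactly len bytes
 * from the tape or fail hard with "unexpected end of data".
 */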
703 b->datum1, b->isnull1,
704 &state->sortKeys[0]);
712 if (state->onlyKey != NULL)
718 #if SIZEOF_DATUM >= 8
725 compare = ApplySignedSortComparator(a->datum1, a->isnull1,
726 b->datum1, b->isnull1,
727 &state->sortKeys[0]);
736 if (state->onlyKey != NULL)
750 b->datum1, b->isnull1,
751 &state->sortKeys[0]);
760 if (state->onlyKey != NULL)
775 #define ST_SORT qsort_tuple_unsigned
776 #define ST_ELEMENT_TYPE SortTuple
777 #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
778 #define ST_COMPARE_ARG_TYPE Tuplesortstate
779 #define ST_CHECK_FOR_INTERRUPTS
780 #define ST_SCOPE static
784 #if SIZEOF_DATUM >= 8
785 #define ST_SORT qsort_tuple_signed
786 #define ST_ELEMENT_TYPE SortTuple
787 #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
788 #define ST_COMPARE_ARG_TYPE Tuplesortstate
789 #define ST_CHECK_FOR_INTERRUPTS
790 #define ST_SCOPE static
795 #define ST_SORT qsort_tuple_int32
796 #define ST_ELEMENT_TYPE SortTuple
797 #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
798 #define ST_COMPARE_ARG_TYPE Tuplesortstate
799 #define ST_CHECK_FOR_INTERRUPTS
800 #define ST_SCOPE static
804 #define ST_SORT qsort_tuple
805 #define ST_ELEMENT_TYPE SortTuple
806 #define ST_COMPARE_RUNTIME_POINTER
807 #define ST_COMPARE_ARG_TYPE Tuplesortstate
808 #define ST_CHECK_FOR_INTERRUPTS
809 #define ST_SCOPE static
814 #define ST_SORT qsort_ssup
815 #define ST_ELEMENT_TYPE SortTuple
816 #define ST_COMPARE(a, b, ssup) \
817 ApplySortComparator((a)->datum1, (a)->isnull1, \
818 (b)->datum1, (b)->isnull1, (ssup))
819 #define ST_COMPARE_ARG_TYPE SortSupportData
820 #define ST_CHECK_FOR_INTERRUPTS
821 #define ST_SCOPE static
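/*
 * Each ST_* parameter block above is consumed by an inclusion of
 * lib/sort_template.h, which expands into a specialized static quicksort:
 * qsort_tuple_unsigned, qsort_tuple_signed (only when Datum is 8 bytes),
 * qsort_tuple_int32, the generic qsort_tuple driven by a runtime comparator
 * pointer, and qsort_ssup for single-key datum sorts compared directly with
 * a SortSupportData comparator.
 */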
854 elog(ERROR, "random access disallowed under parallel sort");
890 state->sortopt = sortopt;
891 state->tuples = true;
899 state->allowedMem = Max(workMem, 64) * (int64) 1024;
900 state->sortcontext = sortcontext;
901 state->maincontext = maincontext;
908 state->memtuples = NULL;
923 state->shared = NULL;
925 state->nParticipants = -1;
932 state->nParticipants = -1;
986 state->bounded = false;
987 state->boundUsed = false;
991 state->tapeset = NULL;
993 state->memtupcount = 0;
999 state->growmemtuples = true;
1000 state->slabAllocatorUsed = false;
1004 state->memtuples = NULL;
1007 if (state->memtuples == NULL)
1015 elog(ERROR, "insufficient memory allowed for sort");
1017 state->currentRun = 0;
1024 state->result_tape = NULL;
1032 Oid *sortOperators, Oid *sortCollations,
1033 bool *nullsFirstFlags,
1048 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
1052 state->nKeys = nkeys;
1065 state->haveDatum1 =
true;
1067 state->tupDesc = tupDesc;
1068 state->abbrevNext = 10;
1073 for (i = 0; i < nkeys; i++)
1096 if (nkeys == 1 && !state->sortKeys->abbrev_converter)
1123 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
1141 state->abbrevNext = 10;
1149 if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
1150 state->haveDatum1 = false;
1152 state->haveDatum1 = true;
1154 state->tupDesc = tupDesc;
1158 if (state->indexInfo->ii_Expressions != NULL)
1201 pfree(indexScanKey);
1212 bool uniqueNullsNotDistinct,
1228 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
1229 enforceUnique ? 't' : 'f',
1246 state->abbrevNext = 10;
1247 state->haveDatum1 = true;
1249 state->heapRel = heapRel;
1250 state->indexRel = indexRel;
1251 state->enforceUnique = enforceUnique;
1252 state->uniqueNullsNotDistinct = uniqueNullsNotDistinct;
1282 pfree(indexScanKey);
1308 "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
1309 "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
1323 state->haveDatum1 = true;
1325 state->heapRel = heapRel;
1326 state->indexRel = indexRel;
1328 state->high_mask = high_mask;
1329 state->low_mask = low_mask;
1330 state->max_buckets = max_buckets;
1354 "begin index sort: workMem = %d, randomAccess = %c",
1364 state->haveDatum1 = true;
1366 state->heapRel = heapRel;
1367 state->indexRel = indexRel;
1397 bool nullsFirstFlag, int workMem,
1411 "begin datum sort: workMem = %d, randomAccess = %c",
1428 state->abbrevNext = 10;
1429 state->haveDatum1 = true;
1431 state->datumType = datumType;
1435 state->datumTypeLen = typlen;
1436 state->tuples = !typbyval;
1442 state->sortKeys->ssup_collation = sortCollation;
1443 state->sortKeys->ssup_nulls_first = nullsFirstFlag;
1453 state->sortKeys->abbreviate = !typbyval;
1463 if (!state->sortKeys->abbrev_converter)
1499 #ifdef DEBUG_BOUNDED_SORT
1501 if (!optimize_bounded_sort)
1506 if (bound > (int64) (INT_MAX / 2))
1509 state->bounded = true;
1510 state->bound = (int) bound;
1517 state->sortKeys->abbrev_converter = NULL;
1518 if (state->sortKeys->abbrev_full_comparator)
1519 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
1522 state->sortKeys->abbrev_abort = NULL;
1523 state->sortKeys->abbrev_full_comparator = NULL;
1534 return state->boundUsed;
1554 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
1574 elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
1575 SERIAL(state) ? "external sort" : "parallel external sort",
1578 elog(LOG, "%s of worker %d ended, %ld KB used: %s",
1579 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
1583 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
1590 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
1594 if (state->estate != NULL)
1658 isSpaceDisk = false;
1659 spaceUsed = state->allowedMem - state->availMem;
1670 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1671 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1673 state->maxSpace = spaceUsed;
1674 state->isMaxSpaceDisk = isSpaceDisk;
1699 state->lastReturnedTuple = NULL;
1700 state->slabMemoryBegin = NULL;
1701 state->slabMemoryEnd = NULL;
1702 state->slabFreeHead = NULL;
1724 int memtupsize = state->memtupsize;
1725 int64 memNowUsed = state->allowedMem - state->availMem;
1728 if (!state->growmemtuples)
1732 if (memNowUsed <= state->availMem)
1738 if (memtupsize < INT_MAX / 2)
1739 newmemtupsize = memtupsize * 2;
1742 newmemtupsize = INT_MAX;
1743 state->growmemtuples = false;
1776 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1777 if (memtupsize * grow_ratio < INT_MAX)
1778 newmemtupsize = (int) (memtupsize * grow_ratio);
1780 newmemtupsize = INT_MAX;
1783 state->growmemtuples = false;
1787 if (newmemtupsize <= memtupsize)
1800 state->growmemtuples = false;
1814 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1819 state->memtupsize = newmemtupsize;
1825 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1830 state->growmemtuples = false;
1894 tuple->t_tid = *self;
1919 stup.datum1 = state->sortKeys->abbrev_converter(original,
1938 for (i = 0; i < state->memtupcount; i++)
1942 tuple = mtup->tuple;
1978 if (isNull || !state->tuples)
1998 if (!state->sortKeys->abbrev_converter)
2005 stup.datum1 = state->sortKeys->abbrev_converter(original,
2025 for (i = 0; i < state->memtupcount; i++)
2047 switch (state->status)
2058 if (state->memtupcount >= state->memtupsize - 1)
2063 state->memtuples[state->memtupcount++] = *tuple;
2077 if (state->bounded &&
2083 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
2137 state->memtuples[state->memtupcount++] = *tuple;
2154 Assert(state->sortKeys[0].abbrev_converter != NULL);
2156 Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
2165 state->abbrevNext *= 2;
2171 if (!state->sortKeys->abbrev_abort(state->memtupcount,
2179 state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
2180 state->sortKeys[0].abbrev_converter = NULL;
2182 state->sortKeys[0].abbrev_abort = NULL;
2183 state->sortKeys[0].abbrev_full_comparator = NULL;
2202 elog(LOG, "performsort of worker %d starting: %s",
2206 switch (state->status)
2241 state->eof_reached = false;
2242 state->markpos_block = 0L;
2243 state->markpos_offset = 0;
2244 state->markpos_eof = false;
2256 state->eof_reached = false;
2257 state->markpos_offset = 0;
2258 state->markpos_eof = false;
2272 state->eof_reached = false;
2273 state->markpos_block = 0L;
2274 state->markpos_offset = 0;
2275 state->markpos_eof = false;
2287 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
2291 elog(LOG, "performsort of worker %d done: %s",
2310 unsigned int tuplen;
2315 switch (state->status)
2327 state->eof_reached = true;
2335 elog(ERROR, "retrieved too many tuples in a bounded sort");
2341 if (state->current <= 0)
2348 if (state->eof_reached)
2349 state->eof_reached = false;
2353 if (state->current <= 0)
2356 *stup = state->memtuples[state->current - 1];
2369 if (state->lastReturnedTuple)
2372 state->lastReturnedTuple = NULL;
2377 if (state->eof_reached)
2380 if ((tuplen = getlen(state->result_tape, true)) != 0)
2395 state->eof_reached = true;
2406 if (state->eof_reached)
2414 2 * sizeof(unsigned int));
2417 else if (nmoved != 2 * sizeof(unsigned int))
2418 elog(ERROR, "unexpected tape position");
2419 state->eof_reached = false;
2428 sizeof(unsigned int));
2431 else if (nmoved != sizeof(unsigned int))
2432 elog(ERROR, "unexpected tape position");
2439 tuplen + 2 * sizeof(unsigned int));
2440 if (nmoved == tuplen + sizeof(unsigned int))
2451 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
2452 elog(ERROR, "bogus tuple length in backward scan");
2464 if (nmoved != tuplen)
2465 elog(ERROR, "bogus tuple length in backward scan");
2485 if (state->lastReturnedTuple)
2488 state->lastReturnedTuple = NULL;
2494 if (state->memtupcount > 0)
2496 int srcTapeIndex = state->memtuples[0].srctape;
2500 *stup = state->memtuples[0];
2519 state->nInputRuns--;
2528 newtup.srctape = srcTapeIndex;
2575 if (state->sortKeys->abbrev_converter && abbrev)
2663 if (state->sortKeys->abbrev_converter && abbrev)
2699 switch (state->status)
2702 if (state->memtupcount - state->current >= ntuples)
2704 state->current += ntuples;
2708 state->eof_reached = true;
2716 elog(ERROR, "retrieved too many tuples in a bounded sort");
2728 while (ntuples-- > 0)
2780 mOrder = allowedMem /
2821 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
2823 nOutputTapes = Min(nOutputRuns, maxOutputTapes);
2860 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
2868 state->shared ? &state->shared->fileset : NULL,
2871 state->currentRun = 0;
2876 state->inputTapes = NULL;
2877 state->nInputTapes = 0;
2878 state->nInputRuns = 0;
2881 state->nOutputTapes = 0;
2882 state->nOutputRuns = 0;
2942 state->nOutputTapes++;
2943 state->nOutputRuns++;
2952 state->nOutputRuns++;
2968 state->slabMemoryEnd = state->slabMemoryBegin +
2973 p = state->slabMemoryBegin;
2974 for (i = 0; i < numSlots - 1; i++)
2983 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2984 state->slabFreeHead = NULL;
2986 state->slabAllocatorUsed = true;
3003 if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
3011 state->sortKeys->abbrev_converter = NULL;
3012 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
3015 state->sortKeys->abbrev_abort = NULL;
3016 state->sortKeys->abbrev_full_comparator = NULL;
3031 state->memtuples = NULL;
3073 elog(LOG, "worker %d using %zu KB of memory for tape buffers",
3074 state->worker, state->tape_buffer_mem / 1024);
3085 if (state->nInputRuns == 0)
3087 int64 input_buffer_size;
3090 if (state->nInputTapes > 0)
3092 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
3108 state->nOutputTapes = 0;
3109 state->nOutputRuns = 0;
3122 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
3123 state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
3128 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
3160 if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
3175 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
3199 while (state->memtupcount > 0)
3204 srcTapeIndex = state->memtuples[0].srctape;
3205 srcTape = state->inputTapes[srcTapeIndex];
3209 if (state->memtuples[0].tuple)
3224 state->nInputRuns--;
3251 for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
3271 unsigned int tuplen;
3274 if ((tuplen = getlen(srcTape, true)) == 0)
3309 if (state->memtupcount == 0 && state->currentRun > 0)
3318 if (state->currentRun == INT_MAX)
3320 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
3321 errmsg("cannot have more than %d runs for an external sort",
3324 if (state->currentRun > 0)
3327 state->currentRun++;
3331 elog(LOG, "worker %d starting quicksort of run %d: %s",
3344 elog(LOG, "worker %d finished quicksort of run %d: %s",
3349 memtupwrite = state->memtupcount;
3350 for (i = 0; i < memtupwrite; i++)
3353 state->memtupcount--;
3369 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
3385 switch (state->status)
3389 state->eof_reached = false;
3390 state->markpos_offset = 0;
3391 state->markpos_eof = false;
3395 state->eof_reached = false;
3396 state->markpos_block = 0L;
3397 state->markpos_offset = 0;
3398 state->markpos_eof = false;
3418 switch (state->status)
3426 &state->markpos_block,
3427 &state->markpos_offset);
3449 switch (state->status)
3457 state->markpos_block,
3458 state->markpos_offset);
3490 if (state->isMaxSpaceDisk)
3496 switch (state->maxSpaceStatus)
3499 if (state->boundUsed)
3525 return "still in progress";
3527 return "top-N heapsort";
3531 return "external sort";
3533 return "external merge";
3566 int tupcount = state->memtupcount;
3577 state->memtupcount = 0;
3578 for (i = 0; i < tupcount; i++)
3615 int tupcount = state->memtupcount;
3627 while (state->memtupcount > 1)
3635 state->memtupcount = tupcount;
3644 state->boundUsed = true;
3657 if (state->memtupcount > 1)
3667 qsort_tuple_unsigned(state->memtuples,
3672 #if SIZEOF_DATUM >= 8
3673 else if (state->sortKeys[0].comparator == ssup_datum_signed_cmp)
3675 qsort_tuple_signed(state->memtuples,
3683 qsort_tuple_int32(state->memtuples,
3691 if (state->onlyKey != NULL)
3693 qsort_ssup(state->memtuples, state->memtupcount,
3698 qsort_tuple(state->memtuples,
3721 memtuples = state->memtuples;
3730 j = state->memtupcount++;
3733 int i = (j - 1) >> 1;
3737 memtuples[j] = memtuples[i];
3740 memtuples[j] = *tuple;
3756 if (--state->memtupcount <= 0)
3763 tuple = &memtuples[state->memtupcount];
3790 n = state->memtupcount;
3794 unsigned int j = 2 * i + 1;
3803 memtuples[i] = memtuples[j];
3806 memtuples[i] = *tuple;
3820 for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
3840 if (len == 0 && !eofOK)
3848 unsigned int len = 0;
3876 state->slabFreeHead = buf->nextfree;
3905 b->datum1, b->isnull1,
3915 tupDesc = state->tupDesc;
3921 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3922 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3932 for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
3936 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3937 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3964 stup->tuple = (void *) tuple;
3970 state->sortKeys[0].ssup_attno,
3976 if (!state->sortKeys->abbrev_converter || stup->isnull1)
3991 stup->datum1 = state->sortKeys->abbrev_converter(original,
4010 for (i = 0; i < state->memtupcount; i++)
4020 state->sortKeys[0].ssup_attno,
4037 unsigned int tuplen = tupbodylen + sizeof(int);
4045 if (!state->slabAllocatorUsed)
4056 unsigned int tupbodylen = len - sizeof(int);
4063 tuple->t_len = tuplen;
4068 stup->tuple = (void *) tuple;
4073 state->sortKeys[0].ssup_attno,
4101 tupDesc = state->tupDesc;
4104 if (state->haveDatum1)
4107 b->datum1, b->isnull1,
4116 datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
4117 datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
4135 if (state->indexInfo->ii_Expressions == NULL)
4139 for (; nkey < state->nKeys; nkey++, sortKey++)
4174 l_index_values, l_index_isnull);
4178 r_index_values, r_index_isnull);
4180 for (; nkey < state->nKeys; nkey++, sortKey++)
4183 l_index_isnull[nkey],
4184 r_index_values[nkey],
4185 r_index_isnull[nkey],
4204 stup->tuple = (void *) tuple;
4213 if (!state->haveDatum1)
4217 state->indexInfo->ii_IndexAttrNumbers[0],
4221 if (!state->sortKeys->abbrev_converter || stup->isnull1)
4236 stup->datum1 = state->sortKeys->abbrev_converter(original,
4255 for (i = 0; i < state->memtupcount; i++)
4261 state->indexInfo->ii_IndexAttrNumbers[0],
4282 if (!state->slabAllocatorUsed)
4299 tuple->t_len = t_len;
4308 stup->tuple = (void *) tuple;
4310 if (state->haveDatum1)
4312 state->indexInfo->ii_IndexAttrNumbers[0],
4339 bool equal_hasnull = false;
4350 b->datum1, b->isnull1,
4358 keysz = state->nKeys;
4375 equal_hasnull = true;
4378 for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
4391 equal_hasnull = true;
4404 if (state->enforceUnique && !(!state->uniqueNullsNotDistinct && equal_hasnull))
4416 Assert(tuple1 != tuple2);
4423 (errcode(ERRCODE_UNIQUE_VIOLATION),
4424 errmsg("could not create unique index \"%s\"",
4426 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
4443 return (blk1 < blk2) ? -1 : 1;
4450 return (pos1 < pos2) ? -1 : 1;
4480 if (bucket1 > bucket2)
4482 else if (bucket1 < bucket2)
4498 return (blk1 < blk2) ? -1 : 1;
4505 return (pos1 < pos2) ? -1 : 1;
4518 elog(ERROR, "copytup_index() should not be called");
4525 unsigned int tuplen;
4534 if (!state->slabAllocatorUsed)
4545 unsigned int tuplen = len - sizeof(unsigned int);
4552 stup->tuple = (void *) tuple;
4570 b->datum1, b->isnull1,
4577 if (state->sortKeys->abbrev_converter)
4589 elog(ERROR, "copytup_datum() should not be called");
4596 unsigned int tuplen;
4597 unsigned int writtenlen;
4604 else if (!state->tuples)
4607 tuplen = sizeof(Datum);
4611 waddr = stup->tuple;
4616 writtenlen = tuplen + sizeof(unsigned int);
4624 if (!state->slabAllocatorUsed && stup->tuple)
4635 unsigned int tuplen = len - sizeof(unsigned int);
4644 else if (!state->tuples)
4658 stup->tuple = raddr;
4708 shared->nTapes = nWorkers;
4709 for (i = 0; i < nWorkers; i++)
4784 state->memtuples = NULL;
4785 state->memtupsize = 0;
4832 int nParticipants = state->nParticipants;
4833 int workersFinished;
4837 Assert(nParticipants >= 1);
4843 if (nParticipants != workersFinished)
4844 elog(ERROR, "cannot take over tapes before all workers finish");
4858 state->currentRun = nParticipants;
4868 state->inputTapes = NULL;
4869 state->nInputTapes = 0;
4870 state->nInputRuns = 0;
4873 state->nOutputTapes = nParticipants;
4874 state->nOutputRuns = nParticipants;
4876 for (j = 0; j < nParticipants; j++)
4909 #if SIZEOF_DATUM >= 8