/*
 * Initial number of slots in the in-memory SortTuple array.
 * At least 1024 entries; otherwise sized so the array's byte size exceeds
 * ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) — presumably so the
 * allocation lands in its own allocator block rather than a shared chunk
 * (TODO confirm against memutils documentation).
 * NOTE(review): the leading "118"/"119" digits on the lines below are
 * extraction artifacts (original file line numbers fused into the text),
 * not part of the macro.
 */
118#define INITIAL_MEMTUPSIZE Max(1024, \
119 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
124#ifdef DEBUG_BOUNDED_SORT
/*
 * SLAB_SLOT_SIZE: byte size of each slot in the slab allocator used for
 * merge-phase tuples (each SlabSlot holds a char buffer[SLAB_SLOT_SIZE]).
 *
 * TAPE_BUFFER_OVERHEAD: memory charged per logical tape for its buffer;
 * set to one disk block (BLCKSZ).
 *
 * MERGE_BUFFER_SIZE: read-buffer budget per input tape during merging,
 * 32 disk blocks.  NOTE(review): the leading digits on each line below are
 * fused original line numbers from the extraction, not code.
 */
140#define SLAB_SLOT_SIZE 1024
176#define TAPE_BUFFER_OVERHEAD BLCKSZ
177#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
/*
 * IS_SLAB_SLOT: true iff the given tuple pointer lies inside the slab
 * allocator's arena, i.e. within [slabMemoryBegin, slabMemoryEnd).
 * Used to decide whether a tuple buffer must be returned to the slab
 * free list rather than pfree'd.  (Leading "372".."374" digits below are
 * extraction artifacts.)
 */
372#define IS_SLAB_SLOT(state, tuple) \
373 ((char *) (tuple) >= (state)->slabMemoryBegin && \
374 (char *) (tuple) < (state)->slabMemoryEnd)
380#define RELEASE_SLAB_SLOT(state, tuple) \
382 SlabSlot *buf = (SlabSlot *) tuple; \
384 if (IS_SLAB_SLOT((state), buf)) \
386 buf->nextfree = (state)->slabFreeHead; \
387 (state)->slabFreeHead = buf; \
/*
 * Dispatch macros: forward to the per-sort-variant callbacks stored in
 * state->base (removeabbrev / comparetup / writetup / readtup / freestate).
 * FREESTATE tolerates a NULL callback.  (The leading "392".."402" digits
 * on each line are fused original line numbers from the extraction.)
 */
392#define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
393#define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
394#define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
395#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
396#define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
/*
 * Memory accounting: availMem is a signed budget; LACKMEM reports
 * overcommit, but only while the slab allocator is NOT in use (slab memory
 * is accounted separately).
 */
397#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
398#define USEMEM(state,amt) ((state)->availMem -= (amt))
399#define FREEMEM(state,amt) ((state)->availMem += (amt))
/*
 * Parallel-sort role tests: SERIAL = no shared state; WORKER = shared state
 * with a worker number assigned; LEADER = shared state, worker == -1.
 */
400#define SERIAL(state) ((state)->shared == NULL)
401#define WORKER(state) ((state)->shared && (state)->worker != -1)
402#define LEADER(state) ((state)->shared && (state)->worker == -1)
497 b->datum1,
b->isnull1,
498 &
state->base.sortKeys[0]);
519 b->datum1,
b->isnull1,
520 &
state->base.sortKeys[0]);
542 b->datum1,
b->isnull1,
543 &
state->base.sortKeys[0]);
/*
 * Five specializations of the inline qsort template, each configured via
 * ST_* parameter macros.  NOTE(review): the template #include lines (and
 * the matching #undef's) that normally follow each parameter set are not
 * visible in this extraction — verify against the original file.  The
 * leading "567".."611" digits on each line are fused original line numbers,
 * not code.
 */
/* Specialization: leading key compared via the "unsigned datum" fast path. */
567#define ST_SORT qsort_tuple_unsigned
568#define ST_ELEMENT_TYPE SortTuple
569#define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
570#define ST_COMPARE_ARG_TYPE Tuplesortstate
571#define ST_CHECK_FOR_INTERRUPTS
572#define ST_SCOPE static
/* Specialization: leading key compared via the "signed datum" fast path. */
576#define ST_SORT qsort_tuple_signed
577#define ST_ELEMENT_TYPE SortTuple
578#define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
579#define ST_COMPARE_ARG_TYPE Tuplesortstate
580#define ST_CHECK_FOR_INTERRUPTS
581#define ST_SCOPE static
/* Specialization: leading key compared via the int32 fast path. */
585#define ST_SORT qsort_tuple_int32
586#define ST_ELEMENT_TYPE SortTuple
587#define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
588#define ST_COMPARE_ARG_TYPE Tuplesortstate
589#define ST_CHECK_FOR_INTERRUPTS
590#define ST_SCOPE static
/*
 * General case: comparator resolved at runtime through a function pointer
 * (ST_COMPARE_RUNTIME_POINTER) — used when no single-key fast path applies.
 */
594#define ST_SORT qsort_tuple
595#define ST_ELEMENT_TYPE SortTuple
596#define ST_COMPARE_RUNTIME_POINTER
597#define ST_COMPARE_ARG_TYPE Tuplesortstate
598#define ST_CHECK_FOR_INTERRUPTS
599#define ST_SCOPE static
/*
 * Single-key Datum case: compares datum1/isnull1 directly with
 * ApplySortComparator against a SortSupportData, skipping the full
 * tuple comparator.
 */
604#define ST_SORT qsort_ssup
605#define ST_ELEMENT_TYPE SortTuple
606#define ST_COMPARE(a, b, ssup) \
607 ApplySortComparator((a)->datum1, (a)->isnull1, \
608 (b)->datum1, (b)->isnull1, (ssup))
609#define ST_COMPARE_ARG_TYPE SortSupportData
610#define ST_CHECK_FOR_INTERRUPTS
611#define ST_SCOPE static
644 elog(
ERROR,
"random access disallowed under parallel sort");
678 state->base.sortopt = sortopt;
679 state->base.tuples =
true;
680 state->abbrevNext = 10;
689 state->base.sortcontext = sortcontext;
690 state->base.maincontext = maincontext;
710 state->nParticipants = -1;
717 state->nParticipants = -1;
771 state->bounded =
false;
772 state->boundUsed =
false;
778 state->memtupcount = 0;
780 state->growmemtuples =
true;
781 state->slabAllocatorUsed =
false;
796 elog(
ERROR,
"insufficient memory allowed for sort");
798 state->currentRun = 0;
838#ifdef DEBUG_BOUNDED_SORT
848 state->bounded =
true;
856 state->base.sortKeys->abbrev_converter =
NULL;
857 if (
state->base.sortKeys->abbrev_full_comparator)
858 state->base.sortKeys->comparator =
state->base.sortKeys->abbrev_full_comparator;
861 state->base.sortKeys->abbrev_abort =
NULL;
862 state->base.sortKeys->abbrev_full_comparator =
NULL;
873 return state->boundUsed;
891 spaceUsed = (
state->allowedMem -
state->availMem + 1023) / 1024;
906 elog(
LOG,
"%s of worker %d ended, %" PRId64 " disk blocks used: %s",
907 SERIAL(
state) ?
"external sort" :
"parallel external sort",
911 SERIAL(
state) ?
"internal sort" :
"unperformed parallel sort",
975 spaceUsed =
state->allowedMem -
state->availMem;
989 state->maxSpace = spaceUsed;
1040 int memtupsize =
state->memtupsize;
1044 if (!
state->growmemtuples)
1059 state->growmemtuples =
false;
1099 state->growmemtuples =
false;
1116 state->growmemtuples =
false;
1141 elog(
ERROR,
"unexpected out-of-memory situation in tuplesort");
1146 state->growmemtuples =
false;
1163 state->tupleMem += tuplen;
1180 state->base.sortKeys);
1196 switch (
state->status)
1207 if (
state->memtupcount >=
state->memtupsize - 1)
1212 state->memtuples[
state->memtupcount++] = *tuple;
1226 if (
state->bounded &&
1231 elog(
LOG,
"switching to bounded heapsort at %d tuples: %s",
1288 state->memtuples[
state->memtupcount++] = *tuple;
1317 state->abbrevNext *= 2;
1323 if (!
state->base.sortKeys->abbrev_abort(
state->memtupcount,
1324 state->base.sortKeys))
1331 state->base.sortKeys[0].comparator =
state->base.sortKeys[0].abbrev_full_comparator;
1332 state->base.sortKeys[0].abbrev_converter =
NULL;
1334 state->base.sortKeys[0].abbrev_abort =
NULL;
1335 state->base.sortKeys[0].abbrev_full_comparator =
NULL;
1353 elog(
LOG,
"performsort of worker %d starting: %s",
1356 switch (
state->status)
1391 state->eof_reached =
false;
1392 state->markpos_block = 0
L;
1393 state->markpos_offset = 0;
1394 state->markpos_eof =
false;
1407 state->eof_reached =
false;
1408 state->markpos_offset = 0;
1409 state->markpos_eof =
false;
1422 state->eof_reached =
false;
1423 state->markpos_block = 0
L;
1424 state->markpos_offset = 0;
1425 state->markpos_eof =
false;
1436 elog(
LOG,
"performsort of worker %d done (except %d-way final merge): %s",
1440 elog(
LOG,
"performsort of worker %d done: %s",
1458 unsigned int tuplen;
1463 switch (
state->status)
1475 state->eof_reached =
true;
1483 elog(
ERROR,
"retrieved too many tuples in a bounded sort");
1489 if (
state->current <= 0)
1496 if (
state->eof_reached)
1497 state->eof_reached =
false;
1501 if (
state->current <= 0)
1517 if (
state->lastReturnedTuple)
1525 if (
state->eof_reached)
1528 if ((tuplen =
getlen(
state->result_tape,
true)) != 0)
1543 state->eof_reached =
true;
1554 if (
state->eof_reached)
1562 2 *
sizeof(
unsigned int));
1565 else if (
nmoved != 2 *
sizeof(
unsigned int))
1566 elog(
ERROR,
"unexpected tape position");
1567 state->eof_reached =
false;
1576 sizeof(
unsigned int));
1579 else if (
nmoved !=
sizeof(
unsigned int))
1580 elog(
ERROR,
"unexpected tape position");
1587 tuplen + 2 *
sizeof(
unsigned int));
1588 if (
nmoved == tuplen +
sizeof(
unsigned int))
1599 else if (
nmoved != tuplen + 2 *
sizeof(
unsigned int))
1600 elog(
ERROR,
"bogus tuple length in backward scan");
1613 elog(
ERROR,
"bogus tuple length in backward scan");
1633 if (
state->lastReturnedTuple)
1642 if (
state->memtupcount > 0)
1667 state->nInputRuns--;
1707 switch (
state->status)
1710 if (
state->memtupcount -
state->current >= ntuples)
1712 state->current += ntuples;
1716 state->eof_reached =
true;
1724 elog(
ERROR,
"retrieved too many tuples in a bounded sort");
1736 while (ntuples-- > 0)
1829 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1867 elog(
LOG,
"worker %d switching to external sort with %d tapes: %s",
1877 state->currentRun = 0;
1883 state->nInputTapes = 0;
1884 state->nInputRuns = 0;
1887 state->nOutputTapes = 0;
1888 state->nOutputRuns = 0;
1948 state->nOutputTapes++;
1949 state->nOutputRuns++;
1958 state->nOutputRuns++;
1974 state->slabMemoryEnd =
state->slabMemoryBegin +
1979 p =
state->slabMemoryBegin;
1992 state->slabAllocatorUsed =
true;
2017 state->base.sortKeys->abbrev_converter =
NULL;
2018 state->base.sortKeys->comparator =
state->base.sortKeys->abbrev_full_comparator;
2021 state->base.sortKeys->abbrev_abort =
NULL;
2022 state->base.sortKeys->abbrev_full_comparator =
NULL;
2052 if (
state->base.tuples)
2078 elog(
LOG,
"worker %d using %zu KB of memory for tape buffers",
2079 state->worker,
state->tape_buffer_mem / 1024);
2089 if (
state->nInputRuns == 0)
2094 if (
state->nInputTapes > 0)
2112 state->nOutputTapes = 0;
2113 state->nOutputRuns = 0;
2125 elog(
LOG,
"starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2162 if (
state->nInputRuns == 0 &&
state->nOutputRuns <= 1)
2203 while (
state->memtupcount > 0)
2213 if (
state->memtuples[0].tuple)
2228 state->nInputRuns--;
2275 unsigned int tuplen;
2313 if (
state->memtupcount == 0 &&
state->currentRun > 0)
2325 errmsg(
"cannot have more than %d runs for an external sort",
2328 if (
state->currentRun > 0)
2331 state->currentRun++;
2334 elog(
LOG,
"worker %d starting quicksort of run %d: %s",
2345 elog(
LOG,
"worker %d finished quicksort of run %d: %s",
2357 state->memtupcount = 0;
2373 state->tupleMem = 0;
2378 elog(
LOG,
"worker %d finished writing run %d to tape %d: %s",
2393 switch (
state->status)
2397 state->eof_reached =
false;
2398 state->markpos_offset = 0;
2399 state->markpos_eof =
false;
2403 state->eof_reached =
false;
2404 state->markpos_block = 0
L;
2405 state->markpos_offset = 0;
2406 state->markpos_eof =
false;
2426 switch (
state->status)
2434 &
state->markpos_block,
2435 &
state->markpos_offset);
2457 switch (
state->status)
2465 state->markpos_block,
2466 state->markpos_offset);
2498 if (
state->isMaxSpaceDisk)
2504 switch (
state->maxSpaceStatus)
2507 if (
state->boundUsed)
2533 return "still in progress";
2535 return "top-N heapsort";
2539 return "external sort";
2541 return "external merge";
2585 state->memtupcount = 0;
2635 while (
state->memtupcount > 1)
2652 state->boundUsed =
true;
2665 if (
state->memtupcount > 1)
2671 if (
state->base.haveDatum1 &&
state->base.sortKeys)
2700 state->base.onlyKey);
2706 state->base.comparetup,
2727 memtuples =
state->memtuples;
2736 j =
state->memtupcount++;
2739 int i = (
j - 1) >> 1;
2743 memtuples[
j] = memtuples[
i];
2746 memtuples[
j] = *tuple;
2762 if (--
state->memtupcount <= 0)
2769 tuple = &memtuples[
state->memtupcount];
2796 n =
state->memtupcount;
2800 unsigned int j = 2 *
i + 1;
2809 memtuples[
i] = memtuples[
j];
2812 memtuples[
i] = *tuple;
2854 unsigned int len = 0;
2882 state->slabFreeHead =
buf->nextfree;
3008 state->memtupsize = 0;
3055 int nParticipants =
state->nParticipants;
3056 int workersFinished;
3060 Assert(nParticipants >= 1);
3066 if (nParticipants != workersFinished)
3067 elog(
ERROR,
"cannot take over tapes before all workers finish");
3081 state->currentRun = nParticipants;
3092 state->nInputTapes = 0;
3093 state->nInputRuns = 0;
3096 state->nOutputTapes = nParticipants;
3097 state->nOutputRuns = nParticipants;
3099 for (
j = 0;
j < nParticipants;
j++)
void PrepareTempTablespaces(void)
MemoryContext BumpContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
#define pg_attribute_always_inline
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define palloc0_object(type)
static int compare(const void *arg1, const void *arg2)
@ SORT_TYPE_EXTERNAL_SORT
@ SORT_TYPE_TOP_N_HEAPSORT
@ SORT_TYPE_STILL_IN_PROGRESS
@ SORT_TYPE_EXTERNAL_MERGE
void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
size_t LogicalTapeBackspace(LogicalTape *lt, size_t size)
size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
int64 LogicalTapeSetBlocks(LogicalTapeSet *lts)
void LogicalTapeClose(LogicalTape *lt)
void LogicalTapeSetClose(LogicalTapeSet *lts)
void LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
LogicalTapeSet * LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
void LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
void LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
LogicalTape * LogicalTapeCreate(LogicalTapeSet *lts)
void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
LogicalTape * LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
Size GetMemoryChunkSpace(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
void * repalloc_huge(void *pointer, Size size)
void MemoryContextResetOnly(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
const char * pg_rusage_show(const PGRUsage *ru0)
void pg_rusage_init(PGRUsage *ru0)
static char buf[DEFAULT_XLOG_SEG_SIZE]
static int64 DatumGetInt64(Datum X)
static int32 DatumGetInt32(Datum X)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
static int ApplySignedSortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
static int ApplyUnsignedSortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
static int ApplyInt32SortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]
TuplesortMethod sortMethod
TuplesortSpaceType spaceType
LogicalTape ** inputTapes
LogicalTape ** outputTapes
TupSortStatus maxSpaceStatus
LogicalTape * result_tape
void tuplesort_rescan(Tuplesortstate *state)
void tuplesort_performsort(Tuplesortstate *state)
int tuplesort_merge_order(int64 allowedMem)
#define TAPE_BUFFER_OVERHEAD
static void tuplesort_heap_delete_top(Tuplesortstate *state)
#define INITIAL_MEMTUPSIZE
static unsigned int getlen(LogicalTape *tape, bool eofOK)
void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
#define COMPARETUP(state, a, b)
static void selectnewtape(Tuplesortstate *state)
void tuplesort_reset(Tuplesortstate *state)
static void markrunend(LogicalTape *tape)
bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
#define REMOVEABBREV(state, stup, count)
static void reversedirection(Tuplesortstate *state)
#define USEMEM(state, amt)
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
int ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
static bool grow_memtuples(Tuplesortstate *state)
int ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
static void beginmerge(Tuplesortstate *state)
static void make_bounded_heap(Tuplesortstate *state)
bool tuplesort_used_bound(Tuplesortstate *state)
#define WRITETUP(state, tape, stup)
static void sort_bounded_heap(Tuplesortstate *state)
static int worker_get_identifier(Tuplesortstate *state)
static void mergeonerun(Tuplesortstate *state)
#define FREEMEM(state, amt)
static void inittapestate(Tuplesortstate *state, int maxTapes)
static void leader_takeover_tapes(Tuplesortstate *state)
Size tuplesort_estimate_shared(int nWorkers)
void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats)
Tuplesortstate * tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
static void tuplesort_sort_memtuples(Tuplesortstate *state)
void tuplesort_end(Tuplesortstate *state)
static void inittapes(Tuplesortstate *state, bool mergeruns)
void tuplesort_markpos(Tuplesortstate *state)
void tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbrev, Size tuplen)
const char * tuplesort_space_type_name(TuplesortSpaceType t)
#define MERGE_BUFFER_SIZE
#define READTUP(state, stup, tape, len)
int ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
bool tuplesort_gettuple_common(Tuplesortstate *state, bool forward, SortTuple *stup)
static int64 merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns, int maxOutputTapes)
static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
static void tuplesort_updatemax(Tuplesortstate *state)
static void worker_freeze_result_tape(Tuplesortstate *state)
static pg_attribute_always_inline int qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
#define RELEASE_SLAB_SLOT(state, tuple)
void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
static void worker_nomergeruns(Tuplesortstate *state)
const char * tuplesort_method_name(TuplesortMethod m)
static pg_attribute_always_inline int qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
void tuplesort_restorepos(Tuplesortstate *state)
static pg_attribute_always_inline int qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
static void mergeruns(Tuplesortstate *state)
void * tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
static void tuplesort_begin_batch(Tuplesortstate *state)
void tuplesort_set_bound(Tuplesortstate *state, int64 bound)
static void init_slab_allocator(Tuplesortstate *state, int numSlots)
static bool consider_abort_common(Tuplesortstate *state)
static void tuplesort_free(Tuplesortstate *state)
static void dumptuples(Tuplesortstate *state, bool alltuples)
#define TupleSortUseBumpTupleCxt(opt)
#define TUPLESORT_RANDOMACCESS
#define TUPLESORT_ALLOWBOUNDED
char buffer[SLAB_SLOT_SIZE]
union SlabSlot * nextfree