119#define INITIAL_MEMTUPSIZE Max(1024, \
120 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
125#ifdef DEBUG_BOUNDED_SORT
141#define SLAB_SLOT_SIZE 1024
177#define TAPE_BUFFER_OVERHEAD BLCKSZ
178#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
373#define IS_SLAB_SLOT(state, tuple) \
374 ((char *) (tuple) >= (state)->slabMemoryBegin && \
375 (char *) (tuple) < (state)->slabMemoryEnd)
381#define RELEASE_SLAB_SLOT(state, tuple) \
383 SlabSlot *buf = (SlabSlot *) tuple; \
385 if (IS_SLAB_SLOT((state), buf)) \
387 buf->nextfree = (state)->slabFreeHead; \
388 (state)->slabFreeHead = buf; \
393#define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
394#define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
395#define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
396#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
397#define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
398#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
399#define USEMEM(state,amt) ((state)->availMem -= (amt))
400#define FREEMEM(state,amt) ((state)->availMem += (amt))
401#define SERIAL(state) ((state)->shared == NULL)
402#define WORKER(state) ((state)->shared && (state)->worker != -1)
403#define LEADER(state) ((state)->shared && (state)->worker == -1)
489#define ST_SORT qsort_tuple
490#define ST_ELEMENT_TYPE SortTuple
491#define ST_COMPARE_RUNTIME_POINTER
492#define ST_COMPARE_ARG_TYPE Tuplesortstate
493#define ST_CHECK_FOR_INTERRUPTS
494#define ST_SCOPE static
499#define ST_SORT qsort_ssup
500#define ST_ELEMENT_TYPE SortTuple
501#define ST_COMPARE(a, b, ssup) \
502 ApplySortComparator((a)->datum1, (a)->isnull1, \
503 (b)->datum1, (b)->isnull1, (ssup))
504#define ST_COMPARE_ARG_TYPE SortSupportData
505#define ST_CHECK_FOR_INTERRUPTS
506#define ST_SCOPE static
524#define QSORT_THRESHOLD 40
556 elog(
ERROR,
"random access disallowed under parallel sort");
590 state->base.sortopt = sortopt;
591 state->base.tuples =
true;
592 state->abbrevNext = 10;
601 state->base.sortcontext = sortcontext;
602 state->base.maincontext = maincontext;
622 state->nParticipants = -1;
629 state->nParticipants = -1;
683 state->bounded =
false;
684 state->boundUsed =
false;
690 state->memtupcount = 0;
692 state->growmemtuples =
true;
693 state->slabAllocatorUsed =
false;
708 elog(
ERROR,
"insufficient memory allowed for sort");
710 state->currentRun = 0;
750#ifdef DEBUG_BOUNDED_SORT
760 state->bounded =
true;
768 state->base.sortKeys->abbrev_converter =
NULL;
769 if (
state->base.sortKeys->abbrev_full_comparator)
770 state->base.sortKeys->comparator =
state->base.sortKeys->abbrev_full_comparator;
773 state->base.sortKeys->abbrev_abort =
NULL;
774 state->base.sortKeys->abbrev_full_comparator =
NULL;
785 return state->boundUsed;
803 spaceUsed = (
state->allowedMem -
state->availMem + 1023) / 1024;
818 elog(
LOG,
"%s of worker %d ended, %" PRId64 " disk blocks used: %s",
819 SERIAL(
state) ?
"external sort" :
"parallel external sort",
823 SERIAL(
state) ?
"internal sort" :
"unperformed parallel sort",
887 spaceUsed =
state->allowedMem -
state->availMem;
901 state->maxSpace = spaceUsed;
952 int memtupsize =
state->memtupsize;
956 if (!
state->growmemtuples)
971 state->growmemtuples =
false;
1011 state->growmemtuples =
false;
1028 state->growmemtuples =
false;
1053 elog(
ERROR,
"unexpected out-of-memory situation in tuplesort");
1058 state->growmemtuples =
false;
1075 state->tupleMem += tuplen;
1092 state->base.sortKeys);
1108 switch (
state->status)
1119 if (
state->memtupcount >=
state->memtupsize - 1)
1124 state->memtuples[
state->memtupcount++] = *tuple;
1138 if (
state->bounded &&
1143 elog(
LOG,
"switching to bounded heapsort at %d tuples: %s",
1200 state->memtuples[
state->memtupcount++] = *tuple;
1229 state->abbrevNext *= 2;
1235 if (!
state->base.sortKeys->abbrev_abort(
state->memtupcount,
1236 state->base.sortKeys))
1243 state->base.sortKeys[0].comparator =
state->base.sortKeys[0].abbrev_full_comparator;
1244 state->base.sortKeys[0].abbrev_converter =
NULL;
1246 state->base.sortKeys[0].abbrev_abort =
NULL;
1247 state->base.sortKeys[0].abbrev_full_comparator =
NULL;
1265 elog(
LOG,
"performsort of worker %d starting: %s",
1268 switch (
state->status)
1303 state->eof_reached =
false;
1304 state->markpos_block = 0
L;
1305 state->markpos_offset = 0;
1306 state->markpos_eof =
false;
1319 state->eof_reached =
false;
1320 state->markpos_offset = 0;
1321 state->markpos_eof =
false;
1334 state->eof_reached =
false;
1335 state->markpos_block = 0
L;
1336 state->markpos_offset = 0;
1337 state->markpos_eof =
false;
1348 elog(
LOG,
"performsort of worker %d done (except %d-way final merge): %s",
1352 elog(
LOG,
"performsort of worker %d done: %s",
1370 unsigned int tuplen;
1375 switch (
state->status)
1387 state->eof_reached =
true;
1395 elog(
ERROR,
"retrieved too many tuples in a bounded sort");
1401 if (
state->current <= 0)
1408 if (
state->eof_reached)
1409 state->eof_reached =
false;
1413 if (
state->current <= 0)
1429 if (
state->lastReturnedTuple)
1437 if (
state->eof_reached)
1440 if ((tuplen =
getlen(
state->result_tape,
true)) != 0)
1455 state->eof_reached =
true;
1466 if (
state->eof_reached)
1474 2 *
sizeof(
unsigned int));
1477 else if (
nmoved != 2 *
sizeof(
unsigned int))
1478 elog(
ERROR,
"unexpected tape position");
1479 state->eof_reached =
false;
1488 sizeof(
unsigned int));
1491 else if (
nmoved !=
sizeof(
unsigned int))
1492 elog(
ERROR,
"unexpected tape position");
1499 tuplen + 2 *
sizeof(
unsigned int));
1500 if (
nmoved == tuplen +
sizeof(
unsigned int))
1511 else if (
nmoved != tuplen + 2 *
sizeof(
unsigned int))
1512 elog(
ERROR,
"bogus tuple length in backward scan");
1525 elog(
ERROR,
"bogus tuple length in backward scan");
1545 if (
state->lastReturnedTuple)
1554 if (
state->memtupcount > 0)
1579 state->nInputRuns--;
1619 switch (
state->status)
1622 if (
state->memtupcount -
state->current >= ntuples)
1624 state->current += ntuples;
1628 state->eof_reached =
true;
1636 elog(
ERROR,
"retrieved too many tuples in a bounded sort");
1648 while (ntuples-- > 0)
1741 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1779 elog(
LOG,
"worker %d switching to external sort with %d tapes: %s",
1789 state->currentRun = 0;
1795 state->nInputTapes = 0;
1796 state->nInputRuns = 0;
1799 state->nOutputTapes = 0;
1800 state->nOutputRuns = 0;
1860 state->nOutputTapes++;
1861 state->nOutputRuns++;
1870 state->nOutputRuns++;
1886 state->slabMemoryEnd =
state->slabMemoryBegin +
1891 p =
state->slabMemoryBegin;
1904 state->slabAllocatorUsed =
true;
1929 state->base.sortKeys->abbrev_converter =
NULL;
1930 state->base.sortKeys->comparator =
state->base.sortKeys->abbrev_full_comparator;
1933 state->base.sortKeys->abbrev_abort =
NULL;
1934 state->base.sortKeys->abbrev_full_comparator =
NULL;
1964 if (
state->base.tuples)
1990 elog(
LOG,
"worker %d using %zu KB of memory for tape buffers",
1991 state->worker,
state->tape_buffer_mem / 1024);
2001 if (
state->nInputRuns == 0)
2006 if (
state->nInputTapes > 0)
2024 state->nOutputTapes = 0;
2025 state->nOutputRuns = 0;
2037 elog(
LOG,
"starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2074 if (
state->nInputRuns == 0 &&
state->nOutputRuns <= 1)
2115 while (
state->memtupcount > 0)
2125 if (
state->memtuples[0].tuple)
2140 state->nInputRuns--;
2187 unsigned int tuplen;
2225 if (
state->memtupcount == 0 &&
state->currentRun > 0)
2237 errmsg(
"cannot have more than %d runs for an external sort",
2240 if (
state->currentRun > 0)
2243 state->currentRun++;
2246 elog(
LOG,
"worker %d starting quicksort of run %d: %s",
2257 elog(
LOG,
"worker %d finished quicksort of run %d: %s",
2269 state->memtupcount = 0;
2285 state->tupleMem = 0;
2290 elog(
LOG,
"worker %d finished writing run %d to tape %d: %s",
2305 switch (
state->status)
2309 state->eof_reached =
false;
2310 state->markpos_offset = 0;
2311 state->markpos_eof =
false;
2315 state->eof_reached =
false;
2316 state->markpos_block = 0
L;
2317 state->markpos_offset = 0;
2318 state->markpos_eof =
false;
2338 switch (
state->status)
2346 &
state->markpos_block,
2347 &
state->markpos_offset);
2369 switch (
state->status)
2377 state->markpos_block,
2378 state->markpos_offset);
2410 if (
state->isMaxSpaceDisk)
2416 switch (
state->maxSpaceStatus)
2419 if (
state->boundUsed)
2445 return "still in progress";
2447 return "top-N heapsort";
2451 return "external sort";
2453 return "external merge";
2497 state->memtupcount = 0;
2547 while (
state->memtupcount > 1)
2564 state->boundUsed =
true;
2578 return (key >> shift) & 0xFF;
2660 int num_partitions = 0;
2692 for (
int i = 0;
i < 256;
i++)
2732 for (
int i = 0;
i < num_partitions;
i++)
2740 size_t offset =
partitions[st->curbyte].offset++;
2746 *st = begin[offset];
2747 begin[offset] = tmp;
2760 if (num_partitions == 1)
2802 state->base.comparetup,
2822 state->base.comparetup_tiebreak,
2841 bool nulls_first =
state->base.sortKeys[0].ssup_nulls_first;
2854 while (d1 < n &&
data[d1].isnull1 == nulls_first)
2881 i += (
data[
i].isnull1 == nulls_first);
2891 i += (
data[
i].isnull1 == nulls_first);
2918 Assert(st->isnull1 ==
true);
2922 Assert(st->isnull1 ==
false);
2931 state->base.comparetup_tiebreak,
2943 state->base.comparetup,
2979#ifdef USE_ASSERT_CHECKING
2998 if (
state->memtupcount > 1)
3003 if (
state->base.haveDatum1 &&
state->base.sortKeys)
3025 state->base.onlyKey);
3031 state->base.comparetup,
3052 memtuples =
state->memtuples;
3061 j =
state->memtupcount++;
3064 int i = (
j - 1) >> 1;
3068 memtuples[
j] = memtuples[
i];
3071 memtuples[
j] = *tuple;
3087 if (--
state->memtupcount <= 0)
3094 tuple = &memtuples[
state->memtupcount];
3121 n =
state->memtupcount;
3125 unsigned int j = 2 *
i + 1;
3134 memtuples[
i] = memtuples[
j];
3137 memtuples[
i] = *tuple;
3179 unsigned int len = 0;
3207 state->slabFreeHead =
buf->nextfree;
3333 state->memtupsize = 0;
3380 int nParticipants =
state->nParticipants;
3381 int workersFinished;
3385 Assert(nParticipants >= 1);
3391 if (nParticipants != workersFinished)
3392 elog(
ERROR,
"cannot take over tapes before all workers finish");
3406 state->currentRun = nParticipants;
3417 state->nInputTapes = 0;
3418 state->nInputRuns = 0;
3421 state->nOutputTapes = nParticipants;
3422 state->nOutputRuns = nParticipants;
3424 for (
j = 0;
j < nParticipants;
j++)
Datum idx(PG_FUNCTION_ARGS)
void PrepareTempTablespaces(void)
MemoryContext BumpContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
int errcode(int sqlerrcode)
#define ereport(elevel,...)
#define palloc0_object(type)
@ SORT_TYPE_EXTERNAL_SORT
@ SORT_TYPE_TOP_N_HEAPSORT
@ SORT_TYPE_STILL_IN_PROGRESS
@ SORT_TYPE_EXTERNAL_MERGE
void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
size_t LogicalTapeBackspace(LogicalTape *lt, size_t size)
size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
int64 LogicalTapeSetBlocks(LogicalTapeSet *lts)
void LogicalTapeClose(LogicalTape *lt)
void LogicalTapeSetClose(LogicalTapeSet *lts)
void LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
LogicalTapeSet * LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
void LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
void LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
LogicalTape * LogicalTapeCreate(LogicalTapeSet *lts)
void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
LogicalTape * LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
Size GetMemoryChunkSpace(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
void * repalloc_huge(void *pointer, Size size)
void MemoryContextResetOnly(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int pg_leftmost_one_pos64(uint64 word)
const char * pg_rusage_show(const PGRUsage *ru0)
void pg_rusage_init(PGRUsage *ru0)
static char buf[DEFAULT_XLOG_SEG_SIZE]
static uint32 DatumGetUInt32(Datum X)
static uint64 DatumGetUInt64(Datum X)
static int64 DatumGetInt64(Datum X)
static Datum UInt32GetDatum(uint32 X)
static int32 DatumGetInt32(Datum X)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
static void SpinLockRelease(volatile slock_t *lock)
static void SpinLockAcquire(volatile slock_t *lock)
static void SpinLockInit(volatile slock_t *lock)
TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]
int(* comparator)(Datum x, Datum y, SortSupport ssup)
TuplesortMethod sortMethod
TuplesortSpaceType spaceType
LogicalTape ** inputTapes
LogicalTape ** outputTapes
TupSortStatus maxSpaceStatus
LogicalTape * result_tape
void tuplesort_rescan(Tuplesortstate *state)
void tuplesort_performsort(Tuplesortstate *state)
int tuplesort_merge_order(int64 allowedMem)
#define TAPE_BUFFER_OVERHEAD
static void tuplesort_heap_delete_top(Tuplesortstate *state)
#define INITIAL_MEMTUPSIZE
static unsigned int getlen(LogicalTape *tape, bool eofOK)
void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
#define COMPARETUP(state, a, b)
static Datum normalize_datum(Datum orig, SortSupport ssup)
static void selectnewtape(Tuplesortstate *state)
void tuplesort_reset(Tuplesortstate *state)
static void markrunend(LogicalTape *tape)
bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
static uint8 current_byte(Datum key, int level)
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
#define REMOVEABBREV(state, stup, count)
static void reversedirection(Tuplesortstate *state)
#define USEMEM(state, amt)
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
int ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
static bool grow_memtuples(Tuplesortstate *state)
int ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
static void beginmerge(Tuplesortstate *state)
static void make_bounded_heap(Tuplesortstate *state)
bool tuplesort_used_bound(Tuplesortstate *state)
#define WRITETUP(state, tape, stup)
static void sort_bounded_heap(Tuplesortstate *state)
static int worker_get_identifier(Tuplesortstate *state)
static void mergeonerun(Tuplesortstate *state)
#define FREEMEM(state, amt)
static void radix_sort_recursive(SortTuple *begin, size_t n_elems, int level, Tuplesortstate *state)
static void verify_memtuples_sorted(Tuplesortstate *state)
static void inittapestate(Tuplesortstate *state, int maxTapes)
static void leader_takeover_tapes(Tuplesortstate *state)
Size tuplesort_estimate_shared(int nWorkers)
void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats)
Tuplesortstate * tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
static void tuplesort_sort_memtuples(Tuplesortstate *state)
void tuplesort_end(Tuplesortstate *state)
static void inittapes(Tuplesortstate *state, bool mergeruns)
void tuplesort_markpos(Tuplesortstate *state)
void tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbrev, Size tuplen)
const char * tuplesort_space_type_name(TuplesortSpaceType t)
static void radix_sort_tuple(SortTuple *data, size_t n, Tuplesortstate *state)
#define MERGE_BUFFER_SIZE
#define READTUP(state, stup, tape, len)
int ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
bool tuplesort_gettuple_common(Tuplesortstate *state, bool forward, SortTuple *stup)
static int64 merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns, int maxOutputTapes)
static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
static void tuplesort_updatemax(Tuplesortstate *state)
static void worker_freeze_result_tape(Tuplesortstate *state)
#define RELEASE_SLAB_SLOT(state, tuple)
void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
static void worker_nomergeruns(Tuplesortstate *state)
const char * tuplesort_method_name(TuplesortMethod m)
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
void tuplesort_restorepos(Tuplesortstate *state)
static void mergeruns(Tuplesortstate *state)
void * tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
static void tuplesort_begin_batch(Tuplesortstate *state)
void tuplesort_set_bound(Tuplesortstate *state, int64 bound)
static void init_slab_allocator(Tuplesortstate *state, int numSlots)
static bool consider_abort_common(Tuplesortstate *state)
static void tuplesort_free(Tuplesortstate *state)
static void dumptuples(Tuplesortstate *state, bool alltuples)
#define TupleSortUseBumpTupleCxt(opt)
#define TUPLESORT_RANDOMACCESS
#define TUPLESORT_ALLOWBOUNDED
char buffer[SLAB_SLOT_SIZE]
union SlabSlot * nextfree