118#define INITIAL_MEMTUPSIZE Max(1024, \
119 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
124#ifdef DEBUG_BOUNDED_SORT
140#define SLAB_SLOT_SIZE 1024
176#define TAPE_BUFFER_OVERHEAD BLCKSZ
177#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
372#define IS_SLAB_SLOT(state, tuple) \
373 ((char *) (tuple) >= (state)->slabMemoryBegin && \
374 (char *) (tuple) < (state)->slabMemoryEnd)
380#define RELEASE_SLAB_SLOT(state, tuple) \
382 SlabSlot *buf = (SlabSlot *) tuple; \
384 if (IS_SLAB_SLOT((state), buf)) \
386 buf->nextfree = (state)->slabFreeHead; \
387 (state)->slabFreeHead = buf; \
392#define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
393#define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
394#define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
395#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
396#define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
397#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
398#define USEMEM(state,amt) ((state)->availMem -= (amt))
399#define FREEMEM(state,amt) ((state)->availMem += (amt))
400#define SERIAL(state) ((state)->shared == NULL)
401#define WORKER(state) ((state)->shared && (state)->worker != -1)
402#define LEADER(state) ((state)->shared && (state)->worker == -1)
488#define ST_SORT qsort_tuple
489#define ST_ELEMENT_TYPE SortTuple
490#define ST_COMPARE_RUNTIME_POINTER
491#define ST_COMPARE_ARG_TYPE Tuplesortstate
492#define ST_CHECK_FOR_INTERRUPTS
493#define ST_SCOPE static
498#define ST_SORT qsort_ssup
499#define ST_ELEMENT_TYPE SortTuple
500#define ST_COMPARE(a, b, ssup) \
501 ApplySortComparator((a)->datum1, (a)->isnull1, \
502 (b)->datum1, (b)->isnull1, (ssup))
503#define ST_COMPARE_ARG_TYPE SortSupportData
504#define ST_CHECK_FOR_INTERRUPTS
505#define ST_SCOPE static
523#define QSORT_THRESHOLD 40
555 elog(
ERROR,
"random access disallowed under parallel sort");
589 state->base.sortopt = sortopt;
590 state->base.tuples =
true;
591 state->abbrevNext = 10;
600 state->base.sortcontext = sortcontext;
601 state->base.maincontext = maincontext;
621 state->nParticipants = -1;
628 state->nParticipants = -1;
682 state->bounded =
false;
683 state->boundUsed =
false;
689 state->memtupcount = 0;
691 state->growmemtuples =
true;
692 state->slabAllocatorUsed =
false;
707 elog(
ERROR,
"insufficient memory allowed for sort");
709 state->currentRun = 0;
749#ifdef DEBUG_BOUNDED_SORT
759 state->bounded =
true;
767 state->base.sortKeys->abbrev_converter =
NULL;
768 if (
state->base.sortKeys->abbrev_full_comparator)
769 state->base.sortKeys->comparator =
state->base.sortKeys->abbrev_full_comparator;
772 state->base.sortKeys->abbrev_abort =
NULL;
773 state->base.sortKeys->abbrev_full_comparator =
NULL;
784 return state->boundUsed;
802 spaceUsed = (
state->allowedMem -
state->availMem + 1023) / 1024;
817 elog(
LOG,
"%s of worker %d ended, %" PRId64 " disk blocks used: %s",
818 SERIAL(
state) ?
"external sort" :
"parallel external sort",
822 SERIAL(
state) ?
"internal sort" :
"unperformed parallel sort",
886 spaceUsed =
state->allowedMem -
state->availMem;
900 state->maxSpace = spaceUsed;
951 int memtupsize =
state->memtupsize;
955 if (!
state->growmemtuples)
970 state->growmemtuples =
false;
1010 state->growmemtuples =
false;
1027 state->growmemtuples =
false;
1052 elog(
ERROR,
"unexpected out-of-memory situation in tuplesort");
1057 state->growmemtuples =
false;
1074 state->tupleMem += tuplen;
1091 state->base.sortKeys);
1107 switch (
state->status)
1118 if (
state->memtupcount >=
state->memtupsize - 1)
1123 state->memtuples[
state->memtupcount++] = *tuple;
1137 if (
state->bounded &&
1142 elog(
LOG,
"switching to bounded heapsort at %d tuples: %s",
1199 state->memtuples[
state->memtupcount++] = *tuple;
1228 state->abbrevNext *= 2;
1234 if (!
state->base.sortKeys->abbrev_abort(
state->memtupcount,
1235 state->base.sortKeys))
1242 state->base.sortKeys[0].comparator =
state->base.sortKeys[0].abbrev_full_comparator;
1243 state->base.sortKeys[0].abbrev_converter =
NULL;
1245 state->base.sortKeys[0].abbrev_abort =
NULL;
1246 state->base.sortKeys[0].abbrev_full_comparator =
NULL;
1264 elog(
LOG,
"performsort of worker %d starting: %s",
1267 switch (
state->status)
1302 state->eof_reached =
false;
1303 state->markpos_block = 0
L;
1304 state->markpos_offset = 0;
1305 state->markpos_eof =
false;
1318 state->eof_reached =
false;
1319 state->markpos_offset = 0;
1320 state->markpos_eof =
false;
1333 state->eof_reached =
false;
1334 state->markpos_block = 0
L;
1335 state->markpos_offset = 0;
1336 state->markpos_eof =
false;
1347 elog(
LOG,
"performsort of worker %d done (except %d-way final merge): %s",
1351 elog(
LOG,
"performsort of worker %d done: %s",
1369 unsigned int tuplen;
1374 switch (
state->status)
1386 state->eof_reached =
true;
1394 elog(
ERROR,
"retrieved too many tuples in a bounded sort");
1400 if (
state->current <= 0)
1407 if (
state->eof_reached)
1408 state->eof_reached =
false;
1412 if (
state->current <= 0)
1428 if (
state->lastReturnedTuple)
1436 if (
state->eof_reached)
1439 if ((tuplen =
getlen(
state->result_tape,
true)) != 0)
1454 state->eof_reached =
true;
1465 if (
state->eof_reached)
1473 2 *
sizeof(
unsigned int));
1476 else if (
nmoved != 2 *
sizeof(
unsigned int))
1477 elog(
ERROR,
"unexpected tape position");
1478 state->eof_reached =
false;
1487 sizeof(
unsigned int));
1490 else if (
nmoved !=
sizeof(
unsigned int))
1491 elog(
ERROR,
"unexpected tape position");
1498 tuplen + 2 *
sizeof(
unsigned int));
1499 if (
nmoved == tuplen +
sizeof(
unsigned int))
1510 else if (
nmoved != tuplen + 2 *
sizeof(
unsigned int))
1511 elog(
ERROR,
"bogus tuple length in backward scan");
1524 elog(
ERROR,
"bogus tuple length in backward scan");
1544 if (
state->lastReturnedTuple)
1553 if (
state->memtupcount > 0)
1578 state->nInputRuns--;
1618 switch (
state->status)
1621 if (
state->memtupcount -
state->current >= ntuples)
1623 state->current += ntuples;
1627 state->eof_reached =
true;
1635 elog(
ERROR,
"retrieved too many tuples in a bounded sort");
1647 while (ntuples-- > 0)
1740 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1778 elog(
LOG,
"worker %d switching to external sort with %d tapes: %s",
1788 state->currentRun = 0;
1794 state->nInputTapes = 0;
1795 state->nInputRuns = 0;
1798 state->nOutputTapes = 0;
1799 state->nOutputRuns = 0;
1859 state->nOutputTapes++;
1860 state->nOutputRuns++;
1869 state->nOutputRuns++;
1885 state->slabMemoryEnd =
state->slabMemoryBegin +
1890 p =
state->slabMemoryBegin;
1903 state->slabAllocatorUsed =
true;
1928 state->base.sortKeys->abbrev_converter =
NULL;
1929 state->base.sortKeys->comparator =
state->base.sortKeys->abbrev_full_comparator;
1932 state->base.sortKeys->abbrev_abort =
NULL;
1933 state->base.sortKeys->abbrev_full_comparator =
NULL;
1963 if (
state->base.tuples)
1989 elog(
LOG,
"worker %d using %zu KB of memory for tape buffers",
1990 state->worker,
state->tape_buffer_mem / 1024);
2000 if (
state->nInputRuns == 0)
2005 if (
state->nInputTapes > 0)
2023 state->nOutputTapes = 0;
2024 state->nOutputRuns = 0;
2036 elog(
LOG,
"starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2073 if (
state->nInputRuns == 0 &&
state->nOutputRuns <= 1)
2114 while (
state->memtupcount > 0)
2124 if (
state->memtuples[0].tuple)
2139 state->nInputRuns--;
2186 unsigned int tuplen;
2224 if (
state->memtupcount == 0 &&
state->currentRun > 0)
2236 errmsg(
"cannot have more than %d runs for an external sort",
2239 if (
state->currentRun > 0)
2242 state->currentRun++;
2245 elog(
LOG,
"worker %d starting quicksort of run %d: %s",
2256 elog(
LOG,
"worker %d finished quicksort of run %d: %s",
2268 state->memtupcount = 0;
2284 state->tupleMem = 0;
2289 elog(
LOG,
"worker %d finished writing run %d to tape %d: %s",
2304 switch (
state->status)
2308 state->eof_reached =
false;
2309 state->markpos_offset = 0;
2310 state->markpos_eof =
false;
2314 state->eof_reached =
false;
2315 state->markpos_block = 0
L;
2316 state->markpos_offset = 0;
2317 state->markpos_eof =
false;
2337 switch (
state->status)
2345 &
state->markpos_block,
2346 &
state->markpos_offset);
2368 switch (
state->status)
2376 state->markpos_block,
2377 state->markpos_offset);
2409 if (
state->isMaxSpaceDisk)
2415 switch (
state->maxSpaceStatus)
2418 if (
state->boundUsed)
2444 return "still in progress";
2446 return "top-N heapsort";
2450 return "external sort";
2452 return "external merge";
2496 state->memtupcount = 0;
2546 while (
state->memtupcount > 1)
2563 state->boundUsed =
true;
2577 return (key >> shift) & 0xFF;
2659 int num_partitions = 0;
2683 for (
int i = 0;
i < 256;
i++)
2723 for (
int i = 0;
i < num_partitions;
i++)
2731 size_t offset =
partitions[st->curbyte].offset++;
2737 *st = begin[offset];
2738 begin[offset] = tmp;
2760 if (level <
sizeof(
Datum) - 1)
2766 state->base.comparetup,
2786 state->base.comparetup_tiebreak,
2805 bool nulls_first =
state->base.sortKeys[0].ssup_nulls_first;
2818 while (d1 < n &&
data[d1].isnull1 == nulls_first)
2845 i += (
data[
i].isnull1 == nulls_first);
2855 i += (
data[
i].isnull1 == nulls_first);
2882 Assert(st->isnull1 ==
true);
2886 Assert(st->isnull1 ==
false);
2895 state->base.comparetup_tiebreak,
2907 state->base.comparetup,
2943#ifdef USE_ASSERT_CHECKING
2962 if (
state->memtupcount > 1)
2967 if (
state->base.haveDatum1 &&
state->base.sortKeys)
2989 state->base.onlyKey);
2995 state->base.comparetup,
3016 memtuples =
state->memtuples;
3025 j =
state->memtupcount++;
3028 int i = (
j - 1) >> 1;
3032 memtuples[
j] = memtuples[
i];
3035 memtuples[
j] = *tuple;
3051 if (--
state->memtupcount <= 0)
3058 tuple = &memtuples[
state->memtupcount];
3085 n =
state->memtupcount;
3089 unsigned int j = 2 *
i + 1;
3098 memtuples[
i] = memtuples[
j];
3101 memtuples[
i] = *tuple;
3143 unsigned int len = 0;
3171 state->slabFreeHead =
buf->nextfree;
3297 state->memtupsize = 0;
3344 int nParticipants =
state->nParticipants;
3345 int workersFinished;
3349 Assert(nParticipants >= 1);
3355 if (nParticipants != workersFinished)
3356 elog(
ERROR,
"cannot take over tapes before all workers finish");
3370 state->currentRun = nParticipants;
3381 state->nInputTapes = 0;
3382 state->nInputRuns = 0;
3385 state->nOutputTapes = nParticipants;
3386 state->nOutputRuns = nParticipants;
3388 for (
j = 0;
j < nParticipants;
j++)
Datum idx(PG_FUNCTION_ARGS)
void PrepareTempTablespaces(void)
MemoryContext BumpContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define palloc0_object(type)
@ SORT_TYPE_EXTERNAL_SORT
@ SORT_TYPE_TOP_N_HEAPSORT
@ SORT_TYPE_STILL_IN_PROGRESS
@ SORT_TYPE_EXTERNAL_MERGE
void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
size_t LogicalTapeBackspace(LogicalTape *lt, size_t size)
size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
int64 LogicalTapeSetBlocks(LogicalTapeSet *lts)
void LogicalTapeClose(LogicalTape *lt)
void LogicalTapeSetClose(LogicalTapeSet *lts)
void LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
LogicalTapeSet * LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
void LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
void LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
LogicalTape * LogicalTapeCreate(LogicalTapeSet *lts)
void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
LogicalTape * LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
Size GetMemoryChunkSpace(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
void * repalloc_huge(void *pointer, Size size)
void MemoryContextResetOnly(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
const char * pg_rusage_show(const PGRUsage *ru0)
void pg_rusage_init(PGRUsage *ru0)
static char buf[DEFAULT_XLOG_SEG_SIZE]
static uint32 DatumGetUInt32(Datum X)
static int64 DatumGetInt64(Datum X)
static Datum UInt32GetDatum(uint32 X)
static int32 DatumGetInt32(Datum X)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
static void SpinLockRelease(volatile slock_t *lock)
static void SpinLockAcquire(volatile slock_t *lock)
static void SpinLockInit(volatile slock_t *lock)
TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]
int(* comparator)(Datum x, Datum y, SortSupport ssup)
TuplesortMethod sortMethod
TuplesortSpaceType spaceType
LogicalTape ** inputTapes
LogicalTape ** outputTapes
TupSortStatus maxSpaceStatus
LogicalTape * result_tape
void tuplesort_rescan(Tuplesortstate *state)
void tuplesort_performsort(Tuplesortstate *state)
int tuplesort_merge_order(int64 allowedMem)
#define TAPE_BUFFER_OVERHEAD
static void tuplesort_heap_delete_top(Tuplesortstate *state)
#define INITIAL_MEMTUPSIZE
static unsigned int getlen(LogicalTape *tape, bool eofOK)
void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
#define COMPARETUP(state, a, b)
static Datum normalize_datum(Datum orig, SortSupport ssup)
static void selectnewtape(Tuplesortstate *state)
void tuplesort_reset(Tuplesortstate *state)
static void markrunend(LogicalTape *tape)
bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
static uint8 current_byte(Datum key, int level)
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
#define REMOVEABBREV(state, stup, count)
static void reversedirection(Tuplesortstate *state)
#define USEMEM(state, amt)
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
int ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
static bool grow_memtuples(Tuplesortstate *state)
int ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
static void beginmerge(Tuplesortstate *state)
static void make_bounded_heap(Tuplesortstate *state)
bool tuplesort_used_bound(Tuplesortstate *state)
#define WRITETUP(state, tape, stup)
static void sort_bounded_heap(Tuplesortstate *state)
static int worker_get_identifier(Tuplesortstate *state)
static void mergeonerun(Tuplesortstate *state)
#define FREEMEM(state, amt)
static void radix_sort_recursive(SortTuple *begin, size_t n_elems, int level, Tuplesortstate *state)
static void verify_memtuples_sorted(Tuplesortstate *state)
static void inittapestate(Tuplesortstate *state, int maxTapes)
static void leader_takeover_tapes(Tuplesortstate *state)
Size tuplesort_estimate_shared(int nWorkers)
void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats)
Tuplesortstate * tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
static void tuplesort_sort_memtuples(Tuplesortstate *state)
void tuplesort_end(Tuplesortstate *state)
static void inittapes(Tuplesortstate *state, bool mergeruns)
void tuplesort_markpos(Tuplesortstate *state)
void tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbrev, Size tuplen)
const char * tuplesort_space_type_name(TuplesortSpaceType t)
static void radix_sort_tuple(SortTuple *data, size_t n, Tuplesortstate *state)
#define MERGE_BUFFER_SIZE
#define READTUP(state, stup, tape, len)
int ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
bool tuplesort_gettuple_common(Tuplesortstate *state, bool forward, SortTuple *stup)
static int64 merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns, int maxOutputTapes)
static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
static void tuplesort_updatemax(Tuplesortstate *state)
static void worker_freeze_result_tape(Tuplesortstate *state)
#define RELEASE_SLAB_SLOT(state, tuple)
void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
static void worker_nomergeruns(Tuplesortstate *state)
const char * tuplesort_method_name(TuplesortMethod m)
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
void tuplesort_restorepos(Tuplesortstate *state)
static void mergeruns(Tuplesortstate *state)
void * tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
static void tuplesort_begin_batch(Tuplesortstate *state)
void tuplesort_set_bound(Tuplesortstate *state, int64 bound)
static void init_slab_allocator(Tuplesortstate *state, int numSlots)
static bool consider_abort_common(Tuplesortstate *state)
static void tuplesort_free(Tuplesortstate *state)
static void dumptuples(Tuplesortstate *state, bool alltuples)
#define TupleSortUseBumpTupleCxt(opt)
#define TUPLESORT_RANDOMACCESS
#define TUPLESORT_ALLOWBOUNDED
char buffer[SLAB_SLOT_SIZE]
union SlabSlot * nextfree