PostgreSQL Source Code git master
tuplesort.c File Reference
#include "postgres.h"
#include <limits.h>
#include "commands/tablespace.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "storage/shmem.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/tuplesort.h"
#include "lib/sort_template.h"
Include dependency graph for tuplesort.c:

Go to the source code of this file.

Data Structures

union  SlabSlot
 
struct  Tuplesortstate
 
struct  Sharedsort
 
struct  RadixSortInfo
 

Macros

#define INITIAL_MEMTUPSIZE
 
#define SLAB_SLOT_SIZE   1024
 
#define MINORDER   6 /* minimum merge order */
 
#define MAXORDER   500 /* maximum merge order */
 
#define TAPE_BUFFER_OVERHEAD   BLCKSZ
 
#define MERGE_BUFFER_SIZE   (BLCKSZ * 32)
 
#define IS_SLAB_SLOT(state, tuple)
 
#define RELEASE_SLAB_SLOT(state, tuple)
 
#define REMOVEABBREV(state, stup, count)   ((*(state)->base.removeabbrev) (state, stup, count))
 
#define COMPARETUP(state, a, b)   ((*(state)->base.comparetup) (a, b, state))
 
#define WRITETUP(state, tape, stup)   ((*(state)->base.writetup) (state, tape, stup))
 
#define READTUP(state, stup, tape, len)   ((*(state)->base.readtup) (state, stup, tape, len))
 
#define FREESTATE(state)   ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
 
#define LACKMEM(state)   ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
 
#define USEMEM(state, amt)   ((state)->availMem -= (amt))
 
#define FREEMEM(state, amt)   ((state)->availMem += (amt))
 
#define SERIAL(state)   ((state)->shared == NULL)
 
#define WORKER(state)   ((state)->shared && (state)->worker != -1)
 
#define LEADER(state)   ((state)->shared && (state)->worker == -1)
 
#define ST_SORT   qsort_tuple
 
#define ST_ELEMENT_TYPE   SortTuple
 
#define ST_COMPARE_RUNTIME_POINTER
 
#define ST_COMPARE_ARG_TYPE   Tuplesortstate
 
#define ST_CHECK_FOR_INTERRUPTS
 
#define ST_SCOPE   static
 
#define ST_DECLARE
 
#define ST_DEFINE
 
#define ST_SORT   qsort_ssup
 
#define ST_ELEMENT_TYPE   SortTuple
 
#define ST_COMPARE(a, b, ssup)
 
#define ST_COMPARE_ARG_TYPE   SortSupportData
 
#define ST_CHECK_FOR_INTERRUPTS
 
#define ST_SCOPE   static
 
#define ST_DEFINE
 
#define QSORT_THRESHOLD   40
 

Typedefs

typedef union SlabSlot SlabSlot
 
typedef struct RadixSortInfo RadixSortInfo
 

Enumerations

enum  TupSortStatus {
  TSS_INITIAL , TSS_BOUNDED , TSS_BUILDRUNS , TSS_SORTEDINMEM ,
  TSS_SORTEDONTAPE , TSS_FINALMERGE
}
 

Functions

static void tuplesort_begin_batch (Tuplesortstate *state)
 
static bool consider_abort_common (Tuplesortstate *state)
 
static void inittapes (Tuplesortstate *state, bool mergeruns)
 
static void inittapestate (Tuplesortstate *state, int maxTapes)
 
static void selectnewtape (Tuplesortstate *state)
 
static void init_slab_allocator (Tuplesortstate *state, int numSlots)
 
static void mergeruns (Tuplesortstate *state)
 
static void mergeonerun (Tuplesortstate *state)
 
static void beginmerge (Tuplesortstate *state)
 
static bool mergereadnext (Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
 
static void dumptuples (Tuplesortstate *state, bool alltuples)
 
static void make_bounded_heap (Tuplesortstate *state)
 
static void sort_bounded_heap (Tuplesortstate *state)
 
static void tuplesort_sort_memtuples (Tuplesortstate *state)
 
static void tuplesort_heap_insert (Tuplesortstate *state, SortTuple *tuple)
 
static void tuplesort_heap_replace_top (Tuplesortstate *state, SortTuple *tuple)
 
static void tuplesort_heap_delete_top (Tuplesortstate *state)
 
static void reversedirection (Tuplesortstate *state)
 
static unsigned int getlen (LogicalTape *tape, bool eofOK)
 
static void markrunend (LogicalTape *tape)
 
static int worker_get_identifier (Tuplesortstate *state)
 
static void worker_freeze_result_tape (Tuplesortstate *state)
 
static void worker_nomergeruns (Tuplesortstate *state)
 
static void leader_takeover_tapes (Tuplesortstate *state)
 
static void free_sort_tuple (Tuplesortstate *state, SortTuple *stup)
 
static void tuplesort_free (Tuplesortstate *state)
 
static void tuplesort_updatemax (Tuplesortstate *state)
 
Tuplesortstate * tuplesort_begin_common (int workMem, SortCoordinate coordinate, int sortopt)
 
void tuplesort_set_bound (Tuplesortstate *state, int64 bound)
 
bool tuplesort_used_bound (Tuplesortstate *state)
 
void tuplesort_end (Tuplesortstate *state)
 
void tuplesort_reset (Tuplesortstate *state)
 
static bool grow_memtuples (Tuplesortstate *state)
 
void tuplesort_puttuple_common (Tuplesortstate *state, SortTuple *tuple, bool useAbbrev, Size tuplen)
 
void tuplesort_performsort (Tuplesortstate *state)
 
bool tuplesort_gettuple_common (Tuplesortstate *state, bool forward, SortTuple *stup)
 
bool tuplesort_skiptuples (Tuplesortstate *state, int64 ntuples, bool forward)
 
int tuplesort_merge_order (int64 allowedMem)
 
static int64 merge_read_buffer_size (int64 avail_mem, int nInputTapes, int nInputRuns, int maxOutputTapes)
 
void tuplesort_rescan (Tuplesortstate *state)
 
void tuplesort_markpos (Tuplesortstate *state)
 
void tuplesort_restorepos (Tuplesortstate *state)
 
void tuplesort_get_stats (Tuplesortstate *state, TuplesortInstrumentation *stats)
 
const char * tuplesort_method_name (TuplesortMethod m)
 
const char * tuplesort_space_type_name (TuplesortSpaceType t)
 
static uint8 current_byte (Datum key, int level)
 
static Datum normalize_datum (Datum orig, SortSupport ssup)
 
static void radix_sort_recursive (SortTuple *begin, size_t n_elems, int level, Tuplesortstate *state)
 
static void radix_sort_tuple (SortTuple *data, size_t n, Tuplesortstate *state)
 
static void verify_memtuples_sorted (Tuplesortstate *state)
 
void * tuplesort_readtup_alloc (Tuplesortstate *state, Size tuplen)
 
Size tuplesort_estimate_shared (int nWorkers)
 
void tuplesort_initialize_shared (Sharedsort *shared, int nWorkers, dsm_segment *seg)
 
void tuplesort_attach_shared (Sharedsort *shared, dsm_segment *seg)
 
int ssup_datum_unsigned_cmp (Datum x, Datum y, SortSupport ssup)
 
int ssup_datum_signed_cmp (Datum x, Datum y, SortSupport ssup)
 
int ssup_datum_int32_cmp (Datum x, Datum y, SortSupport ssup)
 

Variables

bool trace_sort = false
 

Macro Definition Documentation

◆ COMPARETUP

#define COMPARETUP (   state,
  a,
  b 
)    ((*(state)->base.comparetup) (a, b, state))

Definition at line 393 of file tuplesort.c.

◆ FREEMEM

#define FREEMEM (   state,
  amt 
)    ((state)->availMem += (amt))

Definition at line 399 of file tuplesort.c.

◆ FREESTATE

#define FREESTATE (   state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)

Definition at line 396 of file tuplesort.c.

◆ INITIAL_MEMTUPSIZE

#define INITIAL_MEMTUPSIZE
Value:
Max(1024, \
	ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)

Definition at line 118 of file tuplesort.c.
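As a rough worked example (assuming a typical 64-bit build where sizeof(SortTuple) is 24 bytes and ALLOCSET_SEPARATE_THRESHOLD is 8192): 8192 / 24 + 1 = 342, so Max(1024, 342) yields an initial memtuples array of 1024 slots, about 24 kB. That keeps the initial allocation above the threshold at which aset.c treats the array as its own chunk, which the grow_memtuples() comments below rely on.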

 141typedef union SlabSlot
 142{
 143 union SlabSlot *nextfree;
 144 char buffer[SLAB_SLOT_SIZE];
 145} SlabSlot;
146
147/*
148 * Possible states of a Tuplesort object. These denote the states that
149 * persist between calls of Tuplesort routines.
150 */
151typedef enum
152{
153 TSS_INITIAL, /* Loading tuples; still within memory limit */
154 TSS_BOUNDED, /* Loading tuples into bounded-size heap */
155 TSS_BUILDRUNS, /* Loading tuples; writing to tape */
156 TSS_SORTEDINMEM, /* Sort completed entirely in memory */
157 TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
158 TSS_FINALMERGE, /* Performing final merge on-the-fly */
 159} TupSortStatus;
 160
161/*
162 * Parameters for calculation of number of tapes to use --- see inittapes()
163 * and tuplesort_merge_order().
164 *
 165 * In this calculation we assume that each tape will cost us about 1 block's
166 * worth of buffer space. This ignores the overhead of all the other data
167 * structures needed for each tape, but it's probably close enough.
168 *
169 * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
170 * input tape, for pre-reading (see discussion at top of file). This is *in
171 * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
172 */
173#define MINORDER 6 /* minimum merge order */
174#define MAXORDER 500 /* maximum merge order */
175#define TAPE_BUFFER_OVERHEAD BLCKSZ
176#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
177
178
179/*
180 * Private state of a Tuplesort operation.
181 */
182struct Tuplesortstate
183{
185 TupSortStatus status; /* enumerated value as shown above */
186 bool bounded; /* did caller specify a maximum number of
187 * tuples to return? */
188 bool boundUsed; /* true if we made use of a bounded heap */
189 int bound; /* if bounded, the maximum number of tuples */
190 int64 tupleMem; /* memory consumed by individual tuples.
191 * storing this separately from what we track
192 * in availMem allows us to subtract the
193 * memory consumed by all tuples when dumping
194 * tuples to tape */
195 int64 availMem; /* remaining memory available, in bytes */
196 int64 allowedMem; /* total memory allowed, in bytes */
197 int maxTapes; /* max number of input tapes to merge in each
198 * pass */
199 int64 maxSpace; /* maximum amount of space occupied among sort
200 * of groups, either in-memory or on-disk */
201 bool isMaxSpaceDisk; /* true when maxSpace tracks on-disk space,
202 * false means in-memory */
203 TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
204 LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
205
206 /*
207 * This array holds the tuples now in sort memory. If we are in state
208 * INITIAL, the tuples are in no particular order; if we are in state
209 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
210 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
211 * H. In state SORTEDONTAPE, the array is not used.
212 */
213 SortTuple *memtuples; /* array of SortTuple structs */
214 int memtupcount; /* number of tuples currently present */
215 int memtupsize; /* allocated length of memtuples array */
216 bool growmemtuples; /* memtuples' growth still underway? */
217
218 /*
219 * Memory for tuples is sometimes allocated using a simple slab allocator,
220 * rather than with palloc(). Currently, we switch to slab allocation
221 * when we start merging. Merging only needs to keep a small, fixed
222 * number of tuples in memory at any time, so we can avoid the
223 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
224 * to hold the tuples.
225 *
226 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
227 * slots. The allocation is sized to have one slot per tape, plus one
228 * additional slot. We need that many slots to hold all the tuples kept
229 * in the heap during merge, plus the one we have last returned from the
230 * sort, with tuplesort_gettuple.
231 *
232 * Initially, all the slots are kept in a linked list of free slots. When
233 * a tuple is read from a tape, it is put to the next available slot, if
234 * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
235 * instead.
236 *
237 * When we're done processing a tuple, we return the slot back to the free
238 * list, or pfree() if it was palloc'd. We know that a tuple was
239 * allocated from the slab, if its pointer value is between
240 * slabMemoryBegin and -End.
241 *
242 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
243 * tracking memory usage is not used.
244 */
246
247 char *slabMemoryBegin; /* beginning of slab memory arena */
248 char *slabMemoryEnd; /* end of slab memory arena */
249 SlabSlot *slabFreeHead; /* head of free list */
250
251 /* Memory used for input and output tape buffers. */
252 size_t tape_buffer_mem;
253
254 /*
255 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
256 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
257 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
258 * recycle the memory on next gettuple call.
259 */
260 void *lastReturnedTuple;
261
262 /*
263 * While building initial runs, this is the current output run number.
264 * Afterwards, it is the number of initial runs we made.
265 */
266 int currentRun;
267
268 /*
269 * Logical tapes, for merging.
270 *
271 * The initial runs are written in the output tapes. In each merge pass,
272 * the output tapes of the previous pass become the input tapes, and new
273 * output tapes are created as needed. When nInputTapes equals
274 * nInputRuns, there is only one merge pass left.
275 */
277 int nInputTapes;
278 int nInputRuns;
279
281 int nOutputTapes;
282 int nOutputRuns;
283
284 LogicalTape *destTape; /* current output tape */
285
286 /*
287 * These variables are used after completion of sorting to keep track of
288 * the next tuple to return. (In the tape case, the tape's current read
289 * position is also critical state.)
290 */
291 LogicalTape *result_tape; /* actual tape of finished output */
292 int current; /* array index (only used if SORTEDINMEM) */
293 bool eof_reached; /* reached EOF (needed for cursors) */
294
295 /* markpos_xxx holds marked position for mark and restore */
296 int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
297 int markpos_offset; /* saved "current", or offset in tape block */
298 bool markpos_eof; /* saved "eof_reached" */
299
300 /*
301 * These variables are used during parallel sorting.
302 *
303 * worker is our worker identifier. Follows the general convention that
304 * -1 value relates to a leader tuplesort, and values >= 0 worker
305 * tuplesorts. (-1 can also be a serial tuplesort.)
306 *
307 * shared is mutable shared memory state, which is used to coordinate
308 * parallel sorts.
309 *
310 * nParticipants is the number of worker Tuplesortstates known by the
311 * leader to have actually been launched, which implies that they must
312 * finish a run that the leader needs to merge. Typically includes a
313 * worker state held by the leader process itself. Set in the leader
314 * Tuplesortstate only.
315 */
316 int worker;
318 int nParticipants;
319
320 /*
321 * Additional state for managing "abbreviated key" sortsupport routines
322 * (which currently may be used by all cases except the hash index case).
323 * Tracks the intervals at which the optimization's effectiveness is
324 * tested.
325 */
326 int64 abbrevNext; /* Tuple # at which to next check
327 * applicability */
328
329 /*
330 * Resource snapshot for time of sort start.
331 */
333};
334
335/*
336 * Private mutable state of tuplesort-parallel-operation. This is allocated
337 * in shared memory.
338 */
339struct Sharedsort
340{
341 /* mutex protects all fields prior to tapes */
343
344 /*
345 * currentWorker generates ordinal identifier numbers for parallel sort
346 * workers. These start from 0, and are always gapless.
347 *
348 * Workers increment workersFinished to indicate having finished. If this
349 * is equal to state.nParticipants within the leader, leader is ready to
350 * merge worker runs.
351 */
352 int currentWorker;
353 int workersFinished;
354
355 /* Temporary file space */
357
358 /* Size of tapes flexible array */
359 int nTapes;
360
361 /*
362 * Tapes array used by workers to report back information needed by the
363 * leader to concatenate all worker tapes into one for merging
364 */
366};
367
368/*
369 * Is the given tuple allocated from the slab memory arena?
370 */
371#define IS_SLAB_SLOT(state, tuple) \
372 ((char *) (tuple) >= (state)->slabMemoryBegin && \
373 (char *) (tuple) < (state)->slabMemoryEnd)
374
375/*
376 * Return the given tuple to the slab memory free list, or free it
377 * if it was palloc'd.
378 */
379#define RELEASE_SLAB_SLOT(state, tuple) \
380 do { \
381 SlabSlot *buf = (SlabSlot *) tuple; \
382 \
383 if (IS_SLAB_SLOT((state), buf)) \
384 { \
385 buf->nextfree = (state)->slabFreeHead; \
386 (state)->slabFreeHead = buf; \
387 } else \
388 pfree(buf); \
389 } while(0)
390
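To illustrate the free-list protocol that RELEASE_SLAB_SLOT implements, here is a minimal sketch of the allocation side. The name slab_alloc_sketch is hypothetical and illustrative only; in this file the real allocation path is tuplesort_readtup_alloc(), and oversized tuples fall back to an ordinary allocation in the sort context.

void *
slab_alloc_sketch(Tuplesortstate *state, Size tuplen)
{
	SlabSlot   *buf;

	/* Tuples larger than a slab slot are allocated normally instead. */
	if (tuplen > SLAB_SLOT_SIZE || state->slabFreeHead == NULL)
		return MemoryContextAlloc(state->base.sortcontext, tuplen);

	/* Pop the next free slot off the head of the free list. */
	buf = state->slabFreeHead;
	state->slabFreeHead = buf->nextfree;
	return buf;
}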
391#define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
392#define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
393#define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
394#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
395#define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
396#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
397#define USEMEM(state,amt) ((state)->availMem -= (amt))
398#define FREEMEM(state,amt) ((state)->availMem += (amt))
399#define SERIAL(state) ((state)->shared == NULL)
400#define WORKER(state) ((state)->shared && (state)->worker != -1)
401#define LEADER(state) ((state)->shared && (state)->worker == -1)
402
403/*
404 * NOTES about on-tape representation of tuples:
405 *
406 * We require the first "unsigned int" of a stored tuple to be the total size
407 * on-tape of the tuple, including itself (so it is never zero; an all-zero
408 * unsigned int is used to delimit runs). The remainder of the stored tuple
409 * may or may not match the in-memory representation of the tuple ---
410 * any conversion needed is the job of the writetup and readtup routines.
411 *
412 * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
413 * representation of the tuple must be followed by another "unsigned int" that
414 * is a copy of the length --- so the total tape space used is actually
415 * sizeof(unsigned int) more than the stored length value. This allows
416 * read-backwards. When the random access flag was not specified, the
417 * write/read routines may omit the extra length word.
418 *
419 * writetup is expected to write both length words as well as the tuple
420 * data. When readtup is called, the tape is positioned just after the
421 * front length word; readtup must read the tuple data and advance past
422 * the back length word (if present).
423 *
424 * The write/read routines can make use of the tuple description data
425 * stored in the Tuplesortstate record, if needed. They are also expected
426 * to adjust state->availMem by the amount of memory space (not tape space!)
427 * released or consumed. There is no error return from either writetup
428 * or readtup; they should ereport() on failure.
429 *
430 *
431 * NOTES about memory consumption calculations:
432 *
433 * We count space allocated for tuples against the workMem limit, plus
434 * the space used by the variable-size memtuples array. Fixed-size space
435 * is not counted; it's small enough to not be interesting.
436 *
437 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
438 * rather than the originally-requested size. This is important since
439 * palloc can add substantial overhead. It's not a complete answer since
440 * we won't count any wasted space in palloc allocation blocks, but it's
441 * a lot better than what we were doing before 7.3. As of 9.6, a
442 * separate memory context is used for caller passed tuples. Resetting
443 * it at certain key increments significantly ameliorates fragmentation.
444 * readtup routines use the slab allocator (they cannot use
445 * the reset context because it gets deleted at the point that merging
446 * begins).
447 */
448
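To make the framing rules above concrete, here is a hedged sketch of a writetup routine for an opaque, length-prefixed payload. The names writetup_sketch and my_tuple_size are hypothetical; only the length-word layout is taken from the NOTES above (the real per-format routines live in tuplesortvariants.c and follow the same pattern).

static void
writetup_sketch(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
	char	   *body = (char *) stup->tuple;			/* opaque tuple body */
	unsigned int bodylen = my_tuple_size(body);			/* hypothetical length accessor */
	unsigned int tuplen = bodylen + sizeof(unsigned int);	/* total on-tape size, including itself */

	/* leading length word, then the tuple data */
	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
	LogicalTapeWrite(tape, body, bodylen);

	/* trailing copy of the length word, so the tape can be read backwards */
	if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
}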
449
452static void inittapes(Tuplesortstate *state, bool mergeruns);
453static void inittapestate(Tuplesortstate *state, int maxTapes);
456static void mergeruns(Tuplesortstate *state);
457static void mergeonerun(Tuplesortstate *state);
458static void beginmerge(Tuplesortstate *state);
460static void dumptuples(Tuplesortstate *state, bool alltuples);
468static unsigned int getlen(LogicalTape *tape, bool eofOK);
469static void markrunend(LogicalTape *tape);
477
478
479/*
480 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
481 * any variant of SortTuples, using the appropriate comparetup function.
482 * qsort_ssup() is specialized for the case where the comparetup function
483 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
484 * and Datum sorts.
485 */
486
487#define ST_SORT qsort_tuple
488#define ST_ELEMENT_TYPE SortTuple
489#define ST_COMPARE_RUNTIME_POINTER
490#define ST_COMPARE_ARG_TYPE Tuplesortstate
491#define ST_CHECK_FOR_INTERRUPTS
492#define ST_SCOPE static
493#define ST_DECLARE
494#define ST_DEFINE
495#include "lib/sort_template.h"
496
497#define ST_SORT qsort_ssup
498#define ST_ELEMENT_TYPE SortTuple
499#define ST_COMPARE(a, b, ssup) \
500 ApplySortComparator((a)->datum1, (a)->isnull1, \
501 (b)->datum1, (b)->isnull1, (ssup))
502#define ST_COMPARE_ARG_TYPE SortSupportData
503#define ST_CHECK_FOR_INTERRUPTS
504#define ST_SCOPE static
505#define ST_DEFINE
506#include "lib/sort_template.h"
507
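For readers unfamiliar with sort_template.h: each inclusion stamps out a complete qsort implementation specialized by the macros defined just before it. Roughly (an approximation, not the exact generated code), the two inclusions here produce prototypes along these lines, and they are invoked from tuplesort_sort_memtuples() with state->base.comparetup and state->base.sortKeys respectively:

/* first inclusion: comparator supplied at run time */
static void qsort_tuple(SortTuple *data, size_t n,
						int (*cmp) (const SortTuple *, const SortTuple *, Tuplesortstate *),
						Tuplesortstate *arg);

/* second inclusion: comparison inlined as ApplySortComparator on datum1/isnull1 */
static void qsort_ssup(SortTuple *data, size_t n, SortSupportData *arg);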
508/* state for radix sort */
509typedef struct RadixSortInfo
510{
511 union
512 {
513 size_t count;
514 size_t offset;
515 };
516 size_t next_offset;
 517} RadixSortInfo;
 518
519/*
520 * Threshold below which qsort_tuple() is generally faster than a radix sort.
521 */
522#define QSORT_THRESHOLD 40
523
524
525/*
526 * tuplesort_begin_xxx
527 *
528 * Initialize for a tuple sort operation.
529 *
530 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
531 * zero or more times, then call tuplesort_performsort when all the tuples
532 * have been supplied. After performsort, retrieve the tuples in sorted
533 * order by calling tuplesort_getXXX until it returns false/NULL. (If random
534 * access was requested, rescan, markpos, and restorepos can also be called.)
535 * Call tuplesort_end to terminate the operation and release memory/disk space.
536 *
537 * Each variant of tuplesort_begin has a workMem parameter specifying the
538 * maximum number of kilobytes of RAM to use before spilling data to disk.
539 * (The normal value of this parameter is work_mem, but some callers use
540 * other values.) Each variant also has a sortopt which is a bitmask of
541 * sort options. See TUPLESORT_* definitions in tuplesort.h
542 */
543
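A sketch of the caller-side lifecycle described above, using the heap-tuple variant declared in tuplesort.h. Error handling and slot setup are omitted; tupDesc, nkeys, attNums, sortOperators, sortCollations, nullsFirst and slot are assumed to be provided by the caller, and fetch_next_slot/process_sorted_slot are hypothetical helpers.

	Tuplesortstate *sortstate;

	sortstate = tuplesort_begin_heap(tupDesc, nkeys, attNums,
									 sortOperators, sortCollations, nullsFirst,
									 work_mem, NULL, TUPLESORT_NONE);

	/* feed tuples, zero or more times */
	while (fetch_next_slot(slot))
		tuplesort_puttupleslot(sortstate, slot);

	tuplesort_performsort(sortstate);

	/* retrieve tuples in sorted order until exhausted */
	while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
		process_sorted_slot(slot);

	tuplesort_end(sortstate);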
 544Tuplesortstate *
 545tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
 546{
548 MemoryContext maincontext;
549 MemoryContext sortcontext;
550 MemoryContext oldcontext;
551
552 /* See leader_takeover_tapes() remarks on random access support */
553 if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
554 elog(ERROR, "random access disallowed under parallel sort");
555
556 /*
557 * Memory context surviving tuplesort_reset. This memory context holds
558 * data which is useful to keep while sorting multiple similar batches.
559 */
561 "TupleSort main",
563
564 /*
565 * Create a working memory context for one sort operation. The content of
566 * this context is deleted by tuplesort_reset.
567 */
568 sortcontext = AllocSetContextCreate(maincontext,
569 "TupleSort sort",
571
572 /*
573 * Additionally a working memory context for tuples is setup in
574 * tuplesort_begin_batch.
575 */
576
577 /*
578 * Make the Tuplesortstate within the per-sortstate context. This way, we
579 * don't need a separate pfree() operation for it at shutdown.
580 */
581 oldcontext = MemoryContextSwitchTo(maincontext);
582
584
585 if (trace_sort)
586 pg_rusage_init(&state->ru_start);
587
588 state->base.sortopt = sortopt;
589 state->base.tuples = true;
590 state->abbrevNext = 10;
591
592 /*
593 * workMem is forced to be at least 64KB, the current minimum valid value
594 * for the work_mem GUC. This is a defense against parallel sort callers
595 * that divide out memory among many workers in a way that leaves each
596 * with very little memory.
597 */
598 state->allowedMem = Max(workMem, 64) * (int64) 1024;
599 state->base.sortcontext = sortcontext;
600 state->base.maincontext = maincontext;
601
602 state->memtupsize = INITIAL_MEMTUPSIZE;
603 state->memtuples = NULL;
604
605 /*
606 * After all of the other non-parallel-related state, we setup all of the
607 * state needed for each batch.
608 */
610
611 /*
612 * Initialize parallel-related state based on coordination information
613 * from caller
614 */
615 if (!coordinate)
616 {
617 /* Serial sort */
618 state->shared = NULL;
619 state->worker = -1;
620 state->nParticipants = -1;
621 }
622 else if (coordinate->isWorker)
623 {
624 /* Parallel worker produces exactly one final run from all input */
625 state->shared = coordinate->sharedsort;
627 state->nParticipants = -1;
628 }
629 else
630 {
631 /* Parallel leader state only used for final merge */
632 state->shared = coordinate->sharedsort;
633 state->worker = -1;
634 state->nParticipants = coordinate->nParticipants;
635 Assert(state->nParticipants >= 1);
636 }
637
638 MemoryContextSwitchTo(oldcontext);
639
640 return state;
641}
642
643/*
644 * tuplesort_begin_batch
645 *
 646 * Setup, or reset, all state needed for processing a new set of tuples with this
647 * sort state. Called both from tuplesort_begin_common (the first time sorting
648 * with this sort state) and tuplesort_reset (for subsequent usages).
649 */
650static void
 651tuplesort_begin_batch(Tuplesortstate *state)
 652{
653 MemoryContext oldcontext;
654
655 oldcontext = MemoryContextSwitchTo(state->base.maincontext);
656
657 /*
658 * Caller tuple (e.g. IndexTuple) memory context.
659 *
660 * A dedicated child context used exclusively for caller passed tuples
661 * eases memory management. Resetting at key points reduces
662 * fragmentation. Note that the memtuples array of SortTuples is allocated
663 * in the parent context, not this context, because there is no need to
664 * free memtuples early. For bounded sorts, tuples may be pfreed in any
665 * order, so we use a regular aset.c context so that it can make use of
666 * free'd memory. When the sort is not bounded, we make use of a bump.c
667 * context as this keeps allocations more compact with less wastage.
668 * Allocations are also slightly more CPU efficient.
669 */
670 if (TupleSortUseBumpTupleCxt(state->base.sortopt))
671 state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
672 "Caller tuples",
674 else
675 state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
676 "Caller tuples",
678
679
680 state->status = TSS_INITIAL;
681 state->bounded = false;
682 state->boundUsed = false;
683
684 state->availMem = state->allowedMem;
685
686 state->tapeset = NULL;
687
688 state->memtupcount = 0;
689
690 state->growmemtuples = true;
691 state->slabAllocatorUsed = false;
692 if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
693 {
694 pfree(state->memtuples);
695 state->memtuples = NULL;
696 state->memtupsize = INITIAL_MEMTUPSIZE;
697 }
698 if (state->memtuples == NULL)
699 {
700 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
702 }
703
704 /* workMem must be large enough for the minimal memtuples array */
705 if (LACKMEM(state))
706 elog(ERROR, "insufficient memory allowed for sort");
707
708 state->currentRun = 0;
709
710 /*
711 * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
712 * inittapes(), if needed.
713 */
714
715 state->result_tape = NULL; /* flag that result tape has not been formed */
716
717 MemoryContextSwitchTo(oldcontext);
718}
719
720/*
721 * tuplesort_set_bound
722 *
723 * Advise tuplesort that at most the first N result tuples are required.
724 *
725 * Must be called before inserting any tuples. (Actually, we could allow it
726 * as long as the sort hasn't spilled to disk, but there seems no need for
727 * delayed calls at the moment.)
728 *
729 * This is a hint only. The tuplesort may still return more tuples than
730 * requested. Parallel leader tuplesorts will always ignore the hint.
731 */
732void
 733tuplesort_set_bound(Tuplesortstate *state, int64 bound)
 734{
735 /* Assert we're called before loading any tuples */
736 Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
737 /* Assert we allow bounded sorts */
738 Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
739 /* Can't set the bound twice, either */
740 Assert(!state->bounded);
741 /* Also, this shouldn't be called in a parallel worker */
743
744 /* Parallel leader allows but ignores hint */
745 if (LEADER(state))
746 return;
747
748#ifdef DEBUG_BOUNDED_SORT
749 /* Honor GUC setting that disables the feature (for easy testing) */
751 return;
752#endif
753
754 /* We want to be able to compute bound * 2, so limit the setting */
755 if (bound > (int64) (INT_MAX / 2))
756 return;
757
758 state->bounded = true;
759 state->bound = (int) bound;
760
761 /*
762 * Bounded sorts are not an effective target for abbreviated key
763 * optimization. Disable by setting state to be consistent with no
764 * abbreviation support.
765 */
766 state->base.sortKeys->abbrev_converter = NULL;
767 if (state->base.sortKeys->abbrev_full_comparator)
768 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
769
770 /* Not strictly necessary, but be tidy */
771 state->base.sortKeys->abbrev_abort = NULL;
772 state->base.sortKeys->abbrev_full_comparator = NULL;
773}
774
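For example (a sketch; the bound of 100 is arbitrary and the begin-time arguments are assumed to come from the caller), a top-N caller passes TUPLESORT_ALLOWBOUNDED when creating the sort state and then sets the bound before inserting anything:

	sortstate = tuplesort_begin_heap(tupDesc, nkeys, attNums,
									 sortOperators, sortCollations, nullsFirst,
									 work_mem, NULL, TUPLESORT_ALLOWBOUNDED);
	tuplesort_set_bound(sortstate, 100);	/* at most the first 100 tuples are needed */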
775/*
776 * tuplesort_used_bound
777 *
778 * Allow callers to find out if the sort state was able to use a bound.
779 */
780bool
782{
783 return state->boundUsed;
784}
785
786/*
787 * tuplesort_free
788 *
789 * Internal routine for freeing resources of tuplesort.
790 */
791static void
793{
794 /* context swap probably not needed, but let's be safe */
795 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
796 int64 spaceUsed;
797
798 if (state->tapeset)
799 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
800 else
801 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
802
803 /*
804 * Delete temporary "tape" files, if any.
805 *
806 * We don't bother to destroy the individual tapes here. They will go away
807 * with the sortcontext. (In TSS_FINALMERGE state, we have closed
808 * finished tapes already.)
809 */
810 if (state->tapeset)
811 LogicalTapeSetClose(state->tapeset);
812
813 if (trace_sort)
814 {
815 if (state->tapeset)
816 elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
817 SERIAL(state) ? "external sort" : "parallel external sort",
818 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
819 else
820 elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
821 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
822 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
823 }
824
825 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
826
828 MemoryContextSwitchTo(oldcontext);
829
830 /*
831 * Free the per-sort memory context, thereby releasing all working memory.
832 */
833 MemoryContextReset(state->base.sortcontext);
834}
835
836/*
837 * tuplesort_end
838 *
839 * Release resources and clean up.
840 *
841 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
842 * pointing to garbage. Be careful not to attempt to use or free such
843 * pointers afterwards!
844 */
845void
 846tuplesort_end(Tuplesortstate *state)
 847{
849
850 /*
851 * Free the main memory context, including the Tuplesortstate struct
852 * itself.
853 */
854 MemoryContextDelete(state->base.maincontext);
855}
856
857/*
858 * tuplesort_updatemax
859 *
860 * Update maximum resource usage statistics.
861 */
862static void
864{
865 int64 spaceUsed;
866 bool isSpaceDisk;
867
868 /*
869 * Note: it might seem we should provide both memory and disk usage for a
870 * disk-based sort. However, the current code doesn't track memory space
871 * accurately once we have begun to return tuples to the caller (since we
872 * don't account for pfree's the caller is expected to do), so we cannot
873 * rely on availMem in a disk sort. This does not seem worth the overhead
874 * to fix. Is it worth creating an API for the memory context code to
875 * tell us how much is actually used in sortcontext?
876 */
877 if (state->tapeset)
878 {
879 isSpaceDisk = true;
880 spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
881 }
882 else
883 {
884 isSpaceDisk = false;
885 spaceUsed = state->allowedMem - state->availMem;
886 }
887
888 /*
889 * Sort evicts data to the disk when it wasn't able to fit that data into
890 * main memory. This is why we assume space used on the disk to be more
891 * important for tracking resource usage than space used in memory. Note
892 * that the amount of space occupied by some tupleset on the disk might be
893 * less than amount of space occupied by the same tupleset in memory due
894 * to more compact representation.
895 */
896 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
897 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
898 {
899 state->maxSpace = spaceUsed;
900 state->isMaxSpaceDisk = isSpaceDisk;
901 state->maxSpaceStatus = state->status;
902 }
903}
904
905/*
906 * tuplesort_reset
907 *
908 * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
909 * meta-information in. After tuplesort_reset, tuplesort is ready to start
910 * a new sort. This allows avoiding recreation of tuple sort states (and
911 * save resources) when sorting multiple small batches.
912 */
913void
 914tuplesort_reset(Tuplesortstate *state)
 915{
918
919 /*
920 * After we've freed up per-batch memory, re-setup all of the state common
921 * to both the first batch and any subsequent batch.
922 */
924
925 state->lastReturnedTuple = NULL;
926 state->slabMemoryBegin = NULL;
927 state->slabMemoryEnd = NULL;
928 state->slabFreeHead = NULL;
929}
930
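A sketch of the intended usage pattern: one sort state is reused across many small batches, paying the setup cost once. The per-batch helpers here are hypothetical stand-ins for the caller's tuplesort_puttupleslot and tuplesort_gettupleslot loops.

	for (int batch = 0; batch < nbatches; batch++)
	{
		load_batch_tuples(sortstate, batch);	/* hypothetical: put-tuple loop */
		tuplesort_performsort(sortstate);
		emit_sorted_batch(sortstate);			/* hypothetical: get-tuple loop */
		tuplesort_reset(sortstate);				/* drop the data, keep the state */
	}
	tuplesort_end(sortstate);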
931/*
932 * Grow the memtuples[] array, if possible within our memory constraint. We
933 * must not exceed INT_MAX tuples in memory or the caller-provided memory
934 * limit. Return true if we were able to enlarge the array, false if not.
935 *
936 * Normally, at each increment we double the size of the array. When doing
937 * that would exceed a limit, we attempt one last, smaller increase (and then
938 * clear the growmemtuples flag so we don't try any more). That allows us to
939 * use memory as fully as permitted; sticking to the pure doubling rule could
940 * result in almost half going unused. Because availMem moves around with
941 * tuple addition/removal, we need some rule to prevent making repeated small
942 * increases in memtupsize, which would just be useless thrashing. The
943 * growmemtuples flag accomplishes that and also prevents useless
944 * recalculations in this function.
945 */
946static bool
 947grow_memtuples(Tuplesortstate *state)
 948{
949 int newmemtupsize;
950 int memtupsize = state->memtupsize;
951 int64 memNowUsed = state->allowedMem - state->availMem;
952
953 /* Forget it if we've already maxed out memtuples, per comment above */
954 if (!state->growmemtuples)
955 return false;
956
957 /* Select new value of memtupsize */
958 if (memNowUsed <= state->availMem)
959 {
960 /*
961 * We've used no more than half of allowedMem; double our usage,
962 * clamping at INT_MAX tuples.
963 */
964 if (memtupsize < INT_MAX / 2)
965 newmemtupsize = memtupsize * 2;
966 else
967 {
969 state->growmemtuples = false;
970 }
971 }
972 else
973 {
974 /*
975 * This will be the last increment of memtupsize. Abandon doubling
976 * strategy and instead increase as much as we safely can.
977 *
978 * To stay within allowedMem, we can't increase memtupsize by more
979 * than availMem / sizeof(SortTuple) elements. In practice, we want
980 * to increase it by considerably less, because we need to leave some
981 * space for the tuples to which the new array slots will refer. We
982 * assume the new tuples will be about the same size as the tuples
983 * we've already seen, and thus we can extrapolate from the space
984 * consumption so far to estimate an appropriate new size for the
985 * memtuples array. The optimal value might be higher or lower than
986 * this estimate, but it's hard to know that in advance. We again
987 * clamp at INT_MAX tuples.
988 *
989 * This calculation is safe against enlarging the array so much that
990 * LACKMEM becomes true, because the memory currently used includes
991 * the present array; thus, there would be enough allowedMem for the
992 * new array elements even if no other memory were currently used.
993 *
994 * We do the arithmetic in float8, because otherwise the product of
995 * memtupsize and allowedMem could overflow. Any inaccuracy in the
996 * result should be insignificant; but even if we computed a
997 * completely insane result, the checks below will prevent anything
998 * really bad from happening.
999 */
1000 double grow_ratio;
1001
1002 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1003 if (memtupsize * grow_ratio < INT_MAX)
1004 newmemtupsize = (int) (memtupsize * grow_ratio);
1005 else
1007
1008 /* We won't make any further enlargement attempts */
1009 state->growmemtuples = false;
1010 }
1011
1012 /* Must enlarge array by at least one element, else report failure */
1013 if (newmemtupsize <= memtupsize)
1014 goto noalloc;
1015
1016 /*
1017 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1018 * to ensure our request won't be rejected. Note that we can easily
1019 * exhaust address space before facing this outcome. (This is presently
1020 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1021 * don't rely on that at this distance.)
1022 */
1023 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1024 {
1026 state->growmemtuples = false; /* can't grow any more */
1027 }
1028
1029 /*
1030 * We need to be sure that we do not cause LACKMEM to become true, else
1031 * the space management algorithm will go nuts. The code above should
1032 * never generate a dangerous request, but to be safe, check explicitly
1033 * that the array growth fits within availMem. (We could still cause
1034 * LACKMEM if the memory chunk overhead associated with the memtuples
1035 * array were to increase. That shouldn't happen because we chose the
1036 * initial array size large enough to ensure that palloc will be treating
1037 * both old and new arrays as separate chunks. But we'll check LACKMEM
1038 * explicitly below just in case.)
1039 */
1040 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1041 goto noalloc;
1042
1043 /* OK, do it */
1044 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1045 state->memtupsize = newmemtupsize;
1046 state->memtuples = (SortTuple *)
1047 repalloc_huge(state->memtuples,
1048 state->memtupsize * sizeof(SortTuple));
1049 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1050 if (LACKMEM(state))
1051 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1052 return true;
1053
1054noalloc:
1055 /* If for any reason we didn't realloc, shut off future attempts */
1056 state->growmemtuples = false;
1057 return false;
1058}
1059
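As a worked example of the second branch (the numbers are illustrative): with allowedMem = 64 MB and 48 MB already consumed by a 1,000,000-slot memtuples array plus its tuples, memNowUsed exceeds availMem, so grow_ratio = 64 / 48 ≈ 1.33 and the array is enlarged once to roughly 1,330,000 slots rather than doubled; growmemtuples is then cleared so no further enlargement is attempted.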
1060/*
1061 * Shared code for tuple and datum cases.
1062 */
1063void
1065 bool useAbbrev, Size tuplen)
1066{
1067 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1068
1069 Assert(!LEADER(state));
1070
1071 /* account for the memory used for this tuple */
1072 USEMEM(state, tuplen);
1073 state->tupleMem += tuplen;
1074
1075 if (!useAbbrev)
1076 {
1077 /*
1078 * Leave ordinary Datum representation, or NULL value. If there is a
1079 * converter it won't expect NULL values, and cost model is not
1080 * required to account for NULL, so in that case we avoid calling
1081 * converter and just set datum1 to zeroed representation (to be
1082 * consistent, and to support cheap inequality tests for NULL
1083 * abbreviated keys).
1084 */
1085 }
1086 else if (!consider_abort_common(state))
1087 {
1088 /* Store abbreviated key representation */
1089 tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1090 state->base.sortKeys);
1091 }
1092 else
1093 {
1094 /*
1095 * Set state to be consistent with never trying abbreviation.
1096 *
1097 * Alter datum1 representation in already-copied tuples, so as to
1098 * ensure a consistent representation (current tuple was just
1099 * handled). It does not matter if some dumped tuples are already
1100 * sorted on tape, since serialized tuples lack abbreviated keys
1101 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1102 */
1103 REMOVEABBREV(state, state->memtuples, state->memtupcount);
1104 }
1105
1106 switch (state->status)
1107 {
1108 case TSS_INITIAL:
1109
1110 /*
1111 * Save the tuple into the unsorted array. First, grow the array
1112 * as needed. Note that we try to grow the array when there is
1113 * still one free slot remaining --- if we fail, there'll still be
1114 * room to store the incoming tuple, and then we'll switch to
1115 * tape-based operation.
1116 */
1117 if (state->memtupcount >= state->memtupsize - 1)
1118 {
1120 Assert(state->memtupcount < state->memtupsize);
1121 }
1122 state->memtuples[state->memtupcount++] = *tuple;
1123
1124 /*
1125 * Check if it's time to switch over to a bounded heapsort. We do
1126 * so if the input tuple count exceeds twice the desired tuple
1127 * count (this is a heuristic for where heapsort becomes cheaper
1128 * than a quicksort), or if we've just filled workMem and have
1129 * enough tuples to meet the bound.
1130 *
1131 * Note that once we enter TSS_BOUNDED state we will always try to
1132 * complete the sort that way. In the worst case, if later input
1133 * tuples are larger than earlier ones, this might cause us to
1134 * exceed workMem significantly.
1135 */
1136 if (state->bounded &&
1137 (state->memtupcount > state->bound * 2 ||
1138 (state->memtupcount > state->bound && LACKMEM(state))))
1139 {
1140 if (trace_sort)
1141 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1142 state->memtupcount,
1143 pg_rusage_show(&state->ru_start));
1145 MemoryContextSwitchTo(oldcontext);
1146 return;
1147 }
1148
1149 /*
1150 * Done if we still fit in available memory and have array slots.
1151 */
1152 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1153 {
1154 MemoryContextSwitchTo(oldcontext);
1155 return;
1156 }
1157
1158 /*
1159 * Nope; time to switch to tape-based operation.
1160 */
1161 inittapes(state, true);
1162
1163 /*
1164 * Dump all tuples.
1165 */
1166 dumptuples(state, false);
1167 break;
1168
1169 case TSS_BOUNDED:
1170
1171 /*
1172 * We don't want to grow the array here, so check whether the new
1173 * tuple can be discarded before putting it in. This should be a
1174 * good speed optimization, too, since when there are many more
1175 * input tuples than the bound, most input tuples can be discarded
1176 * with just this one comparison. Note that because we currently
1177 * have the sort direction reversed, we must check for <= not >=.
1178 */
1179 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1180 {
1181 /* new tuple <= top of the heap, so we can discard it */
1182 free_sort_tuple(state, tuple);
1184 }
1185 else
1186 {
1187 /* discard top of heap, replacing it with the new tuple */
1188 free_sort_tuple(state, &state->memtuples[0]);
1190 }
1191 break;
1192
1193 case TSS_BUILDRUNS:
1194
1195 /*
1196 * Save the tuple into the unsorted array (there must be space)
1197 */
1198 state->memtuples[state->memtupcount++] = *tuple;
1199
1200 /*
1201 * If we are over the memory limit, dump all tuples.
1202 */
1203 dumptuples(state, false);
1204 break;
1205
1206 default:
1207 elog(ERROR, "invalid tuplesort state");
1208 break;
1209 }
1210 MemoryContextSwitchTo(oldcontext);
1211}
1212
1213static bool
1215{
1216 Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1217 Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1218 Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1219
1220 /*
1221 * Check effectiveness of abbreviation optimization. Consider aborting
1222 * when still within memory limit.
1223 */
1224 if (state->status == TSS_INITIAL &&
1225 state->memtupcount >= state->abbrevNext)
1226 {
1227 state->abbrevNext *= 2;
1228
1229 /*
1230 * Check opclass-supplied abbreviation abort routine. It may indicate
1231 * that abbreviation should not proceed.
1232 */
1233 if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1234 state->base.sortKeys))
1235 return false;
1236
1237 /*
1238 * Finally, restore authoritative comparator, and indicate that
1239 * abbreviation is not in play by setting abbrev_converter to NULL
1240 */
1241 state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1242 state->base.sortKeys[0].abbrev_converter = NULL;
1243 /* Not strictly necessary, but be tidy */
1244 state->base.sortKeys[0].abbrev_abort = NULL;
1245 state->base.sortKeys[0].abbrev_full_comparator = NULL;
1246
1247 /* Give up - expect original pass-by-value representation */
1248 return true;
1249 }
1250
1251 return false;
1252}
1253
1254/*
1255 * All tuples have been provided; finish the sort.
1256 */
1257void
 1258tuplesort_performsort(Tuplesortstate *state)
 1259{
1260 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1261
1262 if (trace_sort)
1263 elog(LOG, "performsort of worker %d starting: %s",
1264 state->worker, pg_rusage_show(&state->ru_start));
1265
1266 switch (state->status)
1267 {
1268 case TSS_INITIAL:
1269
1270 /*
1271 * We were able to accumulate all the tuples within the allowed
1272 * amount of memory, or leader to take over worker tapes
1273 */
1274 if (SERIAL(state))
1275 {
1276 /* Sort in memory and we're done */
1278 state->status = TSS_SORTEDINMEM;
1279 }
1280 else if (WORKER(state))
1281 {
1282 /*
1283 * Parallel workers must still dump out tuples to tape. No
1284 * merge is required to produce single output run, though.
1285 */
1286 inittapes(state, false);
1287 dumptuples(state, true);
1289 state->status = TSS_SORTEDONTAPE;
1290 }
1291 else
1292 {
1293 /*
1294 * Leader will take over worker tapes and merge worker runs.
1295 * Note that mergeruns sets the correct state->status.
1296 */
1299 }
1300 state->current = 0;
1301 state->eof_reached = false;
1302 state->markpos_block = 0L;
1303 state->markpos_offset = 0;
1304 state->markpos_eof = false;
1305 break;
1306
1307 case TSS_BOUNDED:
1308
1309 /*
1310 * We were able to accumulate all the tuples required for output
1311 * in memory, using a heap to eliminate excess tuples. Now we
1312 * have to transform the heap to a properly-sorted array. Note
1313 * that sort_bounded_heap sets the correct state->status.
1314 */
1316 state->current = 0;
1317 state->eof_reached = false;
1318 state->markpos_offset = 0;
1319 state->markpos_eof = false;
1320 break;
1321
1322 case TSS_BUILDRUNS:
1323
1324 /*
1325 * Finish tape-based sort. First, flush all tuples remaining in
1326 * memory out to tape; then merge until we have a single remaining
1327 * run (or, if !randomAccess and !WORKER(), one run per tape).
1328 * Note that mergeruns sets the correct state->status.
1329 */
1330 dumptuples(state, true);
1332 state->eof_reached = false;
1333 state->markpos_block = 0L;
1334 state->markpos_offset = 0;
1335 state->markpos_eof = false;
1336 break;
1337
1338 default:
1339 elog(ERROR, "invalid tuplesort state");
1340 break;
1341 }
1342
1343 if (trace_sort)
1344 {
1345 if (state->status == TSS_FINALMERGE)
1346 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1347 state->worker, state->nInputTapes,
1348 pg_rusage_show(&state->ru_start));
1349 else
1350 elog(LOG, "performsort of worker %d done: %s",
1351 state->worker, pg_rusage_show(&state->ru_start));
1352 }
1353
1354 MemoryContextSwitchTo(oldcontext);
1355}
1356
1357/*
1358 * Internal routine to fetch the next tuple in either forward or back
1359 * direction into *stup. Returns false if no more tuples.
1360 * Returned tuple belongs to tuplesort memory context, and must not be freed
1361 * by caller. Note that fetched tuple is stored in memory that may be
1362 * recycled by any future fetch.
1363 */
1364bool
1366 SortTuple *stup)
1367{
1368 unsigned int tuplen;
1369 size_t nmoved;
1370
1371 Assert(!WORKER(state));
1372
1373 switch (state->status)
1374 {
1375 case TSS_SORTEDINMEM:
1376 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1377 Assert(!state->slabAllocatorUsed);
1378 if (forward)
1379 {
1380 if (state->current < state->memtupcount)
1381 {
1382 *stup = state->memtuples[state->current++];
1383 return true;
1384 }
1385 state->eof_reached = true;
1386
1387 /*
1388 * Complain if caller tries to retrieve more tuples than
1389 * originally asked for in a bounded sort. This is because
1390 * returning EOF here might be the wrong thing.
1391 */
1392 if (state->bounded && state->current >= state->bound)
1393 elog(ERROR, "retrieved too many tuples in a bounded sort");
1394
1395 return false;
1396 }
1397 else
1398 {
1399 if (state->current <= 0)
1400 return false;
1401
1402 /*
1403 * if all tuples are fetched already then we return last
1404 * tuple, else - tuple before last returned.
1405 */
1406 if (state->eof_reached)
1407 state->eof_reached = false;
1408 else
1409 {
1410 state->current--; /* last returned tuple */
1411 if (state->current <= 0)
1412 return false;
1413 }
1414 *stup = state->memtuples[state->current - 1];
1415 return true;
1416 }
1417 break;
1418
1419 case TSS_SORTEDONTAPE:
1420 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1421 Assert(state->slabAllocatorUsed);
1422
1423 /*
1424 * The slot that held the tuple that we returned in previous
1425 * gettuple call can now be reused.
1426 */
1427 if (state->lastReturnedTuple)
1428 {
1429 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1430 state->lastReturnedTuple = NULL;
1431 }
1432
1433 if (forward)
1434 {
1435 if (state->eof_reached)
1436 return false;
1437
1438 if ((tuplen = getlen(state->result_tape, true)) != 0)
1439 {
1440 READTUP(state, stup, state->result_tape, tuplen);
1441
1442 /*
1443 * Remember the tuple we return, so that we can recycle
1444 * its memory on next call. (This can be NULL, in the
1445 * !state->tuples case).
1446 */
1447 state->lastReturnedTuple = stup->tuple;
1448
1449 return true;
1450 }
1451 else
1452 {
1453 state->eof_reached = true;
1454 return false;
1455 }
1456 }
1457
1458 /*
1459 * Backward.
1460 *
1461 * if all tuples are fetched already then we return last tuple,
1462 * else - tuple before last returned.
1463 */
1464 if (state->eof_reached)
1465 {
1466 /*
1467 * Seek position is pointing just past the zero tuplen at the
1468 * end of file; back up to fetch last tuple's ending length
1469 * word. If seek fails we must have a completely empty file.
1470 */
1471 nmoved = LogicalTapeBackspace(state->result_tape,
1472 2 * sizeof(unsigned int));
1473 if (nmoved == 0)
1474 return false;
1475 else if (nmoved != 2 * sizeof(unsigned int))
1476 elog(ERROR, "unexpected tape position");
1477 state->eof_reached = false;
1478 }
1479 else
1480 {
1481 /*
1482 * Back up and fetch previously-returned tuple's ending length
1483 * word. If seek fails, assume we are at start of file.
1484 */
1485 nmoved = LogicalTapeBackspace(state->result_tape,
1486 sizeof(unsigned int));
1487 if (nmoved == 0)
1488 return false;
1489 else if (nmoved != sizeof(unsigned int))
1490 elog(ERROR, "unexpected tape position");
1491 tuplen = getlen(state->result_tape, false);
1492
1493 /*
1494 * Back up to get ending length word of tuple before it.
1495 */
1496 nmoved = LogicalTapeBackspace(state->result_tape,
1497 tuplen + 2 * sizeof(unsigned int));
1498 if (nmoved == tuplen + sizeof(unsigned int))
1499 {
1500 /*
1501 * We backed up over the previous tuple, but there was no
1502 * ending length word before it. That means that the prev
1503 * tuple is the first tuple in the file. It is now the
1504 * next to read in forward direction (not obviously right,
1505 * but that is what in-memory case does).
1506 */
1507 return false;
1508 }
1509 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1510 elog(ERROR, "bogus tuple length in backward scan");
1511 }
1512
1513 tuplen = getlen(state->result_tape, false);
1514
1515 /*
1516 * Now we have the length of the prior tuple, back up and read it.
1517 * Note: READTUP expects we are positioned after the initial
1518 * length word of the tuple, so back up to that point.
1519 */
1520 nmoved = LogicalTapeBackspace(state->result_tape,
1521 tuplen);
1522 if (nmoved != tuplen)
1523 elog(ERROR, "bogus tuple length in backward scan");
1524 READTUP(state, stup, state->result_tape, tuplen);
1525
1526 /*
1527 * Remember the tuple we return, so that we can recycle its memory
1528 * on next call. (This can be NULL, in the Datum case).
1529 */
1530 state->lastReturnedTuple = stup->tuple;
1531
1532 return true;
1533
1534 case TSS_FINALMERGE:
1535 Assert(forward);
1536 /* We are managing memory ourselves, with the slab allocator. */
1537 Assert(state->slabAllocatorUsed);
1538
1539 /*
1540 * The slab slot holding the tuple that we returned in previous
1541 * gettuple call can now be reused.
1542 */
1543 if (state->lastReturnedTuple)
1544 {
1545 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1546 state->lastReturnedTuple = NULL;
1547 }
1548
1549 /*
1550 * This code should match the inner loop of mergeonerun().
1551 */
1552 if (state->memtupcount > 0)
1553 {
1554 int srcTapeIndex = state->memtuples[0].srctape;
1555 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1557
1558 *stup = state->memtuples[0];
1559
1560 /*
1561 * Remember the tuple we return, so that we can recycle its
1562 * memory on next call. (This can be NULL, in the Datum case).
1563 */
1564 state->lastReturnedTuple = stup->tuple;
1565
1566 /*
1567 * Pull next tuple from tape, and replace the returned tuple
1568 * at top of the heap with it.
1569 */
1571 {
1572 /*
1573 * If no more data, we've reached end of run on this tape.
1574 * Remove the top node from the heap.
1575 */
1577 state->nInputRuns--;
1578
1579 /*
1580 * Close the tape. It'd go away at the end of the sort
1581 * anyway, but better to release the memory early.
1582 */
1584 return true;
1585 }
1586 newtup.srctape = srcTapeIndex;
1588 return true;
1589 }
1590 return false;
1591
1592 default:
1593 elog(ERROR, "invalid tuplesort state");
1594 return false; /* keep compiler quiet */
1595 }
1596}
1597
1598
1599/*
1600 * Advance over N tuples in either forward or back direction,
1601 * without returning any data. N==0 is a no-op.
1602 * Returns true if successful, false if ran out of tuples.
1603 */
1604bool
1606{
1607 MemoryContext oldcontext;
1608
1609 /*
1610 * We don't actually support backwards skip yet, because no callers need
1611 * it. The API is designed to allow for that later, though.
1612 */
1613 Assert(forward);
1614 Assert(ntuples >= 0);
1615 Assert(!WORKER(state));
1616
1617 switch (state->status)
1618 {
1619 case TSS_SORTEDINMEM:
1620 if (state->memtupcount - state->current >= ntuples)
1621 {
1622 state->current += ntuples;
1623 return true;
1624 }
1625 state->current = state->memtupcount;
1626 state->eof_reached = true;
1627
1628 /*
1629 * Complain if caller tries to retrieve more tuples than
1630 * originally asked for in a bounded sort. This is because
1631 * returning EOF here might be the wrong thing.
1632 */
1633 if (state->bounded && state->current >= state->bound)
1634 elog(ERROR, "retrieved too many tuples in a bounded sort");
1635
1636 return false;
1637
1638 case TSS_SORTEDONTAPE:
1639 case TSS_FINALMERGE:
1640
1641 /*
1642 * We could probably optimize these cases better, but for now it's
1643 * not worth the trouble.
1644 */
1645 oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1646 while (ntuples-- > 0)
1647 {
1649
1651 {
1652 MemoryContextSwitchTo(oldcontext);
1653 return false;
1654 }
1656 }
1657 MemoryContextSwitchTo(oldcontext);
1658 return true;
1659
1660 default:
1661 elog(ERROR, "invalid tuplesort state");
1662 return false; /* keep compiler quiet */
1663 }
1664}
1665
1666/*
1667 * tuplesort_merge_order - report merge order we'll use for given memory
1668 * (note: "merge order" just means the number of input tapes in the merge).
1669 *
1670 * This is exported for use by the planner. allowedMem is in bytes.
1671 */
1672int
1673tuplesort_merge_order(int64 allowedMem)
1674{
1675 int mOrder;
1676
1677 /*----------
1678 * In the merge phase, we need buffer space for each input and output tape.
1679 * Each pass in the balanced merge algorithm reads from M input tapes, and
1680 * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1681 * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1682 * input tape.
1683 *
1684 * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1685 * N * TAPE_BUFFER_OVERHEAD
1686 *
1687 * Except for the last and next-to-last merge passes, where there can be
1688 * fewer tapes left to process, M = N. We choose M so that we have the
1689 * desired amount of memory available for the input buffers
1690 * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1691 * available for the tape buffers (allowedMem).
1692 *
1693 * Note: you might be thinking we need to account for the memtuples[]
1694 * array in this calculation, but we effectively treat that as part of the
1695 * MERGE_BUFFER_SIZE workspace.
1696 *----------
1697 */
1698 mOrder = allowedMem /
 1699 (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
 1700
1701 /*
1702 * Even in minimum memory, use at least a MINORDER merge. On the other
1703 * hand, even when we have lots of memory, do not use more than a MAXORDER
1704 * merge. Tapes are pretty cheap, but they're not entirely free. Each
1705 * additional tape reduces the amount of memory available to build runs,
1706 * which in turn can cause the same sort to need more runs, which makes
1707 * merging slower even if it can still be done in a single pass. Also,
1708 * high order merges are quite slow due to CPU cache effects; it can be
1709 * faster to pay the I/O cost of a multi-pass merge than to perform a
1710 * single merge pass across many hundreds of tapes.
1711 */
1712 mOrder = Max(mOrder, MINORDER);
1713 mOrder = Min(mOrder, MAXORDER);
1714
1715 return mOrder;
1716}
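As a rough worked example (hypothetical figures, assuming the default 8 kB BLCKSZ, so TAPE_BUFFER_OVERHEAD is 8 kB and MERGE_BUFFER_SIZE is 256 kB), a 64 MB allowedMem budget gives:

	/* illustrative only; not part of tuplesort.c */
	mOrder = (64 * 1024 * 1024) / (256 * 1024 + 2 * 8 * 1024);	/* = 240 */
	mOrder = Max(mOrder, MINORDER);		/* still 240 */
	mOrder = Min(mOrder, MAXORDER);		/* still 240, well below the 500 cap */

so roughly 240 input tapes would be used for the merge.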
1717
1718/*
1719 * Helper function to calculate how much memory to allocate for the read buffer
1720 * of each input tape in a merge pass.
1721 *
1722 * 'avail_mem' is the amount of memory available for the buffers of all the
1723 * tapes, both input and output.
1724 * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1725 * 'maxOutputTapes' is the max. number of output tapes we should produce.
1726 */
1727static int64
1728merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1729 int maxOutputTapes)
1730{
1731 int nOutputRuns;
1732 int nOutputTapes;
1733
1734 /*
1735 * How many output tapes will we produce in this pass?
1736 *
1737 * This is nInputRuns / nInputTapes, rounded up.
1738 */
1739 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1740
1741 nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1742
1743 /*
1744 * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1745 * remaining memory is divided evenly between the input tapes.
1746 *
1747 * This also follows from the formula in tuplesort_merge_order, but here
1748 * we derive the input buffer size from the amount of memory available,
1749 * and M and N.
1750 */
1751 return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1752}
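Continuing the hypothetical example above: if avail_mem is 64 MB and the pass reads 240 input tapes that each hold one run (with maxOutputTapes also 240), then:

	/* illustrative only; 8 kB TAPE_BUFFER_OVERHEAD assumed */
	nOutputRuns = (240 + 240 - 1) / 240;	/* = 1, a single final run */
	nOutputTapes = Min(1, 240);				/* = 1 */
	/* per-input-tape read buffer = (64 MB - 1 * 8 kB) / 240, about 273 kB */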
1753
1754/*
1755 * inittapes - initialize for tape sorting.
1756 *
1757 * This is called only if we have found we won't sort in memory.
1758 */
1759static void
1760inittapes(Tuplesortstate *state, bool mergeruns)
1761{
1762 Assert(!LEADER(state));
1763
1764 if (mergeruns)
1765 {
1766 /* Compute number of input tapes to use when merging */
1767 state->maxTapes = tuplesort_merge_order(state->allowedMem);
1768 }
1769 else
1770 {
1771 /* Workers can sometimes produce single run, output without merge */
1772 Assert(WORKER(state));
1773 state->maxTapes = MINORDER;
1774 }
1775
1776 if (trace_sort)
1777 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1778 state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1779
1780 /* Create the tape set */
1781 inittapestate(state, state->maxTapes);
1782 state->tapeset =
1783 LogicalTapeSetCreate(false,
1784 state->shared ? &state->shared->fileset : NULL,
1785 state->worker);
1786
1787 state->currentRun = 0;
1788
1789 /*
1790 * Initialize logical tape arrays.
1791 */
1792 state->inputTapes = NULL;
1793 state->nInputTapes = 0;
1794 state->nInputRuns = 0;
1795
1796 state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1797 state->nOutputTapes = 0;
1798 state->nOutputRuns = 0;
1799
1800 state->status = TSS_BUILDRUNS;
1801
1803}
1804
1805/*
1806 * inittapestate - initialize generic tape management state
1807 */
1808static void
1809inittapestate(Tuplesortstate *state, int maxTapes)
1810{
1811 int64 tapeSpace;
1812
1813 /*
1814 * Decrease availMem to reflect the space needed for tape buffers; but
1815 * don't decrease it to the point that we have no room for tuples. (That
1816 * case is only likely to occur if sorting pass-by-value Datums; in all
1817 * other scenarios the memtuples[] array is unlikely to occupy more than
1818 * half of allowedMem. In the pass-by-value case it's not important to
1819 * account for tuple space, so we don't care if LACKMEM becomes
1820 * inaccurate.)
1821 */
1822 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1823
1824 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1825 USEMEM(state, tapeSpace);
1826
1827 /*
1828 * Make sure that the temp file(s) underlying the tape set are created in
1829 * suitable temp tablespaces. For parallel sorts, this should have been
1830 * called already, but it doesn't matter if it is called a second time.
1831 */
1832 PrepareTempTablespaces();
1833}
1834
1835/*
1836 * selectnewtape -- select next tape to output to.
1837 *
1838 * This is called after finishing a run when we know another run
1839 * must be started. This is used both when building the initial
1840 * runs, and during merge passes.
1841 */
1842static void
1843selectnewtape(Tuplesortstate *state)
1844{
1845 /*
1846 * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1847 * both zero. On each call, we create a new output tape to hold the next
1848 * run, until maxTapes is reached. After that, we assign new runs to the
1849 * existing tapes in a round robin fashion.
1850 */
1851 if (state->nOutputTapes < state->maxTapes)
1852 {
1853 /* Create a new tape to hold the next run */
1854 Assert(state->outputTapes[state->nOutputRuns] == NULL);
1855 Assert(state->nOutputRuns == state->nOutputTapes);
1856 state->destTape = LogicalTapeCreate(state->tapeset);
1857 state->outputTapes[state->nOutputTapes] = state->destTape;
1858 state->nOutputTapes++;
1859 state->nOutputRuns++;
1860 }
1861 else
1862 {
1863 /*
1864 * We have reached the max number of tapes. Append to an existing
1865 * tape.
1866 */
1867 state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1868 state->nOutputRuns++;
1869 }
1870}
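A short hypothetical trace of the run-to-tape assignment with maxTapes = 6 (runs numbered from 0, matching nOutputRuns):

	/* illustrative only */
	/* runs 0..5 : a new tape is created each time; nOutputTapes grows to 6  */
	/* run 6     : destTape = outputTapes[6 % 6] = outputTapes[0] (append)   */
	/* run 7     : destTape = outputTapes[7 % 6] = outputTapes[1], and so on */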
1871
1872/*
1873 * Initialize the slab allocation arena, for the given number of slots.
1874 */
1875static void
1876init_slab_allocator(Tuplesortstate *state, int numSlots)
1877{
1878 if (numSlots > 0)
1879 {
1880 char *p;
1881 int i;
1882
1883 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1884 state->slabMemoryEnd = state->slabMemoryBegin +
1885 numSlots * SLAB_SLOT_SIZE;
1886 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1887 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1888
1889 p = state->slabMemoryBegin;
1890 for (i = 0; i < numSlots - 1; i++)
1891 {
1892 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1893 p += SLAB_SLOT_SIZE;
1894 }
1895 ((SlabSlot *) p)->nextfree = NULL;
1896 }
1897 else
1898 {
1899 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
1900 state->slabFreeHead = NULL;
1901 }
1902 state->slabAllocatorUsed = true;
1903}
1904
1905/*
1906 * mergeruns -- merge all the completed initial runs.
1907 *
1908 * This implements the Balanced k-Way Merge Algorithm. All input data has
1909 * already been written to initial runs on tape (see dumptuples).
1910 */
1911static void
1912mergeruns(Tuplesortstate *state)
1913{
1914 int tapenum;
1915
1916 Assert(state->status == TSS_BUILDRUNS);
1917 Assert(state->memtupcount == 0);
1918
1919 if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
1920 {
1921 /*
1922 * If there are multiple runs to be merged, when we go to read back
1923 * tuples from disk, abbreviated keys will not have been stored, and
1924 * we don't care to regenerate them. Disable abbreviation from this
1925 * point on.
1926 */
1927 state->base.sortKeys->abbrev_converter = NULL;
1928 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
1929
1930 /* Not strictly necessary, but be tidy */
1931 state->base.sortKeys->abbrev_abort = NULL;
1932 state->base.sortKeys->abbrev_full_comparator = NULL;
1933 }
1934
1935 /*
1936 * Reset tuple memory. We've freed all the tuples that we previously
1937 * allocated. We will use the slab allocator from now on.
1938 */
1939 MemoryContextResetOnly(state->base.tuplecontext);
1940
1941 /*
1942 * We no longer need a large memtuples array. (We will allocate a smaller
1943 * one for the heap later.)
1944 */
1945 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1946 pfree(state->memtuples);
1947 state->memtuples = NULL;
1948
1949 /*
1950 * Initialize the slab allocator. We need one slab slot per input tape,
1951 * for the tuples in the heap, plus one to hold the tuple last returned
1952 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
1953 * however, we don't need to allocate anything.)
1954 *
1955 * In a multi-pass merge, we could shrink this allocation for the last
1956 * merge pass, if it has fewer tapes than previous passes, but we don't
1957 * bother.
1958 *
1959 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
1960 * to track memory usage of individual tuples.
1961 */
1962 if (state->base.tuples)
1963 init_slab_allocator(state, state->nOutputTapes + 1);
1964 else
1965 init_slab_allocator(state, 0);
1966
1967 /*
1968 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
1969 * from each input tape.
1970 *
1971 * We could shrink this, too, between passes in a multi-pass merge, but we
1972 * don't bother. (The initial input tapes are still in outputTapes. The
1973 * number of input tapes will not increase between passes.)
1974 */
1975 state->memtupsize = state->nOutputTapes;
1976 state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
1977 state->nOutputTapes * sizeof(SortTuple));
1978 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1979
1980 /*
1981 * Use all the remaining memory we have available for tape buffers among
1982 * all the input tapes. At the beginning of each merge pass, we will
1983 * divide this memory between the input and output tapes in the pass.
1984 */
1985 state->tape_buffer_mem = state->availMem;
1986 USEMEM(state, state->tape_buffer_mem);
1987 if (trace_sort)
1988 elog(LOG, "worker %d using %zu KB of memory for tape buffers",
1989 state->worker, state->tape_buffer_mem / 1024);
1990
1991 for (;;)
1992 {
1993 /*
1994 * On the first iteration, or if we have read all the runs from the
1995 * input tapes in a multi-pass merge, it's time to start a new pass.
1996 * Rewind all the output tapes, and make them inputs for the next
1997 * pass.
1998 */
1999 if (state->nInputRuns == 0)
2000 {
2001 int64 input_buffer_size;
2002
2003 /* Close the old, emptied, input tapes */
2004 if (state->nInputTapes > 0)
2005 {
2006 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2007 LogicalTapeClose(state->inputTapes[tapenum]);
2008 pfree(state->inputTapes);
2009 }
2010
2011 /* Previous pass's outputs become next pass's inputs. */
2012 state->inputTapes = state->outputTapes;
2013 state->nInputTapes = state->nOutputTapes;
2014 state->nInputRuns = state->nOutputRuns;
2015
2016 /*
2017 * Reset output tape variables. The actual LogicalTapes will be
2018 * created as needed, here we only allocate the array to hold
2019 * them.
2020 */
2021 state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2022 state->nOutputTapes = 0;
2023 state->nOutputRuns = 0;
2024
2025 /*
2026 * Redistribute the memory allocated for tape buffers, among the
2027 * new input and output tapes.
2028 */
2029 input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2030 state->nInputTapes,
2031 state->nInputRuns,
2032 state->maxTapes);
2033
2034 if (trace_sort)
2035 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2036 state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2037 pg_rusage_show(&state->ru_start));
2038
2039 /* Prepare the new input tapes for merge pass. */
2040 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2041 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2042
2043 /*
2044 * If there's just one run left on each input tape, then only one
2045 * merge pass remains. If we don't have to produce a materialized
2046 * sorted tape, we can stop at this point and do the final merge
2047 * on-the-fly.
2048 */
2049 if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2050 && state->nInputRuns <= state->nInputTapes
2051 && !WORKER(state))
2052 {
2053 /* Tell logtape.c we won't be writing anymore */
2054 LogicalTapeSetForgetFreeSpace(state->tapeset);
2055 /* Initialize for the final merge pass */
2056 beginmerge(state);
2057 state->status = TSS_FINALMERGE;
2058 return;
2059 }
2060 }
2061
2062 /* Select an output tape */
2063 selectnewtape(state);
2064
2065 /* Merge one run from each input tape. */
2066 mergeonerun(state);
2067
2068 /*
2069 * If the input tapes are empty, and we output only one output run,
2070 * we're done. The current output tape contains the final result.
2071 */
2072 if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2073 break;
2074 }
2075
2076 /*
2077 * Done. The result is on a single run on a single tape.
2078 */
2079 state->result_tape = state->outputTapes[0];
2080 if (!WORKER(state))
2081 LogicalTapeFreeze(state->result_tape, NULL);
2082 else
2083 worker_freeze_result_tape(state);
2084 state->status = TSS_SORTEDONTAPE;
2085
2086 /* Close all the now-empty input tapes, to release their read buffers. */
2087 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2088 LogicalTapeClose(state->inputTapes[tapenum]);
2089}
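For instance (hypothetical numbers), suppose run generation left 18 runs and tuplesort_merge_order() chose 6 tapes. The pass structure would look like:

	pass 1: nInputTapes = 6, nInputRuns = 18  ->  3 mergeonerun() calls, 3 output runs
	pass 2: nInputTapes = 3, nInputRuns = 3   ->  one run per tape remains; without
	        TUPLESORT_RANDOMACCESS this last merge is left to be done on the fly
	        (TSS_FINALMERGE) instead of being written back to tape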
2090
2091/*
2092 * Merge one run from each input tape.
2093 */
2094static void
2095mergeonerun(Tuplesortstate *state)
2096{
2097 int srcTapeIndex;
2098 LogicalTape *srcTape;
2099
2100 /*
2101 * Start the merge by loading one tuple from each active source tape into
2102 * the heap.
2103 */
2104 beginmerge(state);
2105
2106 Assert(state->slabAllocatorUsed);
2107
2108 /*
2109 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2110 * out, and replacing it with next tuple from same tape (if there is
2111 * another one).
2112 */
2113 while (state->memtupcount > 0)
2114 {
2115 SortTuple stup;
2116
2117 /* write the tuple to destTape */
2118 srcTapeIndex = state->memtuples[0].srctape;
2119 srcTape = state->inputTapes[srcTapeIndex];
2120 WRITETUP(state, state->destTape, &state->memtuples[0]);
2121
2122 /* recycle the slot of the tuple we just wrote out, for the next read */
2123 if (state->memtuples[0].tuple)
2124 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2125
2126 /*
2127 * pull next tuple from the tape, and replace the written-out tuple in
2128 * the heap with it.
2129 */
2130 if (mergereadnext(state, srcTape, &stup))
2131 {
2132 stup.srctape = srcTapeIndex;
2133 tuplesort_heap_replace_top(state, &stup);
2134 }
2135 else
2136 {
2137 tuplesort_heap_delete_top(state);
2138 state->nInputRuns--;
2139 }
2140 }
2141
2142 /*
2143 * When the heap empties, we're done. Write an end-of-run marker on the
2144 * output tape.
2145 */
2146 markrunend(state->destTape);
2147}
2148
2149/*
2150 * beginmerge - initialize for a merge pass
2151 *
2152 * Fill the merge heap with the first tuple from each input tape.
2153 */
2154static void
2155beginmerge(Tuplesortstate *state)
2156{
2157 int activeTapes;
2158 int srcTapeIndex;
2159
2160 /* Heap should be empty here */
2161 Assert(state->memtupcount == 0);
2162
2163 activeTapes = Min(state->nInputTapes, state->nInputRuns);
2164
2165 for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2166 {
2167 SortTuple tup;
2168
2169 if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2170 {
2171 tup.srctape = srcTapeIndex;
2172 tuplesort_heap_insert(state, &tup);
2173 }
2174 }
2175}
2176
2177/*
2178 * mergereadnext - read next tuple from one merge input tape
2179 *
2180 * Returns false on EOF.
2181 */
2182static bool
2183mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2184{
2185 unsigned int tuplen;
2186
2187 /* read next tuple, if any */
2188 if ((tuplen = getlen(srcTape, true)) == 0)
2189 return false;
2190 READTUP(state, stup, srcTape, tuplen);
2191
2192 return true;
2193}
2194
2195/*
2196 * dumptuples - remove tuples from memtuples and write initial run to tape
2197 *
2198 * When alltuples = true, dump everything currently in memory. (This case is
2199 * only used at end of input data.)
2200 */
2201static void
2202dumptuples(Tuplesortstate *state, bool alltuples)
2203{
2204 int memtupwrite;
2205 int i;
2206
2207 /*
2208 * Nothing to do if we still fit in available memory and have array slots,
2209 * unless this is the final call during initial run generation.
2210 */
2211 if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2212 !alltuples)
2213 return;
2214
2215 /*
2216 * Final call might require no sorting, in rare cases where we just so
2217 * happen to have previously LACKMEM()'d at the point where exactly all
2218 * remaining tuples are loaded into memory, just before input was
2219 * exhausted. In general, short final runs are quite possible, but avoid
2220 * creating a completely empty run. In a worker, though, we must produce
2221 * at least one tape, even if it's empty.
2222 */
2223 if (state->memtupcount == 0 && state->currentRun > 0)
2224 return;
2225
2226 Assert(state->status == TSS_BUILDRUNS);
2227
2228 /*
2229 * It seems unlikely that this limit will ever be exceeded, but take no
2230 * chances
2231 */
2232 if (state->currentRun == INT_MAX)
2233 ereport(ERROR,
2234 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2235 errmsg("cannot have more than %d runs for an external sort",
2236 INT_MAX)));
2237
2238 if (state->currentRun > 0)
2239 selectnewtape(state);
2240
2241 state->currentRun++;
2242
2243 if (trace_sort)
2244 elog(LOG, "worker %d starting quicksort of run %d: %s",
2245 state->worker, state->currentRun,
2246 pg_rusage_show(&state->ru_start));
2247
2248 /*
2249 * Sort all tuples accumulated within the allowed amount of memory for
2250 * this run.
2251 */
2252 tuplesort_sort_memtuples(state);
2253
2254 if (trace_sort)
2255 elog(LOG, "worker %d finished quicksort of run %d: %s",
2256 state->worker, state->currentRun,
2257 pg_rusage_show(&state->ru_start));
2258
2259 memtupwrite = state->memtupcount;
2260 for (i = 0; i < memtupwrite; i++)
2261 {
2262 SortTuple *stup = &state->memtuples[i];
2263
2264 WRITETUP(state, state->destTape, stup);
2265 }
2266
2267 state->memtupcount = 0;
2268
2269 /*
2270 * Reset tuple memory. We've freed all of the tuples that we previously
2271 * allocated. It's important to avoid fragmentation when there is a stark
2272 * change in the sizes of incoming tuples. In bounded sorts,
2273 * fragmentation due to AllocSetFree's bucketing by size class might be
2274 * particularly bad if this step wasn't taken.
2275 */
2276 MemoryContextReset(state->base.tuplecontext);
2277
2278 /*
2279 * Now update the memory accounting to subtract the memory used by the
2280 * tuple.
2281 */
2282 FREEMEM(state, state->tupleMem);
2283 state->tupleMem = 0;
2284
2285 markrunend(state->destTape);
2286
2287 if (trace_sort)
2288 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2289 state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2290 pg_rusage_show(&state->ru_start));
2291}
2292
2293/*
2294 * tuplesort_rescan - rewind and replay the scan
2295 */
2296void
2297tuplesort_rescan(Tuplesortstate *state)
2298{
2299 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2300
2301 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2302
2303 switch (state->status)
2304 {
2305 case TSS_SORTEDINMEM:
2306 state->current = 0;
2307 state->eof_reached = false;
2308 state->markpos_offset = 0;
2309 state->markpos_eof = false;
2310 break;
2311 case TSS_SORTEDONTAPE:
2312 LogicalTapeRewindForRead(state->result_tape, 0);
2313 state->eof_reached = false;
2314 state->markpos_block = 0L;
2315 state->markpos_offset = 0;
2316 state->markpos_eof = false;
2317 break;
2318 default:
2319 elog(ERROR, "invalid tuplesort state");
2320 break;
2321 }
2322
2323 MemoryContextSwitchTo(oldcontext);
2324}
2325
2326/*
2327 * tuplesort_markpos - saves current position in the merged sort file
2328 */
2329void
2330tuplesort_markpos(Tuplesortstate *state)
2331{
2332 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2333
2334 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2335
2336 switch (state->status)
2337 {
2338 case TSS_SORTEDINMEM:
2339 state->markpos_offset = state->current;
2340 state->markpos_eof = state->eof_reached;
2341 break;
2342 case TSS_SORTEDONTAPE:
2343 LogicalTapeTell(state->result_tape,
2344 &state->markpos_block,
2345 &state->markpos_offset);
2346 state->markpos_eof = state->eof_reached;
2347 break;
2348 default:
2349 elog(ERROR, "invalid tuplesort state");
2350 break;
2351 }
2352
2353 MemoryContextSwitchTo(oldcontext);
2354}
2355
2356/*
2357 * tuplesort_restorepos - restores current position in merged sort file to
2358 * last saved position
2359 */
2360void
2361tuplesort_restorepos(Tuplesortstate *state)
2362{
2363 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2364
2365 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2366
2367 switch (state->status)
2368 {
2369 case TSS_SORTEDINMEM:
2370 state->current = state->markpos_offset;
2371 state->eof_reached = state->markpos_eof;
2372 break;
2373 case TSS_SORTEDONTAPE:
2374 LogicalTapeSeek(state->result_tape,
2375 state->markpos_block,
2376 state->markpos_offset);
2377 state->eof_reached = state->markpos_eof;
2378 break;
2379 default:
2380 elog(ERROR, "invalid tuplesort state");
2381 break;
2382 }
2383
2384 MemoryContextSwitchTo(oldcontext);
2385}
2386
2387/*
2388 * tuplesort_get_stats - extract summary statistics
2389 *
2390 * This can be called after tuplesort_performsort() finishes to obtain
2391 * printable summary information about how the sort was performed.
2392 */
2393void
2394tuplesort_get_stats(Tuplesortstate *state,
2395 TuplesortInstrumentation *stats)
2396{
2397 /*
2398 * Note: it might seem we should provide both memory and disk usage for a
2399 * disk-based sort. However, the current code doesn't track memory space
2400 * accurately once we have begun to return tuples to the caller (since we
2401 * don't account for pfree's the caller is expected to do), so we cannot
2402 * rely on availMem in a disk sort. This does not seem worth the overhead
2403 * to fix. Is it worth creating an API for the memory context code to
2404 * tell us how much is actually used in sortcontext?
2405 */
2406 tuplesort_updatemax(state);
2407
2408 if (state->isMaxSpaceDisk)
2409 stats->spaceType = SORT_SPACE_TYPE_DISK;
2410 else
2411 stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2412 stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2413
2414 switch (state->maxSpaceStatus)
2415 {
2416 case TSS_SORTEDINMEM:
2417 if (state->boundUsed)
2418 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2419 else
2420 stats->sortMethod = SORT_TYPE_QUICKSORT;
2421 break;
2422 case TSS_SORTEDONTAPE:
2423 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2424 break;
2425 case TSS_FINALMERGE:
2426 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2427 break;
2428 default:
2429 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2430 break;
2431 }
2432}
2433
2434/*
2435 * Convert TuplesortMethod to a string.
2436 */
2437const char *
2438tuplesort_method_name(TuplesortMethod m)
2439{
2440 switch (m)
2441 {
2442 case SORT_TYPE_STILL_IN_PROGRESS:
2443 return "still in progress";
2444 case SORT_TYPE_TOP_N_HEAPSORT:
2445 return "top-N heapsort";
2446 case SORT_TYPE_QUICKSORT:
2447 return "quicksort";
2448 case SORT_TYPE_EXTERNAL_SORT:
2449 return "external sort";
2450 case SORT_TYPE_EXTERNAL_MERGE:
2451 return "external merge";
2452 }
2453
2454 return "unknown";
2455}
2456
2457/*
2458 * Convert TuplesortSpaceType to a string.
2459 */
2460const char *
2461tuplesort_space_type_name(TuplesortSpaceType t)
2462{
2463 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2464 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2465}
2466
2467
2468/*
2469 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2470 */
2471
2472/*
2473 * Convert the existing unordered array of SortTuples to a bounded heap,
2474 * discarding all but the smallest "state->bound" tuples.
2475 *
2476 * When working with a bounded heap, we want to keep the largest entry
2477 * at the root (array entry zero), instead of the smallest as in the normal
2478 * sort case. This allows us to discard the largest entry cheaply.
2479 * Therefore, we temporarily reverse the sort direction.
2480 */
2481static void
2482make_bounded_heap(Tuplesortstate *state)
2483{
2484 int tupcount = state->memtupcount;
2485 int i;
2486
2487 Assert(state->status == TSS_INITIAL);
2488 Assert(state->bounded);
2489 Assert(tupcount >= state->bound);
2490 Assert(SERIAL(state));
2491
2492 /* Reverse sort direction so largest entry will be at root */
2493 reversedirection(state);
2494
2495 state->memtupcount = 0; /* make the heap empty */
2496 for (i = 0; i < tupcount; i++)
2497 {
2498 if (state->memtupcount < state->bound)
2499 {
2500 /* Insert next tuple into heap */
2501 /* Must copy source tuple to avoid possible overwrite */
2502 SortTuple stup = state->memtuples[i];
2503
2504 tuplesort_heap_insert(state, &stup);
2505 }
2506 else
2507 {
2508 /*
2509 * The heap is full. Replace the largest entry with the new
2510 * tuple, or just discard it, if it's larger than anything already
2511 * in the heap.
2512 */
2513 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2514 {
2515 free_sort_tuple(state, &state->memtuples[i]);
2516 CHECK_FOR_INTERRUPTS();
2517 }
2518 else
2519 tuplesort_heap_replace_top(state, &state->memtuples[i]);
2520 }
2521 }
2522
2523 Assert(state->memtupcount == state->bound);
2524 state->status = TSS_BOUNDED;
2525}
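A toy walk-through of the bounded heap (hypothetical integer keys, ascending sort, bound = 3; recall the comparison direction is reversed, so the root holds the largest kept value):

	input: 9 4 7 2 8 1
	insert 9, 4, 7          ->  heap holds {9 4 7}, root = 9
	see 2: 2 < 9            ->  replace root, heap holds {2 4 7}, root = 7
	see 8: 8 >= 7           ->  discard 8
	see 1: 1 < 7            ->  replace root, heap holds {1 2 4}, root = 4
	sort_bounded_heap()     ->  1 2 4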
2526
2527/*
2528 * Convert the bounded heap to a properly-sorted array
2529 */
2530static void
2531sort_bounded_heap(Tuplesortstate *state)
2532{
2533 int tupcount = state->memtupcount;
2534
2535 Assert(state->status == TSS_BOUNDED);
2536 Assert(state->bounded);
2537 Assert(tupcount == state->bound);
2538 Assert(SERIAL(state));
2539
2540 /*
2541 * We can unheapify in place because each delete-top call will remove the
2542 * largest entry, which we can promptly store in the newly freed slot at
2543 * the end. Once we're down to a single-entry heap, we're done.
2544 */
2545 while (state->memtupcount > 1)
2546 {
2547 SortTuple stup = state->memtuples[0];
2548
2549 /* this sifts-up the next-largest entry and decreases memtupcount */
2550 tuplesort_heap_delete_top(state);
2551 state->memtuples[state->memtupcount] = stup;
2552 }
2553 state->memtupcount = tupcount;
2554
2555 /*
2556 * Reverse sort direction back to the original state. This is not
2557 * actually necessary but seems like a good idea for tidiness.
2558 */
2559 reversedirection(state);
2560
2561 state->status = TSS_SORTEDINMEM;
2562 state->boundUsed = true;
2563}
2564
2565
2566/* radix sort routines */
2567
2568/*
2569 * Retrieve byte from datum, indexed by 'level': 0 for MSB, 7 for LSB
2570 */
2571static inline uint8
2572current_byte(Datum key, int level)
2573{
2574 int shift = (sizeof(Datum) - 1 - level) * BITS_PER_BYTE;
2575
2576 return (key >> shift) & 0xFF;
2577}
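For example, with an 8-byte Datum holding 0x0123456789ABCDEF:

	current_byte(key, 0) == 0x01	/* most significant byte, shift = 56 */
	current_byte(key, 3) == 0x67	/* shift = 32 */
	current_byte(key, 7) == 0xEF	/* least significant byte, shift = 0 */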
2578
2579/*
2580 * Normalize datum such that unsigned comparison is order-preserving,
2581 * taking ASC/DESC into account as well.
2582 */
2583static inline Datum
2584normalize_datum(Datum orig, SortSupport ssup)
2585{
2586 Datum norm_datum1;
2587
2588 if (ssup->comparator == ssup_datum_signed_cmp)
2589 {
2590 norm_datum1 = orig + ((uint64) PG_INT64_MAX) + 1;
2591 }
2592 else if (ssup->comparator == ssup_datum_int32_cmp)
2593 {
2594 /*
2595 * First truncate to uint32. Technically, we don't need to do this,
2596 * but it forces the upper half of the datum to be zero regardless of
2597 * sign.
2598 */
2600
2602 }
2603 else
2604 {
2606 norm_datum1 = orig;
2607 }
2608
2609 if (ssup->ssup_reverse)
2610 norm_datum1 = ~norm_datum1;
2611
2612 return norm_datum1;
2613}
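A quick sanity check of the signed 64-bit case (illustrative values): adding PG_INT64_MAX + 1, i.e. 2^63, simply flips the sign bit, so two's-complement order maps onto unsigned order, while a DESC ordering can then be obtained by complementing the normalized bits:

	/* signed value                  normalized (unsigned) image */
	/* -1  = 0xFFFFFFFFFFFFFFFF  ->  0x7FFFFFFFFFFFFFFF */
	/*  0  = 0x0000000000000000  ->  0x8000000000000000 */
	/* +1  = 0x0000000000000001  ->  0x8000000000000001 */
	/* -1 < 0 < +1 as int64, and the images compare the same way as uint64 */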
2614
2615/*
2616 * radix_sort_recursive
2617 *
2618 * Radix sort by (pass-by-value) datum1, diverting to qsort_tuple()
2619 * for tiebreaks.
2620 *
2621 * This is a modification of
2622 * ska_byte_sort() from https://github.com/skarupke/ska_sort
2623 * The original copyright notice follows:
2624 *
2625 * Copyright Malte Skarupke 2016.
2626 * Distributed under the Boost Software License, Version 1.0.
2627 *
2628 * Boost Software License - Version 1.0 - August 17th, 2003
2629 *
2630 * Permission is hereby granted, free of charge, to any person or organization
2631 * obtaining a copy of the software and accompanying documentation covered by
2632 * this license (the "Software") to use, reproduce, display, distribute,
2633 * execute, and transmit the Software, and to prepare derivative works of the
2634 * Software, and to permit third-parties to whom the Software is furnished to
2635 * do so, all subject to the following:
2636 *
2637 * The copyright notices in the Software and this entire statement, including
2638 * the above license grant, this restriction and the following disclaimer,
2639 * must be included in all copies of the Software, in whole or in part, and
2640 * all derivative works of the Software, unless such copies or derivative
2641 * works are solely in the form of machine-executable object code generated by
2642 * a source language processor.
2643 *
2644 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
2645 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
2646 * FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
2647 * SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
2648 * FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
2649 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2650 * DEALINGS IN THE SOFTWARE.
2651 */
2652static void
2653radix_sort_recursive(SortTuple *begin, size_t n_elems, int level, Tuplesortstate *state)
2654{
2655 RadixSortInfo partitions[256] = {0};
2657 size_t total = 0;
2658 int num_partitions = 0;
2659 int num_remaining;
2660 SortSupport ssup = &state->base.sortKeys[0];
2661 size_t start_offset = 0;
2662 SortTuple *partition_begin = begin;
2663
2664 /* count number of occurrences of each byte */
2665 for (SortTuple *st = begin; st < begin + n_elems; st++)
2666 {
2668
2669 /* extract the byte for this level from the normalized datum */
2670 this_partition = current_byte(normalize_datum(st->datum1, ssup),
2671 level);
2672
2673 /* save it for the permutation step */
2674 st->curbyte = this_partition;
2675
2676 partitions[this_partition].count++;
2677
2679 }
2680
2681 /* compute partition offsets */
2682 for (int i = 0; i < 256; i++)
2683 {
2684 size_t count = partitions[i].count;
2685
2686 if (count != 0)
2687 {
2688 partitions[i].offset = total;
2689 total += count;
2690 remaining_partitions[num_partitions] = i;
2691 num_partitions++;
2692 }
2693 partitions[i].next_offset = total;
2694 }
2695
2696 /*
2697 * Swap tuples to correct partition.
2698 *
2699 * In traditional American flag sort, a swap sends the current element to
2700 * the correct partition, but the array pointer only advances if the
2701 * partner of the swap happens to be an element that belongs in the
2702 * current partition. That only requires one pass through the array, but
2703 * the disadvantage is we don't know if the pointer can advance until the
2704 * swap completes. Here lies the most interesting innovation from the
2705 * upstream ska_byte_sort: After initiating the swap, we immediately
2706 * proceed to the next element. This makes better use of CPU pipelining,
2707 * but also means that we will often need multiple iterations of this
2708 * loop. ska_byte_sort() maintains a separate list of which partitions
2709 * haven't finished, which is updated every loop iteration. Here we simply
2710 * check each partition during every iteration.
2711 *
2712 * If we started with a single partition, there is nothing to do. If a
2713 * previous loop iteration results in only one partition that hasn't been
2714 * counted as sorted, we know it's actually sorted and can exit the loop.
2715 */
2716 num_remaining = num_partitions;
2717 while (num_remaining > 1)
2718 {
2719 /* start the count over */
2720 num_remaining = num_partitions;
2721
2722 for (int i = 0; i < num_partitions; i++)
2723 {
2725
2726 for (SortTuple *st = begin + partitions[idx].offset;
2727 st < begin + partitions[idx].next_offset;
2728 st++)
2729 {
2730 size_t offset = partitions[st->curbyte].offset++;
2731 SortTuple tmp;
2732
2733 /* swap current tuple with destination position */
2734 Assert(offset < n_elems);
2735 tmp = *st;
2736 *st = begin[offset];
2737 begin[offset] = tmp;
2738
2740 };
2741
2742 /* Is this partition sorted? */
2743 if (partitions[idx].offset == partitions[idx].next_offset)
2744 num_remaining--;
2745 }
2746 }
2747
2748 /* recurse */
2750 rp < remaining_partitions + num_partitions;
2751 rp++)
2752 {
2753 size_t end_offset = partitions[*rp].next_offset;
2756
2757 if (num_elements > 1)
2758 {
2759 if (level < sizeof(Datum) - 1)
2760 {
2762 {
2765 state->base.comparetup,
2766 state);
2767 }
2768 else
2769 {
2772 level + 1,
2773 state);
2774 }
2775 }
2776 else if (state->base.onlyKey == NULL)
2777 {
2778 /*
2779 * We've finished radix sort on all bytes of the pass-by-value
2780 * datum (possibly abbreviated), now sort using the tiebreak
2781 * comparator.
2782 */
2785 state->base.comparetup_tiebreak,
2786 state);
2787 }
2788 }
2789
2792 }
2793}
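For readers unfamiliar with this family of algorithms, here is a minimal, self-contained sketch (not PostgreSQL code, and deliberately simpler than the pipelined ska_sort variant above): one American-flag-style partitioning pass over plain uint64 keys, bucketing by the byte that current_byte() would select. The real radix_sort_recursive() works on SortTuple arrays, defers the swap/advance decision for better pipelining, and recurses or falls back to quicksort per partition.

#include <stdint.h>
#include <stddef.h>

static void
partition_by_byte(uint64_t *keys, size_t n, int level)
{
	size_t		count[256] = {0};
	size_t		offset[256];		/* next unfilled slot of each bucket */
	size_t		bucket_end[256];
	size_t		total = 0;

	/* count occurrences of the selected byte */
	for (size_t i = 0; i < n; i++)
		count[(keys[i] >> ((7 - level) * 8)) & 0xFF]++;

	/* turn counts into [offset, bucket_end) ranges */
	for (int b = 0; b < 256; b++)
	{
		offset[b] = total;
		total += count[b];
		bucket_end[b] = total;
	}

	/* in-place permutation: swap each misplaced key into its bucket */
	for (int b = 0; b < 256; b++)
	{
		while (offset[b] < bucket_end[b])
		{
			uint64_t	key = keys[offset[b]];
			int			dest = (key >> ((7 - level) * 8)) & 0xFF;

			if (dest == b)
				offset[b]++;		/* already where it belongs */
			else
			{
				/* place it at the frontier of its bucket, take that key back */
				uint64_t	displaced = keys[offset[dest]];

				keys[offset[dest]] = key;
				keys[offset[b]] = displaced;
				offset[dest]++;
			}
		}
	}
}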
2794
2795/*
2796 * Entry point for radix_sort_recursive
2797 *
2798 * Partition tuples by isnull1, then sort both partitions, using
2799 * radix sort on the NOT NULL partition if it's large enough.
2800 */
2801static void
2802radix_sort_tuple(SortTuple *data, size_t n, Tuplesortstate *state)
2803{
2804 bool nulls_first = state->base.sortKeys[0].ssup_nulls_first;
2805 SortTuple *null_start;
2806 SortTuple *not_null_start;
2807 size_t d1 = 0,
2808 d2,
2809 null_count,
2810 not_null_count;
2811
2812 /*
2813 * Find the first NOT NULL if NULLS FIRST, or first NULL if NULLS LAST.
2814 * This also serves as a quick check for the common case where all tuples
2815 * are NOT NULL in the first sort key.
2816 */
2817 while (d1 < n && data[d1].isnull1 == nulls_first)
2818 {
2819 d1++;
2821 }
2822
2823 /*
2824 * If we have more than one tuple left after the quick check, partition
2825 * the remainder using branchless cyclic permutation, based on
2826 * https://orlp.net/blog/branchless-lomuto-partitioning/
2827 */
2828 Assert(n > 0);
2829 if (d1 < n - 1)
2830 {
2831 size_t i = d1,
2832 j = d1;
2833 SortTuple tmp = data[d1]; /* create gap at front */
2834
2835 while (j < n - 1)
2836 {
2837 /* gap is at j, move i's element to gap */
2838 data[j] = data[i];
2839 /* advance j to the first unknown element */
2840 j += 1;
2841 /* move the first unknown element back to i */
2842 data[i] = data[j];
2843 /* advance i if this element belongs in the left partition */
2844 i += (data[i].isnull1 == nulls_first);
2845
2847 }
2848
2849 /* place gap between left and right partitions */
2850 data[j] = data[i];
2851 /* restore the saved element */
2852 data[i] = tmp;
2853 /* assign it to the correct partition */
2854 i += (data[i].isnull1 == nulls_first);
2855
2856 /* d1 is now the number of elements in the left partition */
2857 d1 = i;
2858 }
2859
2860 d2 = n - d1;
2861
2862 /* set pointers and counts for each partition */
2863 if (nulls_first)
2864 {
2865 null_start = data;
2866 null_count = d1;
2867 not_null_start = data + d1;
2868 not_null_count = d2;
2869 }
2870 else
2871 {
2872 not_null_start = data;
2873 not_null_count = d1;
2874 null_start = data + d1;
2875 null_count = d2;
2876 }
2877
2878 for (SortTuple *st = null_start;
2879 st < null_start + null_count;
2880 st++)
2881 Assert(st->isnull1 == true);
2882 for (SortTuple *st = not_null_start;
2883 st < not_null_start + not_null_count;
2884 st++)
2885 Assert(st->isnull1 == false);
2886
2887 /*
2888 * Sort the NULL partition using tiebreak comparator, if necessary.
2889 */
2890 if (state->base.onlyKey == NULL && null_count > 1)
2891 {
2892 qsort_tuple(null_start,
2893 null_count,
2894 state->base.comparetup_tiebreak,
2895 state);
2896 }
2897
2898 /*
2899 * Sort the NOT NULL partition, using radix sort if large enough,
2900 * otherwise fall back to quicksort.
2901 */
2902 if (not_null_count < QSORT_THRESHOLD)
2903 {
2904 qsort_tuple(not_null_start,
2905 not_null_count,
2906 state->base.comparetup,
2907 state);
2908 }
2909 else
2910 {
2911 bool presorted = true;
2912
2913 for (SortTuple *st = not_null_start + 1;
2914 st < not_null_start + not_null_count;
2915 st++)
2916 {
2917 if (COMPARETUP(state, st - 1, st) > 0)
2918 {
2919 presorted = false;
2920 break;
2921 }
2922
2924 }
2925
2926 if (presorted)
2927 return;
2928 else
2929 {
2930 radix_sort_recursive(not_null_start,
2931 not_null_count,
2932 0,
2933 state);
2934 }
2935 }
2936}
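The cyclic-permutation trick referenced above (https://orlp.net/blog/branchless-lomuto-partitioning/) can be easier to follow on a plain array; the following is a minimal sketch under that assumption, not PostgreSQL code. It moves every element satisfying pred() to the front without a data-dependent branch inside the loop and returns the size of the left partition:

#include <stdbool.h>
#include <stddef.h>

static size_t
partition_branchless(int *v, size_t n, bool (*pred) (int))
{
	size_t		i = 0,
				j = 0;
	int			saved;

	if (n == 0)
		return 0;

	saved = v[0];				/* create a gap at the front */
	while (j < n - 1)
	{
		v[j] = v[i];			/* gap is at j: move i's element into it */
		j++;					/* j now points at the first unknown element */
		v[i] = v[j];			/* move that element back to i; gap is at j again */
		i += pred(v[i]);		/* advance i only if the element belongs on the left */
	}
	v[j] = v[i];				/* close the gap at the partition boundary ... */
	v[i] = saved;				/* ... and drop the saved element into it */
	i += pred(v[i]);

	return i;					/* number of elements in the left partition */
}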
2937
2938/* Verify in-memory sort using standard comparator. */
2939static void
2940verify_memtuples_sorted(Tuplesortstate *state)
2941{
2942#ifdef USE_ASSERT_CHECKING
2943 for (SortTuple *st = state->memtuples + 1;
2944 st < state->memtuples + state->memtupcount;
2945 st++)
2946 Assert(COMPARETUP(state, st - 1, st) <= 0);
2947#endif
2948}
2949
2950/*
2951 * Sort all memtuples using specialized routines.
2952 *
2953 * Quicksort or radix sort is used for small in-memory sorts,
2954 * and external sort runs.
2955 */
2956static void
2957tuplesort_sort_memtuples(Tuplesortstate *state)
2958{
2959 Assert(!LEADER(state));
2960
2961 if (state->memtupcount > 1)
2962 {
2963 /*
2964 * Do we have the leading column's value or abbreviation in datum1?
2965 */
2966 if (state->base.haveDatum1 && state->base.sortKeys)
2967 {
2968 SortSupport ssup = &state->base.sortKeys[0];
2969
2970 /* Does it compare as an integer? */
2971 if (state->memtupcount >= QSORT_THRESHOLD &&
2972 (ssup->comparator == ssup_datum_unsigned_cmp ||
2973 ssup->comparator == ssup_datum_signed_cmp ||
2974 ssup->comparator == ssup_datum_int32_cmp))
2975 {
2976 radix_sort_tuple(state->memtuples,
2977 state->memtupcount,
2978 state);
2979 verify_memtuples_sorted(state);
2980 return;
2981 }
2982 }
2983
2984 /* Can we use the single-key sort function? */
2985 if (state->base.onlyKey != NULL)
2986 {
2987 qsort_ssup(state->memtuples, state->memtupcount,
2988 state->base.onlyKey);
2989 }
2990 else
2991 {
2992 qsort_tuple(state->memtuples,
2993 state->memtupcount,
2994 state->base.comparetup,
2995 state);
2996 }
2997 }
2998}
2999
3000/*
3001 * Insert a new tuple into an empty or existing heap, maintaining the
3002 * heap invariant. Caller is responsible for ensuring there's room.
3003 *
3004 * Note: For some callers, tuple points to a memtuples[] entry above the
3005 * end of the heap. This is safe as long as it's not immediately adjacent
3006 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3007 * is, it might get overwritten before being moved into the heap!
3008 */
3009static void
3010tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3011{
3012 SortTuple *memtuples;
3013 int j;
3014
3015 memtuples = state->memtuples;
3016 Assert(state->memtupcount < state->memtupsize);
3017
3018 CHECK_FOR_INTERRUPTS();
3019
3020 /*
3021 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3022 * using 1-based array indexes, not 0-based.
3023 */
3024 j = state->memtupcount++;
3025 while (j > 0)
3026 {
3027 int i = (j - 1) >> 1;
3028
3029 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3030 break;
3031 memtuples[j] = memtuples[i];
3032 j = i;
3033 }
3034 memtuples[j] = *tuple;
3035}
3036
3037/*
3038 * Remove the tuple at state->memtuples[0] from the heap. Decrement
3039 * memtupcount, and sift up to maintain the heap invariant.
3040 *
3041 * The caller has already free'd the tuple the top node points to,
3042 * if necessary.
3043 */
3044static void
3045tuplesort_heap_delete_top(Tuplesortstate *state)
3046{
3047 SortTuple *memtuples = state->memtuples;
3048 SortTuple *tuple;
3049
3050 if (--state->memtupcount <= 0)
3051 return;
3052
3053 /*
3054 * Remove the last tuple in the heap, and re-insert it, by replacing the
3055 * current top node with it.
3056 */
3057 tuple = &memtuples[state->memtupcount];
3058 tuplesort_heap_replace_top(state, tuple);
3059}
3060
3061/*
3062 * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
3063 * maintain the heap invariant.
3064 *
3065 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3066 * Heapsort, steps H3-H8).
3067 */
3068static void
3069tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3070{
3071 SortTuple *memtuples = state->memtuples;
3072 unsigned int i,
3073 n;
3074
3075 Assert(state->memtupcount >= 1);
3076
3077 CHECK_FOR_INTERRUPTS();
3078
3079 /*
3080 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3081 * This prevents overflow in the "2 * i + 1" calculation, since at the top
3082 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3083 */
3084 n = state->memtupcount;
3085 i = 0; /* i is where the "hole" is */
3086 for (;;)
3087 {
3088 unsigned int j = 2 * i + 1;
3089
3090 if (j >= n)
3091 break;
3092 if (j + 1 < n &&
3093 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3094 j++;
3095 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3096 break;
3097 memtuples[i] = memtuples[j];
3098 i = j;
3099 }
3100 memtuples[i] = *tuple;
3101}
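A tiny illustration of the sift-down (hypothetical integers in a min-heap): replacing the root of [1, 3, 2, 7, 4] with 6 walks the hole down along the smaller child at each level:

	hole at 0: children 3 and 2, smaller child 2 moves up   ->  [2, 3, _, 7, 4]
	hole at 2: no children left, place 6                    ->  [2, 3, 6, 7, 4]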
3102
3103/*
3104 * Function to reverse the sort direction from its current state
3105 *
3106 * It is not safe to call this when performing hash tuplesorts
3107 */
3108static void
3109reversedirection(Tuplesortstate *state)
3110{
3111 SortSupport sortKey = state->base.sortKeys;
3112 int nkey;
3113
3114 for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
3115 {
3116 sortKey->ssup_reverse = !sortKey->ssup_reverse;
3117 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3118 }
3119}
3120
3121
3122/*
3123 * Tape interface routines
3124 */
3125
3126static unsigned int
3127getlen(LogicalTape *tape, bool eofOK)
3128{
3129 unsigned int len;
3130
3131 if (LogicalTapeRead(tape,
3132 &len, sizeof(len)) != sizeof(len))
3133 elog(ERROR, "unexpected end of tape");
3134 if (len == 0 && !eofOK)
3135 elog(ERROR, "unexpected end of data");
3136 return len;
3137}
3138
3139static void
3140markrunend(LogicalTape *tape)
3141{
3142 unsigned int len = 0;
3143
3144 LogicalTapeWrite(tape, &len, sizeof(len));
3145}
3146
3147/*
3148 * Get memory for tuple from within READTUP() routine.
3149 *
3150 * We use next free slot from the slab allocator, or palloc() if the tuple
3151 * is too large for that.
3152 */
3153void *
3154tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
3155{
3156 SlabSlot *buf;
3157
3158 /*
3159 * We pre-allocate enough slots in the slab arena that we should never run
3160 * out.
3161 */
3162 Assert(state->slabFreeHead);
3163
3164 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3165 return MemoryContextAlloc(state->base.sortcontext, tuplen);
3166 else
3167 {
3168 buf = state->slabFreeHead;
3169 /* Reuse this slot */
3170 state->slabFreeHead = buf->nextfree;
3171
3172 return buf;
3173 }
3174}
3175
3176
3177/*
3178 * Parallel sort routines
3179 */
3180
3181/*
3182 * tuplesort_estimate_shared - estimate required shared memory allocation
3183 *
3184 * nWorkers is an estimate of the number of workers (it's the number that
3185 * will be requested).
3186 */
3187Size
3188tuplesort_estimate_shared(int nWorkers)
3189{
3190 Size tapesSize;
3191
3192 Assert(nWorkers > 0);
3193
3194 /* Make sure that BufFile shared state is MAXALIGN'd */
3195 tapesSize = mul_size(sizeof(TapeShare), nWorkers);
3196 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
3197
3198 return tapesSize;
3199}
3200
3201/*
3202 * tuplesort_initialize_shared - initialize shared tuplesort state
3203 *
3204 * Must be called from leader process before workers are launched, to
3205 * establish state needed up-front for worker tuplesortstates. nWorkers
3206 * should match the argument passed to tuplesort_estimate_shared().
3207 */
3208void
3209tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
3210{
3211 int i;
3212
3213 Assert(nWorkers > 0);
3214
3215 SpinLockInit(&shared->mutex);
3216 shared->currentWorker = 0;
3217 shared->workersFinished = 0;
3218 SharedFileSetInit(&shared->fileset, seg);
3219 shared->nTapes = nWorkers;
3220 for (i = 0; i < nWorkers; i++)
3221 {
3222 shared->tapes[i].firstblocknumber = 0L;
3223 }
3224}
3225
3226/*
3227 * tuplesort_attach_shared - attach to shared tuplesort state
3228 *
3229 * Must be called by all worker processes.
3230 */
3231void
3232tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
3233{
3234 /* Attach to SharedFileSet */
3235 SharedFileSetAttach(&shared->fileset, seg);
3236}
3237
3238/*
3239 * worker_get_identifier - Assign and return ordinal identifier for worker
3240 *
3241 * The order in which these are assigned is not well defined, and should not
3242 * matter; worker numbers across parallel sort participants need only be
3243 * distinct and gapless. logtape.c requires this.
3244 *
3245 * Note that the identifiers assigned from here have no relation to
3246 * ParallelWorkerNumber number, to avoid making any assumption about
3247 * caller's requirements. However, we do follow the ParallelWorkerNumber
3248 * convention of representing a non-worker with worker number -1. This
3249 * includes the leader, as well as serial Tuplesort processes.
3250 */
3251static int
3252worker_get_identifier(Tuplesortstate *state)
3253{
3254 Sharedsort *shared = state->shared;
3255 int worker;
3256
3257 Assert(WORKER(state));
3258
3259 SpinLockAcquire(&shared->mutex);
3260 worker = shared->currentWorker++;
3261 SpinLockRelease(&shared->mutex);
3262
3263 return worker;
3264}
3265
3266/*
3267 * worker_freeze_result_tape - freeze worker's result tape for leader
3268 *
3269 * This is called by workers just after the result tape has been determined,
3270 * instead of calling LogicalTapeFreeze() directly. They do so because
3271 * workers require a few additional steps over similar serial
3272 * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
3273 * steps are around freeing now unneeded resources, and representing to
3274 * leader that worker's input run is available for its merge.
3275 *
3276 * There should only be one final output run for each worker, which consists
3277 * of all tuples that were originally input into worker.
3278 */
3279static void
3280worker_freeze_result_tape(Tuplesortstate *state)
3281{
3282 Sharedsort *shared = state->shared;
3283 TapeShare output;
3284
3285 Assert(WORKER(state));
3286 Assert(state->result_tape != NULL);
3287 Assert(state->memtupcount == 0);
3288
3289 /*
3290 * Free most remaining memory, in case caller is sensitive to our holding
3291 * on to it. memtuples may not be a tiny merge heap at this point.
3292 */
3293 pfree(state->memtuples);
3294 /* Be tidy */
3295 state->memtuples = NULL;
3296 state->memtupsize = 0;
3297
3298 /*
3299 * Parallel worker requires result tape metadata, which is to be stored in
3300 * shared memory for leader
3301 */
3302 LogicalTapeFreeze(state->result_tape, &output);
3303
3304 /* Store properties of output tape, and update finished worker count */
3305 SpinLockAcquire(&shared->mutex);
3306 shared->tapes[state->worker] = output;
3307 shared->workersFinished++;
3308 SpinLockRelease(&shared->mutex);
3309}
3310
3311/*
3312 * worker_nomergeruns - dump memtuples in worker, without merging
3313 *
3314 * This is called as an alternative to mergeruns() in a worker when no
3315 * merging is required.
3316 */
3317static void
3318worker_nomergeruns(Tuplesortstate *state)
3319{
3320 Assert(WORKER(state));
3321 Assert(state->result_tape == NULL);
3322 Assert(state->nOutputRuns == 1);
3323
3324 state->result_tape = state->destTape;
3325 worker_freeze_result_tape(state);
3326}
3327
3328/*
3329 * leader_takeover_tapes - create tapeset for leader from worker tapes
3330 *
3331 * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3332 * sorting has occurred in workers, all of which must have already returned
3333 * from tuplesort_performsort().
3334 *
3335 * When this returns, leader process is left in a state that is virtually
3336 * indistinguishable from it having generated runs as a serial external sort
3337 * might have.
3338 */
3339static void
3340leader_takeover_tapes(Tuplesortstate *state)
3341{
3342 Sharedsort *shared = state->shared;
3343 int nParticipants = state->nParticipants;
3344 int workersFinished;
3345 int j;
3346
3347 Assert(LEADER(state));
3348 Assert(nParticipants >= 1);
3349
3350 SpinLockAcquire(&shared->mutex);
3351 workersFinished = shared->workersFinished;
3352 SpinLockRelease(&shared->mutex);
3353
3354 if (nParticipants != workersFinished)
3355 elog(ERROR, "cannot take over tapes before all workers finish");
3356
3357 /*
3358 * Create the tapeset from worker tapes, including a leader-owned tape at
3359 * the end. Parallel workers are far more expensive than logical tapes,
3360 * so the number of tapes allocated here should never be excessive.
3361 */
3362 inittapestate(state, nParticipants);
3363 state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3364
3365 /*
3366 * Set currentRun to reflect the number of runs we will merge (it's not
3367 * used for anything, this is just pro forma)
3368 */
3369 state->currentRun = nParticipants;
3370
3371 /*
3372 * Initialize the state to look the same as after building the initial
3373 * runs.
3374 *
3375 * There will always be exactly 1 run per worker, and exactly one input
3376 * tape per run, because workers always output exactly 1 run, even when
3377 * there were no input tuples for workers to sort.
3378 */
3379 state->inputTapes = NULL;
3380 state->nInputTapes = 0;
3381 state->nInputRuns = 0;
3382
3383 state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3384 state->nOutputTapes = nParticipants;
3385 state->nOutputRuns = nParticipants;
3386
3387 for (j = 0; j < nParticipants; j++)
3388 {
3389 state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3390 }
3391
3392 state->status = TSS_BUILDRUNS;
3393}
3394
3395/*
3396 * Convenience routine to free a tuple previously loaded into sort memory
3397 */
3398static void
3399free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3400{
3401 if (stup->tuple)
3402 {
3403 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3404 pfree(stup->tuple);
3405 stup->tuple = NULL;
3406 }
3407}
3408
3409int
3410ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3411{
3412 if (x < y)
3413 return -1;
3414 else if (x > y)
3415 return 1;
3416 else
3417 return 0;
3418}
3419
3420int
3421ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3422{
3423 int64 xx = DatumGetInt64(x);
3424 int64 yy = DatumGetInt64(y);
3425
3426 if (xx < yy)
3427 return -1;
3428 else if (xx > yy)
3429 return 1;
3430 else
3431 return 0;
3432}
3433
3434int
3435ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3436{
3437 int32 xx = DatumGetInt32(x);
3438 int32 yy = DatumGetInt32(y);
3439
3440 if (xx < yy)
3441 return -1;
3442 else if (xx > yy)
3443 return 1;
3444 else
3445 return 0;
3446}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
void PrepareTempTablespaces(void)
MemoryContext BumpContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition bump.c:133
#define PG_INT32_MAX
Definition c.h:615
#define Min(x, y)
Definition c.h:1019
#define MAXALIGN(LEN)
Definition c.h:838
uint8_t uint8
Definition c.h:556
#define INT64_FORMAT
Definition c.h:576
#define Assert(condition)
Definition c.h:885
int64_t int64
Definition c.h:555
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:492
int32_t int32
Definition c.h:554
#define PG_INT64_MAX
Definition c.h:618
uint64_t uint64
Definition c.h:559
uint32_t uint32
Definition c.h:558
size_t Size
Definition c.h:631
int errcode(int sqlerrcode)
Definition elog.c:864
int errmsg(const char *fmt,...)
Definition elog.c:1081
#define LOG
Definition elog.h:31
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
#define palloc0_object(type)
Definition fe_memutils.h:75
FILE * output
TuplesortSpaceType
@ SORT_SPACE_TYPE_DISK
@ SORT_SPACE_TYPE_MEMORY
TuplesortMethod
@ SORT_TYPE_EXTERNAL_SORT
@ SORT_TYPE_TOP_N_HEAPSORT
@ SORT_TYPE_QUICKSORT
@ SORT_TYPE_STILL_IN_PROGRESS
@ SORT_TYPE_EXTERNAL_MERGE
int y
Definition isn.c:76
int x
Definition isn.c:75
int j
Definition isn.c:78
int i
Definition isn.c:77
void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
Definition logtape.c:846
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition logtape.c:750
size_t LogicalTapeBackspace(LogicalTape *lt, size_t size)
Definition logtape.c:1062
size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
Definition logtape.c:928
int64 LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition logtape.c:1181
void LogicalTapeClose(LogicalTape *lt)
Definition logtape.c:733
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition logtape.c:667
void LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
Definition logtape.c:1133
LogicalTapeSet * LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
Definition logtape.c:556
void LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
Definition logtape.c:1162
void LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
Definition logtape.c:761
LogicalTape * LogicalTapeCreate(LogicalTapeSet *lts)
Definition logtape.c:680
void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
Definition logtape.c:981
LogicalTape * LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
Definition logtape.c:609
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition mcxt.c:1232
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
void pfree(void *pointer)
Definition mcxt.c:1616
Size GetMemoryChunkSpace(void *pointer)
Definition mcxt.c:770
void * palloc0(Size size)
Definition mcxt.c:1417
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
void * repalloc_huge(void *pointer, Size size)
Definition mcxt.c:1757
void MemoryContextResetOnly(MemoryContext context)
Definition mcxt.c:422
#define AllocSetContextCreate
Definition memutils.h:129
#define MaxAllocHugeSize
Definition memutils.h:45
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
#define BITS_PER_BYTE
const void size_t len
const void * data
const char * pg_rusage_show(const PGRUsage *ru0)
Definition pg_rusage.c:40
void pg_rusage_init(PGRUsage *ru0)
Definition pg_rusage.c:27
static char buf[DEFAULT_XLOG_SEG_SIZE]
static int partitions
Definition pgbench.c:224
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:232
static int64 DatumGetInt64(Datum X)
Definition postgres.h:413
uint64_t Datum
Definition postgres.h:70
static Datum UInt32GetDatum(uint32 X)
Definition postgres.h:242
static int32 DatumGetInt32(Datum X)
Definition postgres.h:212
static int fb(int x)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497
#define SpinLockInit(lock)
Definition spin.h:53
#define SpinLockRelease(lock)
Definition spin.h:57
#define SpinLockAcquire(lock)
Definition spin.h:55
size_t next_offset
Definition tuplesort.c:517
size_t offset
Definition tuplesort.c:515
SharedFileSet fileset
Definition tuplesort.c:357
TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]
Definition tuplesort.c:366
int workersFinished
Definition tuplesort.c:354
slock_t mutex
Definition tuplesort.c:343
int currentWorker
Definition tuplesort.c:353
int(* comparator)(Datum x, Datum y, SortSupport ssup)
int srctape
Definition tuplesort.h:120
Datum datum1
Definition tuplesort.h:117
int64 firstblocknumber
Definition logtape.h:54
TuplesortSpaceType spaceType
void * lastReturnedTuple
Definition tuplesort.c:261
LogicalTapeSet * tapeset
Definition tuplesort.c:205
bool isMaxSpaceDisk
Definition tuplesort.c:202
SortTuple * memtuples
Definition tuplesort.c:214
LogicalTape ** inputTapes
Definition tuplesort.c:277
bool slabAllocatorUsed
Definition tuplesort.c:246
TuplesortPublic base
Definition tuplesort.c:185
char * slabMemoryEnd
Definition tuplesort.c:249
PGRUsage ru_start
Definition tuplesort.c:333
char * slabMemoryBegin
Definition tuplesort.c:248
LogicalTape ** outputTapes
Definition tuplesort.c:281
size_t tape_buffer_mem
Definition tuplesort.c:253
TupSortStatus status
Definition tuplesort.c:186
LogicalTape * destTape
Definition tuplesort.c:285
TupSortStatus maxSpaceStatus
Definition tuplesort.c:204
int64 markpos_block
Definition tuplesort.c:297
Sharedsort * shared
Definition tuplesort.c:318
LogicalTape * result_tape
Definition tuplesort.c:292
SlabSlot * slabFreeHead
Definition tuplesort.c:250
void tuplesort_rescan(Tuplesortstate *state)
Definition tuplesort.c:2298
void tuplesort_performsort(Tuplesortstate *state)
Definition tuplesort.c:1259
int tuplesort_merge_order(int64 allowedMem)
Definition tuplesort.c:1674
#define TAPE_BUFFER_OVERHEAD
Definition tuplesort.c:176
static void tuplesort_heap_delete_top(Tuplesortstate *state)
Definition tuplesort.c:3046
#define INITIAL_MEMTUPSIZE
Definition tuplesort.c:118
static unsigned int getlen(LogicalTape *tape, bool eofOK)
Definition tuplesort.c:3128
void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
Definition tuplesort.c:3210
#define COMPARETUP(state, a, b)
Definition tuplesort.c:393
static Datum normalize_datum(Datum orig, SortSupport ssup)
Definition tuplesort.c:2585
static void selectnewtape(Tuplesortstate *state)
Definition tuplesort.c:1844
void tuplesort_reset(Tuplesortstate *state)
Definition tuplesort.c:915
#define SERIAL(state)
Definition tuplesort.c:400
#define FREESTATE(state)
Definition tuplesort.c:396
static void markrunend(LogicalTape *tape)
Definition tuplesort.c:3141
bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
Definition tuplesort.c:1606
static uint8 current_byte(Datum key, int level)
Definition tuplesort.c:2573
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
Definition tuplesort.c:3400
#define REMOVEABBREV(state, stup, count)
Definition tuplesort.c:392
#define LACKMEM(state)
Definition tuplesort.c:397
static void reversedirection(Tuplesortstate *state)

◆ IS_SLAB_SLOT

#define IS_SLAB_SLOT (   state,
  tuple 
)
Value:
((char *) (tuple) >= (state)->slabMemoryBegin && \
(char *) (tuple) < (state)->slabMemoryEnd)

Definition at line 372 of file tuplesort.c.

◆ LACKMEM

#define LACKMEM (   state)    ((state)->availMem < 0 && !(state)->slabAllocatorUsed)

Definition at line 397 of file tuplesort.c.

◆ LEADER

#define LEADER (   state)    ((state)->shared && (state)->worker == -1)

Definition at line 402 of file tuplesort.c.

◆ MAXORDER

#define MAXORDER   500 /* maximum merge order */

Definition at line 175 of file tuplesort.c.

◆ MERGE_BUFFER_SIZE

#define MERGE_BUFFER_SIZE   (BLCKSZ * 32)

Definition at line 177 of file tuplesort.c.

◆ MINORDER

#define MINORDER   6 /* minimum merge order */

Definition at line 174 of file tuplesort.c.

◆ QSORT_THRESHOLD

#define QSORT_THRESHOLD   40

Definition at line 523 of file tuplesort.c.

◆ READTUP

#define READTUP (   state,
  stup,
  tape,
  len 
)    ((*(state)->base.readtup) (state, stup, tape, len))

Definition at line 395 of file tuplesort.c.

◆ RELEASE_SLAB_SLOT

#define RELEASE_SLAB_SLOT (   state,
  tuple 
)
Value:
do { \
SlabSlot *buf = (SlabSlot *) tuple; \
\
if (IS_SLAB_SLOT((state), buf)) \
{ \
buf->nextfree = (state)->slabFreeHead; \
(state)->slabFreeHead = buf; \
} else \
pfree(buf); \
} while(0)

Definition at line 380 of file tuplesort.c.

◆ REMOVEABBREV

#define REMOVEABBREV (   state,
  stup,
  count 
)    ((*(state)->base.removeabbrev) (state, stup, count))

Definition at line 392 of file tuplesort.c.

◆ SERIAL

#define SERIAL (   state)    ((state)->shared == NULL)

Definition at line 400 of file tuplesort.c.

◆ SLAB_SLOT_SIZE

#define SLAB_SLOT_SIZE   1024

Definition at line 140 of file tuplesort.c.

◆ ST_CHECK_FOR_INTERRUPTS [1/2]

#define ST_CHECK_FOR_INTERRUPTS

Definition at line 492 of file tuplesort.c.

◆ ST_CHECK_FOR_INTERRUPTS [2/2]

#define ST_CHECK_FOR_INTERRUPTS

Definition at line 492 of file tuplesort.c.

◆ ST_COMPARE

#define ST_COMPARE (   a,
  b,
  ssup 
)
Value:
ApplySortComparator((a)->datum1, (a)->isnull1, \
(b)->datum1, (b)->isnull1, (ssup))

Definition at line 500 of file tuplesort.c.

◆ ST_COMPARE_ARG_TYPE [1/2]

#define ST_COMPARE_ARG_TYPE   Tuplesortstate

Definition at line 491 of file tuplesort.c.

◆ ST_COMPARE_ARG_TYPE [2/2]

#define ST_COMPARE_ARG_TYPE   SortSupportData

Definition at line 491 of file tuplesort.c.

◆ ST_COMPARE_RUNTIME_POINTER

#define ST_COMPARE_RUNTIME_POINTER

Definition at line 490 of file tuplesort.c.

◆ ST_DECLARE

#define ST_DECLARE

Definition at line 494 of file tuplesort.c.

◆ ST_DEFINE [1/2]

#define ST_DEFINE

Definition at line 495 of file tuplesort.c.

◆ ST_DEFINE [2/2]

#define ST_DEFINE

Definition at line 495 of file tuplesort.c.

◆ ST_ELEMENT_TYPE [1/2]

#define ST_ELEMENT_TYPE   SortTuple

Definition at line 489 of file tuplesort.c.

◆ ST_ELEMENT_TYPE [2/2]

#define ST_ELEMENT_TYPE   SortTuple

Definition at line 489 of file tuplesort.c.

◆ ST_SCOPE [1/2]

#define ST_SCOPE   static

Definition at line 493 of file tuplesort.c.

◆ ST_SCOPE [2/2]

#define ST_SCOPE   static

Definition at line 493 of file tuplesort.c.

◆ ST_SORT [1/2]

#define ST_SORT   qsort_tuple

Definition at line 488 of file tuplesort.c.

◆ ST_SORT [2/2]

#define ST_SORT   qsort_ssup

Definition at line 488 of file tuplesort.c.

◆ TAPE_BUFFER_OVERHEAD

#define TAPE_BUFFER_OVERHEAD   BLCKSZ

Definition at line 176 of file tuplesort.c.

◆ USEMEM

#define USEMEM (   state,
  amt 
)    ((state)->availMem -= (amt))

Definition at line 398 of file tuplesort.c.

◆ WORKER

#define WORKER (   state)    ((state)->shared && (state)->worker != -1)

Definition at line 401 of file tuplesort.c.

◆ WRITETUP

#define WRITETUP (   state,
  tape,
  stup 
)    ((*(state)->base.writetup) (state, tape, stup))

Definition at line 394 of file tuplesort.c.

Typedef Documentation

◆ RadixSortInfo

◆ SlabSlot

Enumeration Type Documentation

◆ TupSortStatus

Enumerator
TSS_INITIAL 
TSS_BOUNDED 
TSS_BUILDRUNS 
TSS_SORTEDINMEM 
TSS_SORTEDONTAPE 
TSS_FINALMERGE 

Definition at line 152 of file tuplesort.c.

153{
154 TSS_INITIAL, /* Loading tuples; still within memory limit */
155 TSS_BOUNDED, /* Loading tuples into bounded-size heap */
156 TSS_BUILDRUNS, /* Loading tuples; writing to tape */
157 TSS_SORTEDINMEM, /* Sort completed entirely in memory */
158 TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
159 TSS_FINALMERGE, /* Performing final merge on-the-fly */
160} TupSortStatus;

Function Documentation

◆ beginmerge()

static void beginmerge ( Tuplesortstate state)
static

Definition at line 2156 of file tuplesort.c.

2157{
2158 int activeTapes;
2159 int srcTapeIndex;
2160
2161 /* Heap should be empty here */
2162 Assert(state->memtupcount == 0);
2163
2164 activeTapes = Min(state->nInputTapes, state->nInputRuns);
2165
2166 for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2167 {
2168 SortTuple tup;
2169
2170 if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2171 {
2172 tup.srctape = srcTapeIndex;
2173 tuplesort_heap_insert(state, &tup);
2174 }
2175 }
2176}

References Assert, fb(), mergereadnext(), Min, SortTuple::srctape, and tuplesort_heap_insert().

Referenced by mergeonerun(), and mergeruns().

◆ consider_abort_common()

static bool consider_abort_common ( Tuplesortstate state)
static

Definition at line 1215 of file tuplesort.c.

1216{
1217 Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1218 Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1219 Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1220
1221 /*
1222 * Check effectiveness of abbreviation optimization. Consider aborting
1223 * when still within memory limit.
1224 */
1225 if (state->status == TSS_INITIAL &&
1226 state->memtupcount >= state->abbrevNext)
1227 {
1228 state->abbrevNext *= 2;
1229
1230 /*
1231 * Check opclass-supplied abbreviation abort routine. It may indicate
1232 * that abbreviation should not proceed.
1233 */
1234 if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1235 state->base.sortKeys))
1236 return false;
1237
1238 /*
1239 * Finally, restore authoritative comparator, and indicate that
1240 * abbreviation is not in play by setting abbrev_converter to NULL
1241 */
1242 state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1243 state->base.sortKeys[0].abbrev_converter = NULL;
1244 /* Not strictly necessary, but be tidy */
1245 state->base.sortKeys[0].abbrev_abort = NULL;
1246 state->base.sortKeys[0].abbrev_full_comparator = NULL;
1247
1248 /* Give up - expect original pass-by-value representation */
1249 return true;
1250 }
1251
1252 return false;
1253}

References Assert, fb(), and TSS_INITIAL.

Referenced by tuplesort_puttuple_common().
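
The following standalone C sketch (illustrative only, not part of tuplesort.c) shows the doubling-interval pattern described above; the abort hook here is a stand-in for the opclass-supplied abbrev_abort callback:

#include <stdbool.h>
#include <stdio.h>

/*
 * Stand-in for the opclass abbrev_abort hook; the real callback looks at
 * how well abbreviated keys have been discriminating so far.
 */
static bool
abbrev_looks_useless(int memtupcount)
{
    (void) memtupcount;
    return false;               /* pretend abbreviation keeps helping */
}

int
main(void)
{
    int abbrevNext = 10;        /* same starting point as abbrevNext */

    for (int memtupcount = 1; memtupcount <= 100000; memtupcount++)
    {
        if (memtupcount >= abbrevNext)
        {
            abbrevNext *= 2;    /* check again after twice as many tuples */
            if (abbrev_looks_useless(memtupcount))
            {
                printf("abort abbreviation at %d tuples\n", memtupcount);
                break;
            }
        }
    }
    printf("abort checks ran at 10, 20, 40, ... up to %d tuples\n",
           abbrevNext / 2);
    return 0;
}

Because the threshold doubles each time, the hook is consulted only O(log N) times as the tuple count grows.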

◆ current_byte()

static uint8 current_byte ( Datum  key,
int  level 
)
inlinestatic

Definition at line 2573 of file tuplesort.c.

2574{
2575 int shift = (sizeof(Datum) - 1 - level) * BITS_PER_BYTE;
2576
2577 return (key >> shift) & 0xFF;
2578}

References BITS_PER_BYTE.

Referenced by radix_sort_recursive().
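
A minimal standalone sketch of the same byte extraction, assuming 8-byte keys and 8-bit bytes; the names below are hypothetical stand-ins, not the tuplesort.c symbols:

#include <stdint.h>
#include <stdio.h>

/* hypothetical stand-in for current_byte(); assumes an 8-byte key */
static unsigned char
byte_at_level(uint64_t key, int level)
{
    int shift = (int) (sizeof(uint64_t) - 1 - level) * 8;

    return (unsigned char) ((key >> shift) & 0xFF);
}

int
main(void)
{
    uint64_t key = 0x0123456789ABCDEFULL;

    /* level 0 is the most significant byte, level 7 the least significant */
    printf("%02X %02X\n",
           (unsigned) byte_at_level(key, 0),
           (unsigned) byte_at_level(key, 7));   /* prints "01 EF" */
    return 0;
}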

◆ dumptuples()

static void dumptuples ( Tuplesortstate state,
bool  alltuples 
)
static

Definition at line 2203 of file tuplesort.c.

2204{
2205 int memtupwrite;
2206 int i;
2207
2208 /*
2209 * Nothing to do if we still fit in available memory and have array slots,
2210 * unless this is the final call during initial run generation.
2211 */
2212 if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2213 !alltuples)
2214 return;
2215
2216 /*
2217 * Final call might require no sorting, in rare cases where we just so
2218 * happen to have previously LACKMEM()'d at the point where exactly all
2219 * remaining tuples are loaded into memory, just before input was
2220 * exhausted. In general, short final runs are quite possible, but avoid
2221 * creating a completely empty run. In a worker, though, we must produce
2222 * at least one tape, even if it's empty.
2223 */
2224 if (state->memtupcount == 0 && state->currentRun > 0)
2225 return;
2226
2227 Assert(state->status == TSS_BUILDRUNS);
2228
2229 /*
2230 * It seems unlikely that this limit will ever be exceeded, but take no
2231 * chances
2232 */
2233 if (state->currentRun == INT_MAX)
2234 ereport(ERROR,
2235 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2236 errmsg("cannot have more than %d runs for an external sort",
2237 INT_MAX)));
2238
2239 if (state->currentRun > 0)
2240 selectnewtape(state);
2241
2242 state->currentRun++;
2243
2244 if (trace_sort)
2245 elog(LOG, "worker %d starting quicksort of run %d: %s",
2246 state->worker, state->currentRun,
2247 pg_rusage_show(&state->ru_start));
2248
2249 /*
2250 * Sort all tuples accumulated within the allowed amount of memory for
2251 * this run.
2252 */
2253 tuplesort_sort_memtuples(state);
2254
2255 if (trace_sort)
2256 elog(LOG, "worker %d finished quicksort of run %d: %s",
2257 state->worker, state->currentRun,
2258 pg_rusage_show(&state->ru_start));
2259
2260 memtupwrite = state->memtupcount;
2261 for (i = 0; i < memtupwrite; i++)
2262 {
2263 SortTuple *stup = &state->memtuples[i];
2264
2265 WRITETUP(state, state->destTape, stup);
2266 }
2267
2268 state->memtupcount = 0;
2269
2270 /*
2271 * Reset tuple memory. We've freed all of the tuples that we previously
2272 * allocated. It's important to avoid fragmentation when there is a stark
2273 * change in the sizes of incoming tuples. In bounded sorts,
2274 * fragmentation due to AllocSetFree's bucketing by size class might be
2275 * particularly bad if this step wasn't taken.
2276 */
2277 MemoryContextReset(state->base.tuplecontext);
2278
2279 /*
2280 * Now update the memory accounting to subtract the memory used by the
2281 * tuple.
2282 */
2283 FREEMEM(state, state->tupleMem);
2284 state->tupleMem = 0;
2285
2286 markrunend(state->destTape);
2287
2288 if (trace_sort)
2289 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2290 state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2291 pg_rusage_show(&state->ru_start));
2292}

References Assert, elog, ereport, errcode(), errmsg(), ERROR, fb(), FREEMEM, i, LACKMEM, LOG, markrunend(), MemoryContextReset(), pg_rusage_show(), selectnewtape(), trace_sort, TSS_BUILDRUNS, tuplesort_sort_memtuples(), and WRITETUP.

Referenced by tuplesort_performsort(), and tuplesort_puttuple_common().

◆ free_sort_tuple()

static void free_sort_tuple ( Tuplesortstate state,
SortTuple stup 
)
static

Definition at line 3400 of file tuplesort.c.

3401{
3402 if (stup->tuple)
3403 {
3404 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3405 pfree(stup->tuple);
3406 stup->tuple = NULL;
3407 }
3408}

References fb(), FREEMEM, GetMemoryChunkSpace(), and pfree().

Referenced by make_bounded_heap(), and tuplesort_puttuple_common().

◆ getlen()

static unsigned int getlen ( LogicalTape tape,
bool  eofOK 
)
static

Definition at line 3128 of file tuplesort.c.

3129{
3130 unsigned int len;
3131
3132 if (LogicalTapeRead(tape,
3133 &len, sizeof(len)) != sizeof(len))
3134 elog(ERROR, "unexpected end of tape");
3135 if (len == 0 && !eofOK)
3136 elog(ERROR, "unexpected end of data");
3137 return len;
3138}

References elog, ERROR, fb(), len, and LogicalTapeRead().

Referenced by mergereadnext(), and tuplesort_gettuple_common().

◆ grow_memtuples()

static bool grow_memtuples ( Tuplesortstate state)
static

Definition at line 948 of file tuplesort.c.

949{
950 int newmemtupsize;
951 int memtupsize = state->memtupsize;
952 int64 memNowUsed = state->allowedMem - state->availMem;
953
954 /* Forget it if we've already maxed out memtuples, per comment above */
955 if (!state->growmemtuples)
956 return false;
957
958 /* Select new value of memtupsize */
959 if (memNowUsed <= state->availMem)
960 {
961 /*
962 * We've used no more than half of allowedMem; double our usage,
963 * clamping at INT_MAX tuples.
964 */
965 if (memtupsize < INT_MAX / 2)
966 newmemtupsize = memtupsize * 2;
967 else
968 {
969 newmemtupsize = INT_MAX;
970 state->growmemtuples = false;
971 }
972 }
973 else
974 {
975 /*
976 * This will be the last increment of memtupsize. Abandon doubling
977 * strategy and instead increase as much as we safely can.
978 *
979 * To stay within allowedMem, we can't increase memtupsize by more
980 * than availMem / sizeof(SortTuple) elements. In practice, we want
981 * to increase it by considerably less, because we need to leave some
982 * space for the tuples to which the new array slots will refer. We
983 * assume the new tuples will be about the same size as the tuples
984 * we've already seen, and thus we can extrapolate from the space
985 * consumption so far to estimate an appropriate new size for the
986 * memtuples array. The optimal value might be higher or lower than
987 * this estimate, but it's hard to know that in advance. We again
988 * clamp at INT_MAX tuples.
989 *
990 * This calculation is safe against enlarging the array so much that
991 * LACKMEM becomes true, because the memory currently used includes
992 * the present array; thus, there would be enough allowedMem for the
993 * new array elements even if no other memory were currently used.
994 *
995 * We do the arithmetic in float8, because otherwise the product of
996 * memtupsize and allowedMem could overflow. Any inaccuracy in the
997 * result should be insignificant; but even if we computed a
998 * completely insane result, the checks below will prevent anything
999 * really bad from happening.
1000 */
1001 double grow_ratio;
1002
1003 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1004 if (memtupsize * grow_ratio < INT_MAX)
1005 newmemtupsize = (int) (memtupsize * grow_ratio);
1006 else
1007 newmemtupsize = INT_MAX;
1008
1009 /* We won't make any further enlargement attempts */
1010 state->growmemtuples = false;
1011 }
1012
1013 /* Must enlarge array by at least one element, else report failure */
1014 if (newmemtupsize <= memtupsize)
1015 goto noalloc;
1016
1017 /*
1018 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1019 * to ensure our request won't be rejected. Note that we can easily
1020 * exhaust address space before facing this outcome. (This is presently
1021 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1022 * don't rely on that at this distance.)
1023 */
1024 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1025 {
1026 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1027 state->growmemtuples = false; /* can't grow any more */
1028 }
1029
1030 /*
1031 * We need to be sure that we do not cause LACKMEM to become true, else
1032 * the space management algorithm will go nuts. The code above should
1033 * never generate a dangerous request, but to be safe, check explicitly
1034 * that the array growth fits within availMem. (We could still cause
1035 * LACKMEM if the memory chunk overhead associated with the memtuples
1036 * array were to increase. That shouldn't happen because we chose the
1037 * initial array size large enough to ensure that palloc will be treating
1038 * both old and new arrays as separate chunks. But we'll check LACKMEM
1039 * explicitly below just in case.)
1040 */
1041 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1042 goto noalloc;
1043
1044 /* OK, do it */
1045 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1046 state->memtupsize = newmemtupsize;
1047 state->memtuples = (SortTuple *)
1048 repalloc_huge(state->memtuples,
1049 state->memtupsize * sizeof(SortTuple));
1050 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1051 if (LACKMEM(state))
1052 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1053 return true;
1054
1055noalloc:
1056 /* If for any reason we didn't realloc, shut off future attempts */
1057 state->growmemtuples = false;
1058 return false;
1059}

References elog, ERROR, fb(), FREEMEM, GetMemoryChunkSpace(), LACKMEM, MaxAllocHugeSize, repalloc_huge(), and USEMEM.

Referenced by tuplesort_puttuple_common().
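
A hedged, standalone illustration of the growth heuristic above using made-up memory figures; it shows only the arithmetic the comments describe, not the tuplesort.c code path:

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    /* made-up figures, purely illustrative */
    int64_t allowedMem = 4 * 1024 * 1024;   /* 4 MB budget */
    int64_t availMem   = 1 * 1024 * 1024;   /* 1 MB still unused */
    int64_t memNowUsed = allowedMem - availMem;
    int     memtupsize = 4096;
    int     newmemtupsize;

    if (memNowUsed <= availMem)
        newmemtupsize = memtupsize * 2;     /* doubling phase */
    else
        newmemtupsize = (int) (memtupsize * /* one final, proportional growth */
                               ((double) allowedMem / (double) memNowUsed));

    printf("grow memtuples from %d to %d slots\n", memtupsize, newmemtupsize);
    return 0;
}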

◆ init_slab_allocator()

static void init_slab_allocator ( Tuplesortstate state,
int  numSlots 
)
static

Definition at line 1877 of file tuplesort.c.

1878{
1879 if (numSlots > 0)
1880 {
1881 char *p;
1882 int i;
1883
1884 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1885 state->slabMemoryEnd = state->slabMemoryBegin +
1886 numSlots * SLAB_SLOT_SIZE;
1887 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1888 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1889
1890 p = state->slabMemoryBegin;
1891 for (i = 0; i < numSlots - 1; i++)
1892 {
1893 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1894 p += SLAB_SLOT_SIZE;
1895 }
1896 ((SlabSlot *) p)->nextfree = NULL;
1897 }
1898 else
1899 {
1900 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
1901 state->slabFreeHead = NULL;
1902 }
1903 state->slabAllocatorUsed = true;
1904}

References fb(), i, palloc(), SLAB_SLOT_SIZE, and USEMEM.

Referenced by mergeruns().
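
A self-contained sketch of the same free-list setup over a contiguous arena; the types and sizes below are stand-ins for SlabSlot and SLAB_SLOT_SIZE, not the real tuplesort.c structures:

#include <stdio.h>
#include <stdlib.h>

#define SLOT_SIZE 1024          /* stand-in for SLAB_SLOT_SIZE */

/* simplified stand-in for union SlabSlot */
typedef union slot
{
    union slot *nextfree;
    char        buffer[SLOT_SIZE];
} slot;

int
main(void)
{
    int   numSlots = 4;
    char *begin = malloc((size_t) numSlots * SLOT_SIZE);
    slot *freeHead = (slot *) begin;
    slot *s;

    /* thread a singly linked free list through the arena; last slot ends it */
    for (int i = 0; i < numSlots - 1; i++)
        ((slot *) (begin + i * SLOT_SIZE))->nextfree =
            (slot *) (begin + (i + 1) * SLOT_SIZE);
    ((slot *) (begin + (numSlots - 1) * SLOT_SIZE))->nextfree = NULL;

    /* pop one slot from the free list, the counterpart of RELEASE_SLAB_SLOT */
    s = freeHead;
    freeHead = s->nextfree;
    printf("allocated slot at offset %ld, next free at offset %ld\n",
           (long) ((char *) s - begin), (long) ((char *) freeHead - begin));

    free(begin);
    return 0;
}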

◆ inittapes()

static void inittapes ( Tuplesortstate state,
bool  mergeruns 
)
static

Definition at line 1761 of file tuplesort.c.

1762{
1763 Assert(!LEADER(state));
1764
1765 if (mergeruns)
1766 {
1767 /* Compute number of input tapes to use when merging */
1768 state->maxTapes = tuplesort_merge_order(state->allowedMem);
1769 }
1770 else
1771 {
1772 /* Workers can sometimes produce single run, output without merge */
1773 Assert(WORKER(state));
1774 state->maxTapes = MINORDER;
1775 }
1776
1777 if (trace_sort)
1778 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1779 state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1780
1781 /* Create the tape set */
1782 inittapestate(state, state->maxTapes);
1783 state->tapeset =
1784 LogicalTapeSetCreate(false,
1785 state->shared ? &state->shared->fileset : NULL,
1786 state->worker);
1787
1788 state->currentRun = 0;
1789
1790 /*
1791 * Initialize logical tape arrays.
1792 */
1793 state->inputTapes = NULL;
1794 state->nInputTapes = 0;
1795 state->nInputRuns = 0;
1796
1797 state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1798 state->nOutputTapes = 0;
1799 state->nOutputRuns = 0;
1800
1801 state->status = TSS_BUILDRUNS;
1802
1802
1803 selectnewtape(state);
1804}

References Assert, elog, fb(), inittapestate(), LEADER, LOG, LogicalTapeSetCreate(), mergeruns(), MINORDER, palloc0(), pg_rusage_show(), selectnewtape(), trace_sort, TSS_BUILDRUNS, tuplesort_merge_order(), and WORKER.

Referenced by tuplesort_performsort(), and tuplesort_puttuple_common().

◆ inittapestate()

static void inittapestate ( Tuplesortstate state,
int  maxTapes 
)
static

Definition at line 1810 of file tuplesort.c.

1811{
1812 int64 tapeSpace;
1813
1814 /*
1815 * Decrease availMem to reflect the space needed for tape buffers; but
1816 * don't decrease it to the point that we have no room for tuples. (That
1817 * case is only likely to occur if sorting pass-by-value Datums; in all
1818 * other scenarios the memtuples[] array is unlikely to occupy more than
1819 * half of allowedMem. In the pass-by-value case it's not important to
1820 * account for tuple space, so we don't care if LACKMEM becomes
1821 * inaccurate.)
1822 */
1823 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1824
1825 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1826 USEMEM(state, tapeSpace);
1827
1828 /*
1829 * Make sure that the temp file(s) underlying the tape set are created in
1830 * suitable temp tablespaces. For parallel sorts, this should have been
1831 * called already, but it doesn't matter if it is called a second time.
1832 */
1833 PrepareTempTablespaces();
1834}

References fb(), GetMemoryChunkSpace(), PrepareTempTablespaces(), TAPE_BUFFER_OVERHEAD, and USEMEM.

Referenced by inittapes(), and leader_takeover_tapes().

◆ leader_takeover_tapes()

static void leader_takeover_tapes ( Tuplesortstate state)
static

Definition at line 3341 of file tuplesort.c.

3342{
3343 Sharedsort *shared = state->shared;
3344 int nParticipants = state->nParticipants;
3345 int workersFinished;
3346 int j;
3347
3348 Assert(LEADER(state));
3349 Assert(nParticipants >= 1);
3350
3351 SpinLockAcquire(&shared->mutex);
3352 workersFinished = shared->workersFinished;
3353 SpinLockRelease(&shared->mutex);
3354
3355 if (nParticipants != workersFinished)
3356 elog(ERROR, "cannot take over tapes before all workers finish");
3357
3358 /*
3359 * Create the tapeset from worker tapes, including a leader-owned tape at
3360 * the end. Parallel workers are far more expensive than logical tapes,
3361 * so the number of tapes allocated here should never be excessive.
3362 */
3363 inittapestate(state, nParticipants);
3364 state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3365
3366 /*
3367 * Set currentRun to reflect the number of runs we will merge (it's not
3368 * used for anything, this is just pro forma)
3369 */
3370 state->currentRun = nParticipants;
3371
3372 /*
3373 * Initialize the state to look the same as after building the initial
3374 * runs.
3375 *
3376 * There will always be exactly 1 run per worker, and exactly one input
3377 * tape per run, because workers always output exactly 1 run, even when
3378 * there were no input tuples for workers to sort.
3379 */
3380 state->inputTapes = NULL;
3381 state->nInputTapes = 0;
3382 state->nInputRuns = 0;
3383
3384 state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3385 state->nOutputTapes = nParticipants;
3386 state->nOutputRuns = nParticipants;
3387
3388 for (j = 0; j < nParticipants; j++)
3389 {
3390 state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3391 }
3392
3393 state->status = TSS_BUILDRUNS;
3394}

References Assert, elog, ERROR, fb(), Sharedsort::fileset, inittapestate(), j, LEADER, LogicalTapeImport(), LogicalTapeSetCreate(), Sharedsort::mutex, palloc0(), SpinLockAcquire, SpinLockRelease, Sharedsort::tapes, TSS_BUILDRUNS, and Sharedsort::workersFinished.

Referenced by tuplesort_performsort().

◆ make_bounded_heap()

static void make_bounded_heap ( Tuplesortstate state)
static

Definition at line 2483 of file tuplesort.c.

2484{
2485 int tupcount = state->memtupcount;
2486 int i;
2487
2488 Assert(state->status == TSS_INITIAL);
2489 Assert(state->bounded);
2490 Assert(tupcount >= state->bound);
2491 Assert(SERIAL(state));
2492
2493 /* Reverse sort direction so largest entry will be at root */
2494 reversedirection(state);
2495
2496 state->memtupcount = 0; /* make the heap empty */
2497 for (i = 0; i < tupcount; i++)
2498 {
2499 if (state->memtupcount < state->bound)
2500 {
2501 /* Insert next tuple into heap */
2502 /* Must copy source tuple to avoid possible overwrite */
2503 SortTuple stup = state->memtuples[i];
2504
2505 tuplesort_heap_insert(state, &stup);
2506 }
2507 else
2508 {
2509 /*
2510 * The heap is full. Replace the largest entry with the new
2511 * tuple, or just discard it, if it's larger than anything already
2512 * in the heap.
2513 */
2514 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2515 {
2516 free_sort_tuple(state, &state->memtuples[i]);
2517 CHECK_FOR_INTERRUPTS();
2518 }
2519 else
2520 tuplesort_heap_replace_top(state, &state->memtuples[i]);
2521 }
2522 }
2523
2524 Assert(state->memtupcount == state->bound);
2525 state->status = TSS_BOUNDED;
2526}

References Assert, CHECK_FOR_INTERRUPTS, COMPARETUP, fb(), free_sort_tuple(), i, reversedirection(), SERIAL, TSS_BOUNDED, TSS_INITIAL, tuplesort_heap_insert(), and tuplesort_heap_replace_top().

Referenced by tuplesort_puttuple_common().
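
A toy, standalone version of the bounded (top-N) heap idea, with plain ints standing in for SortTuples and a hard-coded bound; it mirrors the insert-or-replace-top flow described above but is not the tuplesort.c implementation:

#include <stdio.h>

#define BOUND 3                 /* assumed LIMIT, i.e. state->bound */

/* sift the root of a max-heap of n ints down to its place */
static void
sift_down(int *heap, int n, int i)
{
    for (;;)
    {
        int l = 2 * i + 1, r = 2 * i + 2, largest = i, tmp;

        if (l < n && heap[l] > heap[largest]) largest = l;
        if (r < n && heap[r] > heap[largest]) largest = r;
        if (largest == i) break;
        tmp = heap[i]; heap[i] = heap[largest]; heap[largest] = tmp;
        i = largest;
    }
}

int
main(void)
{
    int input[] = {42, 7, 19, 3, 25, 11};
    int heap[BOUND];
    int count = 0;

    for (int i = 0; i < 6; i++)
    {
        if (count < BOUND)
        {
            heap[count++] = input[i];           /* still filling the heap */
            if (count == BOUND)
                for (int j = BOUND / 2 - 1; j >= 0; j--)
                    sift_down(heap, BOUND, j);  /* heapify once full */
        }
        else if (input[i] < heap[0])
        {
            heap[0] = input[i];                 /* replace current largest */
            sift_down(heap, BOUND, 0);
        }
        /* otherwise discard, as make_bounded_heap() does */
    }

    printf("%d %d %d\n", heap[0], heap[1], heap[2]);   /* the 3 smallest */
    return 0;
}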

◆ markrunend()

static void markrunend ( LogicalTape tape)
static

Definition at line 3141 of file tuplesort.c.

3142{
3143 unsigned int len = 0;
3144
3145 LogicalTapeWrite(tape, &len, sizeof(len));
3146}

References fb(), len, and LogicalTapeWrite().

Referenced by dumptuples(), and mergeonerun().

◆ merge_read_buffer_size()

static int64 merge_read_buffer_size ( int64  avail_mem,
int  nInputTapes,
int  nInputRuns,
int  maxOutputTapes 
)
static

Definition at line 1729 of file tuplesort.c.

1731{
1732 int nOutputRuns;
1733 int nOutputTapes;
1734
1735 /*
1736 * How many output tapes will we produce in this pass?
1737 *
1738 * This is nInputRuns / nInputTapes, rounded up.
1739 */
1740 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1741
1742 nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1743
1744 /*
1745 * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1746 * remaining memory is divided evenly between the input tapes.
1747 *
1748 * This also follows from the formula in tuplesort_merge_order, but here
1749 * we derive the input buffer size from the amount of memory available,
1750 * and M and N.
1751 */
1752 return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1753}

References fb(), Max, Min, and TAPE_BUFFER_OVERHEAD.

Referenced by mergeruns().
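
A standalone worked example of the formula above, with assumed (illustrative) numbers for the available memory and tape counts, and a stand-in constant for TAPE_BUFFER_OVERHEAD:

#include <stdint.h>
#include <stdio.h>

#define TAPE_OVERHEAD 8192      /* stand-in for TAPE_BUFFER_OVERHEAD (BLCKSZ) */

int
main(void)
{
    /* assumed figures, purely illustrative */
    int64_t avail_mem      = 4 * 1024 * 1024;   /* memory left for buffers */
    int     nInputTapes    = 6;
    int     nInputRuns     = 14;
    int     maxOutputTapes = 6;

    int     nOutputRuns  = (nInputRuns + nInputTapes - 1) / nInputTapes; /* ceil */
    int     nOutputTapes = nOutputRuns < maxOutputTapes ? nOutputRuns : maxOutputTapes;
    int64_t per_input    = (avail_mem - (int64_t) TAPE_OVERHEAD * nOutputTapes)
                           / nInputTapes;

    printf("%d output runs on %d tapes, %lld bytes of buffer per input tape\n",
           nOutputRuns, nOutputTapes, (long long) per_input);
    return 0;
}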

◆ mergeonerun()

static void mergeonerun ( Tuplesortstate state)
static

Definition at line 2096 of file tuplesort.c.

2097{
2098 int srcTapeIndex;
2099 LogicalTape *srcTape;
2100
2101 /*
2102 * Start the merge by loading one tuple from each active source tape into
2103 * the heap.
2104 */
2105 beginmerge(state);
2106
2107 Assert(state->slabAllocatorUsed);
2108
2109 /*
2110 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2111 * out, and replacing it with next tuple from same tape (if there is
2112 * another one).
2113 */
2114 while (state->memtupcount > 0)
2115 {
2116 SortTuple stup;
2117
2118 /* write the tuple to destTape */
2119 srcTapeIndex = state->memtuples[0].srctape;
2120 srcTape = state->inputTapes[srcTapeIndex];
2121 WRITETUP(state, state->destTape, &state->memtuples[0]);
2122
2123 /* recycle the slot of the tuple we just wrote out, for the next read */
2124 if (state->memtuples[0].tuple)
2125 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2126
2127 /*
2128 * pull next tuple from the tape, and replace the written-out tuple in
2129 * the heap with it.
2130 */
2131 if (mergereadnext(state, srcTape, &stup))
2132 {
2133 stup.srctape = srcTapeIndex;
2134 tuplesort_heap_replace_top(state, &stup);
2135 }
2136 else
2137 {
2138 tuplesort_heap_delete_top(state);
2139 state->nInputRuns--;
2140 }
2141 }
2142
2143 /*
2144 * When the heap empties, we're done. Write an end-of-run marker on the
2145 * output tape.
2146 */
2147 markrunend(state->destTape);
2148}

References Assert, beginmerge(), fb(), markrunend(), mergereadnext(), RELEASE_SLAB_SLOT, tuplesort_heap_delete_top(), tuplesort_heap_replace_top(), and WRITETUP.

Referenced by mergeruns().

◆ mergereadnext()

static bool mergereadnext ( Tuplesortstate state,
LogicalTape srcTape,
SortTuple stup 
)
static

Definition at line 2184 of file tuplesort.c.

2185{
2186 unsigned int tuplen;
2187
2188 /* read next tuple, if any */
2189 if ((tuplen = getlen(srcTape, true)) == 0)
2190 return false;
2191 READTUP(state, stup, srcTape, tuplen);
2192
2193 return true;
2194}

References fb(), getlen(), and READTUP.

Referenced by beginmerge(), mergeonerun(), and tuplesort_gettuple_common().

◆ mergeruns()

static void mergeruns ( Tuplesortstate state)
static

Definition at line 1913 of file tuplesort.c.

1914{
1915 int tapenum;
1916
1917 Assert(state->status == TSS_BUILDRUNS);
1918 Assert(state->memtupcount == 0);
1919
1920 if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
1921 {
1922 /*
1923 * If there are multiple runs to be merged, when we go to read back
1924 * tuples from disk, abbreviated keys will not have been stored, and
1925 * we don't care to regenerate them. Disable abbreviation from this
1926 * point on.
1927 */
1928 state->base.sortKeys->abbrev_converter = NULL;
1929 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
1930
1931 /* Not strictly necessary, but be tidy */
1932 state->base.sortKeys->abbrev_abort = NULL;
1933 state->base.sortKeys->abbrev_full_comparator = NULL;
1934 }
1935
1936 /*
1937 * Reset tuple memory. We've freed all the tuples that we previously
1938 * allocated. We will use the slab allocator from now on.
1939 */
1940 MemoryContextResetOnly(state->base.tuplecontext);
1941
1942 /*
1943 * We no longer need a large memtuples array. (We will allocate a smaller
1944 * one for the heap later.)
1945 */
1946 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1947 pfree(state->memtuples);
1948 state->memtuples = NULL;
1949
1950 /*
1951 * Initialize the slab allocator. We need one slab slot per input tape,
1952 * for the tuples in the heap, plus one to hold the tuple last returned
1953 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
1954 * however, we don't need to do allocate anything.)
1955 *
1956 * In a multi-pass merge, we could shrink this allocation for the last
1957 * merge pass, if it has fewer tapes than previous passes, but we don't
1958 * bother.
1959 *
1960 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
1961 * to track memory usage of individual tuples.
1962 */
1963 if (state->base.tuples)
1964 init_slab_allocator(state, state->nOutputTapes + 1);
1965 else
1966 init_slab_allocator(state, 0);
1967
1968 /*
1969 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
1970 * from each input tape.
1971 *
1972 * We could shrink this, too, between passes in a multi-pass merge, but we
1973 * don't bother. (The initial input tapes are still in outputTapes. The
1974 * number of input tapes will not increase between passes.)
1975 */
1976 state->memtupsize = state->nOutputTapes;
1977 state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
1978 state->nOutputTapes * sizeof(SortTuple));
1979 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1980
1981 /*
1982 * Use all the remaining memory we have available for tape buffers among
1983 * all the input tapes. At the beginning of each merge pass, we will
1984 * divide this memory between the input and output tapes in the pass.
1985 */
1986 state->tape_buffer_mem = state->availMem;
1987 USEMEM(state, state->tape_buffer_mem);
1988 if (trace_sort)
1989 elog(LOG, "worker %d using %zu KB of memory for tape buffers",
1990 state->worker, state->tape_buffer_mem / 1024);
1991
1992 for (;;)
1993 {
1994 /*
1995 * On the first iteration, or if we have read all the runs from the
1996 * input tapes in a multi-pass merge, it's time to start a new pass.
1997 * Rewind all the output tapes, and make them inputs for the next
1998 * pass.
1999 */
2000 if (state->nInputRuns == 0)
2001 {
2002 int64 input_buffer_size;
2003
2004 /* Close the old, emptied, input tapes */
2005 if (state->nInputTapes > 0)
2006 {
2007 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2008 LogicalTapeClose(state->inputTapes[tapenum]);
2009 pfree(state->inputTapes);
2010 }
2011
2012 /* Previous pass's outputs become next pass's inputs. */
2013 state->inputTapes = state->outputTapes;
2014 state->nInputTapes = state->nOutputTapes;
2015 state->nInputRuns = state->nOutputRuns;
2016
2017 /*
2018 * Reset output tape variables. The actual LogicalTapes will be
2019 * created as needed, here we only allocate the array to hold
2020 * them.
2021 */
2022 state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2023 state->nOutputTapes = 0;
2024 state->nOutputRuns = 0;
2025
2026 /*
2027 * Redistribute the memory allocated for tape buffers, among the
2028 * new input and output tapes.
2029 */
2030 input_buffer_size = merge_read_buffer_size(state->availMem,
2031 state->nInputTapes,
2032 state->nInputRuns,
2033 state->maxTapes);
2034
2035 if (trace_sort)
2036 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2037 state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2038 pg_rusage_show(&state->ru_start));
2039
2040 /* Prepare the new input tapes for merge pass. */
2041 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2042 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2043
2044 /*
2045 * If there's just one run left on each input tape, then only one
2046 * merge pass remains. If we don't have to produce a materialized
2047 * sorted tape, we can stop at this point and do the final merge
2048 * on-the-fly.
2049 */
2050 if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2051 && state->nInputRuns <= state->nInputTapes
2052 && !WORKER(state))
2053 {
2054 /* Tell logtape.c we won't be writing anymore */
2055 LogicalTapeSetForgetFreeSpace(state->tapeset);
2056 /* Initialize for the final merge pass */
2057 beginmerge(state);
2058 state->status = TSS_FINALMERGE;
2059 return;
2060 }
2061 }
2062
2063 /* Select an output tape */
2064 selectnewtape(state);
2065
2066 /* Merge one run from each input tape. */
2067 mergeonerun(state);
2068
2069 /*
2070 * If the input tapes are empty, and we output only one output run,
2071 * we're done. The current output tape contains the final result.
2072 */
2073 if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2074 break;
2075 }
2076
2077 /*
2078 * Done. The result is on a single run on a single tape.
2079 */
2080 state->result_tape = state->outputTapes[0];
2081 if (!WORKER(state))
2082 LogicalTapeFreeze(state->result_tape, NULL);
2083 else
2084 worker_freeze_result_tape(state);
2085 state->status = TSS_SORTEDONTAPE;
2086
2087 /* Close all the now-empty input tapes, to release their read buffers. */
2088 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2089 LogicalTapeClose(state->inputTapes[tapenum]);
2090}

References Assert, beginmerge(), elog, fb(), FREEMEM, GetMemoryChunkSpace(), init_slab_allocator(), INT64_FORMAT, LOG, LogicalTapeClose(), LogicalTapeFreeze(), LogicalTapeRewindForRead(), LogicalTapeSetForgetFreeSpace(), MemoryContextAlloc(), MemoryContextResetOnly(), merge_read_buffer_size(), mergeonerun(), palloc0(), pfree(), pg_rusage_show(), selectnewtape(), trace_sort, TSS_BUILDRUNS, TSS_FINALMERGE, TSS_SORTEDONTAPE, TUPLESORT_RANDOMACCESS, USEMEM, WORKER, and worker_freeze_result_tape().

Referenced by inittapes(), and tuplesort_performsort().
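
As a rough, standalone illustration of how multi-pass merging converges, the sketch below counts merge passes for assumed run and tape counts; it is back-of-the-envelope arithmetic, not the mergeruns() logic itself:

#include <stdio.h>

int
main(void)
{
    int runs = 50;              /* assumed initial run count */
    int merge_order = 6;        /* assumed merge order, e.g. MINORDER */
    int passes = 0;

    /* each pass reduces the run count by roughly the merge order */
    while (runs > 1)
    {
        runs = (runs + merge_order - 1) / merge_order;
        passes++;
    }
    printf("%d merge passes needed\n", passes);   /* prints 3 */
    return 0;
}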

◆ normalize_datum()

static Datum normalize_datum ( Datum  orig,
SortSupport  ssup 
)
inlinestatic

Definition at line 2585 of file tuplesort.c.

2586{
2588
2589 if (ssup->comparator == ssup_datum_signed_cmp)
2590 {
2591 norm_datum1 = orig + ((uint64) PG_INT64_MAX) + 1;
2592 }
2593 else if (ssup->comparator == ssup_datum_int32_cmp)
2594 {
2595 /*
2596 * First truncate to uint32. Technically, we don't need to do this,
2597 * but it forces the upper half of the datum to be zero regardless of
2598 * sign.
2599 */
2601
2603 }
2604 else
2605 {
2607 norm_datum1 = orig;
2608 }
2609
2610 if (ssup->ssup_reverse)
2612
2613 return norm_datum1;
2614}

References Assert, SortSupportData::comparator, DatumGetUInt32(), fb(), PG_INT32_MAX, PG_INT64_MAX, ssup_datum_int32_cmp(), ssup_datum_signed_cmp(), ssup_datum_unsigned_cmp(), SortSupportData::ssup_reverse, and UInt32GetDatum().

Referenced by radix_sort_recursive().
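
A minimal standalone sketch of the signed-to-unsigned normalization idea (offsetting by 2^63 so that byte-wise order matches signed order); the helper name is hypothetical and only the int64 case is shown:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* hypothetical helper: map a signed 64-bit value into unsigned sort order */
static uint64_t
normalize_int64(int64_t v)
{
    /* adding 2^63 flips the sign bit, so byte-wise order matches signed order */
    return (uint64_t) v + ((uint64_t) INT64_MAX + 1);
}

int
main(void)
{
    int64_t a = -5, b = 3;

    printf("%" PRIu64 " < %" PRIu64 " : %d\n",
           normalize_int64(a), normalize_int64(b),
           normalize_int64(a) < normalize_int64(b));   /* prints 1 */
    return 0;
}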

◆ radix_sort_recursive()

static void radix_sort_recursive ( SortTuple begin,
size_t  n_elems,
int  level,
Tuplesortstate state 
)
static

Definition at line 2654 of file tuplesort.c.

2655{
2656 RadixSortInfo partitions[256] = {0};
2658 size_t total = 0;
2659 int num_partitions = 0;
2660 int num_remaining;
2661 SortSupport ssup = &state->base.sortKeys[0];
2662 size_t start_offset = 0;
2663 SortTuple *partition_begin = begin;
2664
2665 /* count number of occurrences of each byte */
2666 for (SortTuple *st = begin; st < begin + n_elems; st++)
2667 {
2669
2670 /* extract the byte for this level from the normalized datum */
2671 this_partition = current_byte(normalize_datum(st->datum1, ssup),
2672 level);
2673
2674 /* save it for the permutation step */
2675 st->curbyte = this_partition;
2676
2677 partitions[this_partition].count++;
2678
2680 }
2681
2682 /* compute partition offsets */
2683 for (int i = 0; i < 256; i++)
2684 {
2685 size_t count = partitions[i].count;
2686
2687 if (count != 0)
2688 {
2689 partitions[i].offset = total;
2690 total += count;
2691 remaining_partitions[num_partitions] = i;
2692 num_partitions++;
2693 }
2694 partitions[i].next_offset = total;
2695 }
2696
2697 /*
2698 * Swap tuples to correct partition.
2699 *
2700 * In traditional American flag sort, a swap sends the current element to
2701 * the correct partition, but the array pointer only advances if the
2702 * partner of the swap happens to be an element that belongs in the
2703 * current partition. That only requires one pass through the array, but
2704 * the disadvantage is we don't know if the pointer can advance until the
2705 * swap completes. Here lies the most interesting innovation from the
2706 * upstream ska_byte_sort: After initiating the swap, we immediately
2707 * proceed to the next element. This makes better use of CPU pipelining,
2708 * but also means that we will often need multiple iterations of this
2709 * loop. ska_byte_sort() maintains a separate list of which partitions
2710 * haven't finished, which is updated every loop iteration. Here we simply
2711 * check each partition during every iteration.
2712 *
2713 * If we started with a single partition, there is nothing to do. If a
2714 * previous loop iteration results in only one partition that hasn't been
2715 * counted as sorted, we know it's actually sorted and can exit the loop.
2716 */
2717 num_remaining = num_partitions;
2718 while (num_remaining > 1)
2719 {
2720 /* start the count over */
2721 num_remaining = num_partitions;
2722
2723 for (int i = 0; i < num_partitions; i++)
2724 {
2726
2727 for (SortTuple *st = begin + partitions[idx].offset;
2728 st < begin + partitions[idx].next_offset;
2729 st++)
2730 {
2731 size_t offset = partitions[st->curbyte].offset++;
2732 SortTuple tmp;
2733
2734 /* swap current tuple with destination position */
2735 Assert(offset < n_elems);
2736 tmp = *st;
2737 *st = begin[offset];
2738 begin[offset] = tmp;
2739
2741 };
2742
2743 /* Is this partition sorted? */
2744 if (partitions[idx].offset == partitions[idx].next_offset)
2745 num_remaining--;
2746 }
2747 }
2748
2749 /* recurse */
2751 rp < remaining_partitions + num_partitions;
2752 rp++)
2753 {
2754 size_t end_offset = partitions[*rp].next_offset;
2757
2758 if (num_elements > 1)
2759 {
2760 if (level < sizeof(Datum) - 1)
2761 {
2763 {
2766 state->base.comparetup,
2767 state);
2768 }
2769 else
2770 {
2773 level + 1,
2774 state);
2775 }
2776 }
2777 else if (state->base.onlyKey == NULL)
2778 {
2779 /*
2780 * We've finished radix sort on all bytes of the pass-by-value
2781 * datum (possibly abbreviated), now sort using the tiebreak
2782 * comparator.
2783 */
2786 state->base.comparetup_tiebreak,
2787 state);
2788 }
2789 }
2790
2793 }
2794}

References Assert, CHECK_FOR_INTERRUPTS, current_byte(), fb(), i, idx(), normalize_datum(), partitions, QSORT_THRESHOLD, and radix_sort_recursive().

Referenced by radix_sort_recursive(), and radix_sort_tuple().
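
A simplified, standalone sketch of one counting/partitioning level, using a scratch output array rather than the in-place cyclic swapping that radix_sort_recursive() performs; it is meant only to illustrate the count, prefix-sum, and permute steps:

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    uint8_t keys[] = {0x30, 0x10, 0x20, 0x10, 0x30, 0x10};
    size_t  n = sizeof(keys);
    size_t  count[256] = {0};
    size_t  offset[256];
    uint8_t out[sizeof(keys)];
    size_t  total = 0;

    for (size_t i = 0; i < n; i++)          /* counting pass */
        count[keys[i]]++;

    for (int b = 0; b < 256; b++)           /* prefix sums become offsets */
    {
        offset[b] = total;
        total += count[b];
    }

    for (size_t i = 0; i < n; i++)          /* permutation pass */
        out[offset[keys[i]]++] = keys[i];

    for (size_t i = 0; i < n; i++)
        printf("%02X ", out[i]);            /* prints 10 10 10 20 30 30 */
    printf("\n");
    return 0;
}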

◆ radix_sort_tuple()

static void radix_sort_tuple ( SortTuple data,
size_t  n,
Tuplesortstate state 
)
static

Definition at line 2803 of file tuplesort.c.

2804{
2805 bool nulls_first = state->base.sortKeys[0].ssup_nulls_first;
2808 size_t d1 = 0,
2809 d2,
2810 null_count,
2812
2813 /*
2814 * Find the first NOT NULL if NULLS FIRST, or first NULL if NULLS LAST.
2815 * This also serves as a quick check for the common case where all tuples
2816 * are NOT NULL in the first sort key.
2817 */
2818 while (d1 < n && data[d1].isnull1 == nulls_first)
2819 {
2820 d1++;
2822 }
2823
2824 /*
2825 * If we have more than one tuple left after the quick check, partition
2826 * the remainder using branchless cyclic permutation, based on
2827 * https://orlp.net/blog/branchless-lomuto-partitioning/
2828 */
2829 Assert(n > 0);
2830 if (d1 < n - 1)
2831 {
2832 size_t i = d1,
2833 j = d1;
2834 SortTuple tmp = data[d1]; /* create gap at front */
2835
2836 while (j < n - 1)
2837 {
2838 /* gap is at j, move i's element to gap */
2839 data[j] = data[i];
2840 /* advance j to the first unknown element */
2841 j += 1;
2842 /* move the first unknown element back to i */
2843 data[i] = data[j];
2844 /* advance i if this element belongs in the left partition */
2845 i += (data[i].isnull1 == nulls_first);
2846
2848 }
2849
2850 /* place gap between left and right partitions */
2851 data[j] = data[i];
2852 /* restore the saved element */
2853 data[i] = tmp;
2854 /* assign it to the correct partition */
2855 i += (data[i].isnull1 == nulls_first);
2856
2857 /* d1 is now the number of elements in the left partition */
2858 d1 = i;
2859 }
2860
2861 d2 = n - d1;
2862
2863 /* set pointers and counts for each partition */
2864 if (nulls_first)
2865 {
2866 null_start = data;
2867 null_count = d1;
2868 not_null_start = data + d1;
2869 not_null_count = d2;
2870 }
2871 else
2872 {
2874 not_null_count = d1;
2875 null_start = data + d1;
2876 null_count = d2;
2877 }
2878
2879 for (SortTuple *st = null_start;
2880 st < null_start + null_count;
2881 st++)
2882 Assert(st->isnull1 == true);
2883 for (SortTuple *st = not_null_start;
2885 st++)
2886 Assert(st->isnull1 == false);
2887
2888 /*
2889 * Sort the NULL partition using tiebreak comparator, if necessary.
2890 */
2891 if (state->base.onlyKey == NULL && null_count > 1)
2892 {
2894 null_count,
2895 state->base.comparetup_tiebreak,
2896 state);
2897 }
2898
2899 /*
2900 * Sort the NOT NULL partition, using radix sort if large enough,
2901 * otherwise fall back to quicksort.
2902 */
2904 {
2907 state->base.comparetup,
2908 state);
2909 }
2910 else
2911 {
2912 bool presorted = true;
2913
2914 for (SortTuple *st = not_null_start + 1;
2916 st++)
2917 {
2918 if (COMPARETUP(state, st - 1, st) > 0)
2919 {
2920 presorted = false;
2921 break;
2922 }
2923
2925 }
2926
2927 if (presorted)
2928 return;
2929 else
2930 {
2933 0,
2934 state);
2935 }
2936 }
2937}

References Assert, CHECK_FOR_INTERRUPTS, COMPARETUP, data, fb(), i, j, QSORT_THRESHOLD, and radix_sort_recursive().

Referenced by tuplesort_sort_memtuples().
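
A toy, standalone version of the branchless cyclic-permutation partition referenced above (after the orlp.net write-up), with "is negative" standing in for the isnull1 == nulls_first test; it is an illustration, not the tuplesort.c code:

#include <stdio.h>

/* move all negative values to the front, returning the left partition size */
static int
partition(int *data, int n)
{
    int i = 0, j = 0;
    int tmp = data[0];              /* create a gap at the front */

    while (j < n - 1)
    {
        data[j] = data[i];          /* gap is at j, move i's element there */
        j += 1;                     /* advance j to the first unknown element */
        data[i] = data[j];          /* move it back into the gap at i */
        i += (data[i] < 0);         /* advance i only for left-partition items */
    }
    data[j] = data[i];              /* place the gap between the partitions */
    data[i] = tmp;                  /* restore the saved element */
    i += (data[i] < 0);             /* assign it to the correct partition */
    return i;
}

int
main(void)
{
    int data[] = {5, -1, 4, -7, 2, -3};
    int left = partition(data, 6);

    printf("left partition size %d:", left);   /* prints 3: -1 -7 -3 5 2 4 */
    for (int k = 0; k < 6; k++)
        printf(" %d", data[k]);
    printf("\n");
    return 0;
}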

◆ reversedirection()

static void reversedirection ( Tuplesortstate state)
static

Definition at line 3110 of file tuplesort.c.

3111{
3112 SortSupport sortKey = state->base.sortKeys;
3113 int nkey;
3114
3115 for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
3116 {
3117 sortKey->ssup_reverse = !sortKey->ssup_reverse;
3118 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3119 }
3120}

References fb().

Referenced by make_bounded_heap(), and sort_bounded_heap().

◆ selectnewtape()

static void selectnewtape ( Tuplesortstate state)
static

Definition at line 1844 of file tuplesort.c.

1845{
1846 /*
1847 * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1848 * both zero. On each call, we create a new output tape to hold the next
1849 * run, until maxTapes is reached. After that, we assign new runs to the
1850 * existing tapes in a round robin fashion.
1851 */
1852 if (state->nOutputTapes < state->maxTapes)
1853 {
1854 /* Create a new tape to hold the next run */
1855 Assert(state->outputTapes[state->nOutputRuns] == NULL);
1856 Assert(state->nOutputRuns == state->nOutputTapes);
1857 state->destTape = LogicalTapeCreate(state->tapeset);
1858 state->outputTapes[state->nOutputTapes] = state->destTape;
1859 state->nOutputTapes++;
1860 state->nOutputRuns++;
1861 }
1862 else
1863 {
1864 /*
1865 * We have reached the max number of tapes. Append to an existing
1866 * tape.
1867 */
1868 state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1869 state->nOutputRuns++;
1870 }
1871}

References Assert, fb(), and LogicalTapeCreate().

Referenced by dumptuples(), inittapes(), and mergeruns().
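
A standalone illustration of the run-to-tape assignment above with assumed numbers: the first maxTapes runs each get a fresh tape, later runs wrap around round robin:

#include <stdio.h>

int
main(void)
{
    int maxTapes = 4;           /* assumed tape limit */
    int nOutputTapes = 0;
    int nOutputRuns = 0;

    for (int run = 0; run < 10; run++)
    {
        int tape;

        if (nOutputTapes < maxTapes)
            tape = nOutputTapes++;              /* create a new tape */
        else
            tape = nOutputRuns % nOutputTapes;  /* append round robin */
        nOutputRuns++;
        printf("run %d -> tape %d\n", run, tape);
    }
    return 0;
}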

◆ sort_bounded_heap()

static void sort_bounded_heap ( Tuplesortstate state)
static

Definition at line 2532 of file tuplesort.c.

2533{
2534 int tupcount = state->memtupcount;
2535
2536 Assert(state->status == TSS_BOUNDED);
2537 Assert(state->bounded);
2538 Assert(tupcount == state->bound);
2539 Assert(SERIAL(state));
2540
2541 /*
2542 * We can unheapify in place because each delete-top call will remove the
2543 * largest entry, which we can promptly store in the newly freed slot at
2544 * the end. Once we're down to a single-entry heap, we're done.
2545 */
2546 while (state->memtupcount > 1)
2547 {
2548 SortTuple stup = state->memtuples[0];
2549
2550 /* this sifts-up the next-largest entry and decreases memtupcount */
2551 tuplesort_heap_delete_top(state);
2552 state->memtuples[state->memtupcount] = stup;
2553 }
2554 state->memtupcount = tupcount;
2555
2556 /*
2557 * Reverse sort direction back to the original state. This is not
2558 * actually necessary but seems like a good idea for tidiness.
2559 */
2560 reversedirection(state);
2561
2562 state->status = TSS_SORTEDINMEM;
2563 state->boundUsed = true;
2564}

References Assert, fb(), reversedirection(), SERIAL, TSS_BOUNDED, TSS_SORTEDINMEM, and tuplesort_heap_delete_top().

Referenced by tuplesort_performsort().

◆ ssup_datum_int32_cmp()

int ssup_datum_int32_cmp ( Datum  x,
Datum  y,
SortSupport  ssup 
)

Definition at line 3436 of file tuplesort.c.

3437{
3438 int32 xx = DatumGetInt32(x);
3439 int32 yy = DatumGetInt32(y);
3440
3441 if (xx < yy)
3442 return -1;
3443 else if (xx > yy)
3444 return 1;
3445 else
3446 return 0;
3447}

References DatumGetInt32(), fb(), x, and y.

Referenced by btint4sortsupport(), date_sortsupport(), normalize_datum(), and tuplesort_sort_memtuples().

◆ ssup_datum_signed_cmp()

int ssup_datum_signed_cmp ( Datum  x,
Datum  y,
SortSupport  ssup 
)

Definition at line 3422 of file tuplesort.c.

3423{
3424 int64 xx = DatumGetInt64(x);
3425 int64 yy = DatumGetInt64(y);
3426
3427 if (xx < yy)
3428 return -1;
3429 else if (xx > yy)
3430 return 1;
3431 else
3432 return 0;
3433}

References DatumGetInt64(), fb(), x, and y.

Referenced by btint8sortsupport(), normalize_datum(), timestamp_sortsupport(), and tuplesort_sort_memtuples().

◆ ssup_datum_unsigned_cmp()

int ssup_datum_unsigned_cmp ( Datum  x,
Datum  y,
SortSupport  ssup 
)

Definition at line 3411 of file tuplesort.c.

3412{
3413 if (x < y)
3414 return -1;
3415 else if (x > y)
3416 return 1;
3417 else
3418 return 0;
3419}

References x, and y.

Referenced by bytea_sortsupport(), gist_point_sortsupport(), macaddr_sortsupport(), network_sortsupport(), normalize_datum(), tuplesort_sort_memtuples(), uuid_sortsupport(), and varstr_sortsupport().

◆ tuplesort_attach_shared()

void tuplesort_attach_shared ( Sharedsort shared,
dsm_segment seg 
)

Definition at line 3233 of file tuplesort.c.

3234{
3235 /* Attach to SharedFileSet */
3236 SharedFileSetAttach(&shared->fileset, seg);
3237}

References Sharedsort::fileset, and SharedFileSetAttach().

Referenced by _brin_parallel_build_main(), _bt_parallel_build_main(), and _gin_parallel_build_main().

◆ tuplesort_begin_batch()

static void tuplesort_begin_batch ( Tuplesortstate state)
static

Definition at line 652 of file tuplesort.c.

653{
654 MemoryContext oldcontext;
655
656 oldcontext = MemoryContextSwitchTo(state->base.maincontext);
657
658 /*
659 * Caller tuple (e.g. IndexTuple) memory context.
660 *
661 * A dedicated child context used exclusively for caller passed tuples
662 * eases memory management. Resetting at key points reduces
663 * fragmentation. Note that the memtuples array of SortTuples is allocated
664 * in the parent context, not this context, because there is no need to
665 * free memtuples early. For bounded sorts, tuples may be pfreed in any
666 * order, so we use a regular aset.c context so that it can make use of
667 * free'd memory. When the sort is not bounded, we make use of a bump.c
668 * context as this keeps allocations more compact with less wastage.
669 * Allocations are also slightly more CPU efficient.
670 */
671 if (TupleSortUseBumpTupleCxt(state->base.sortopt))
672 state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
673 "Caller tuples",
674 ALLOCSET_DEFAULT_SIZES);
675 else
676 state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
677 "Caller tuples",
678 ALLOCSET_DEFAULT_SIZES);
679
680
681 state->status = TSS_INITIAL;
682 state->bounded = false;
683 state->boundUsed = false;
684
685 state->availMem = state->allowedMem;
686
687 state->tapeset = NULL;
688
689 state->memtupcount = 0;
690
691 state->growmemtuples = true;
692 state->slabAllocatorUsed = false;
693 if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
694 {
695 pfree(state->memtuples);
696 state->memtuples = NULL;
697 state->memtupsize = INITIAL_MEMTUPSIZE;
698 }
699 if (state->memtuples == NULL)
700 {
701 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
702 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
703 }
704
705 /* workMem must be large enough for the minimal memtuples array */
706 if (LACKMEM(state))
707 elog(ERROR, "insufficient memory allowed for sort");
708
709 state->currentRun = 0;
710
711 /*
712 * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
713 * inittapes(), if needed.
714 */
715
716 state->result_tape = NULL; /* flag that result tape has not been formed */
717
718 MemoryContextSwitchTo(oldcontext);
719}

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, BumpContextCreate(), elog, ERROR, fb(), GetMemoryChunkSpace(), INITIAL_MEMTUPSIZE, LACKMEM, MemoryContextSwitchTo(), palloc(), pfree(), TSS_INITIAL, TupleSortUseBumpTupleCxt, and USEMEM.

Referenced by tuplesort_begin_common(), and tuplesort_reset().

◆ tuplesort_begin_common()

Tuplesortstate * tuplesort_begin_common ( int  workMem,
SortCoordinate  coordinate,
int  sortopt 
)

Definition at line 546 of file tuplesort.c.

547{
549 MemoryContext maincontext;
550 MemoryContext sortcontext;
551 MemoryContext oldcontext;
552
553 /* See leader_takeover_tapes() remarks on random access support */
554 if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
555 elog(ERROR, "random access disallowed under parallel sort");
556
557 /*
558 * Memory context surviving tuplesort_reset. This memory context holds
559 * data which is useful to keep while sorting multiple similar batches.
560 */
562 "TupleSort main",
564
565 /*
566 * Create a working memory context for one sort operation. The content of
567 * this context is deleted by tuplesort_reset.
568 */
569 sortcontext = AllocSetContextCreate(maincontext,
570 "TupleSort sort",
572
573 /*
574 * Additionally a working memory context for tuples is setup in
575 * tuplesort_begin_batch.
576 */
577
578 /*
579 * Make the Tuplesortstate within the per-sortstate context. This way, we
580 * don't need a separate pfree() operation for it at shutdown.
581 */
582 oldcontext = MemoryContextSwitchTo(maincontext);
583
585
586 if (trace_sort)
587 pg_rusage_init(&state->ru_start);
588
589 state->base.sortopt = sortopt;
590 state->base.tuples = true;
591 state->abbrevNext = 10;
592
593 /*
594 * workMem is forced to be at least 64KB, the current minimum valid value
595 * for the work_mem GUC. This is a defense against parallel sort callers
596 * that divide out memory among many workers in a way that leaves each
597 * with very little memory.
598 */
599 state->allowedMem = Max(workMem, 64) * (int64) 1024;
600 state->base.sortcontext = sortcontext;
601 state->base.maincontext = maincontext;
602
603 state->memtupsize = INITIAL_MEMTUPSIZE;
604 state->memtuples = NULL;
605
606 /*
607 * After all of the other non-parallel-related state, we setup all of the
608 * state needed for each batch.
609 */
611
612 /*
613 * Initialize parallel-related state based on coordination information
614 * from caller
615 */
616 if (!coordinate)
617 {
618 /* Serial sort */
619 state->shared = NULL;
620 state->worker = -1;
621 state->nParticipants = -1;
622 }
623 else if (coordinate->isWorker)
624 {
625 /* Parallel worker produces exactly one final run from all input */
626 state->shared = coordinate->sharedsort;
628 state->nParticipants = -1;
629 }
630 else
631 {
632 /* Parallel leader state only used for final merge */
633 state->shared = coordinate->sharedsort;
634 state->worker = -1;
635 state->nParticipants = coordinate->nParticipants;
636 Assert(state->nParticipants >= 1);
637 }
638
639 MemoryContextSwitchTo(oldcontext);
640
641 return state;
642}

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CurrentMemoryContext, elog, ERROR, fb(), INITIAL_MEMTUPSIZE, Max, MemoryContextSwitchTo(), palloc0_object, pg_rusage_init(), trace_sort, tuplesort_begin_batch(), TUPLESORT_RANDOMACCESS, and worker_get_identifier().

Referenced by tuplesort_begin_cluster(), tuplesort_begin_datum(), tuplesort_begin_heap(), tuplesort_begin_index_brin(), tuplesort_begin_index_btree(), tuplesort_begin_index_gin(), tuplesort_begin_index_gist(), and tuplesort_begin_index_hash().

◆ tuplesort_end()

◆ tuplesort_estimate_shared()

Size tuplesort_estimate_shared ( int  nWorkers)

Definition at line 3189 of file tuplesort.c.

3190{
3191 Size tapesSize;
3192
3193 Assert(nWorkers > 0);
3194
3195 /* Make sure that BufFile shared state is MAXALIGN'd */
3196 tapesSize = mul_size(sizeof(TapeShare), nWorkers);
3197 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
3198
3199 return tapesSize;
3200}

References add_size(), Assert, fb(), MAXALIGN, and mul_size().

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), and _gin_begin_parallel().

◆ tuplesort_free()

static void tuplesort_free ( Tuplesortstate state)
static

Definition at line 793 of file tuplesort.c.

794{
795 /* context swap probably not needed, but let's be safe */
796 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
797 int64 spaceUsed;
798
799 if (state->tapeset)
800 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
801 else
802 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
803
804 /*
805 * Delete temporary "tape" files, if any.
806 *
807 * We don't bother to destroy the individual tapes here. They will go away
808 * with the sortcontext. (In TSS_FINALMERGE state, we have closed
809 * finished tapes already.)
810 */
811 if (state->tapeset)
812 LogicalTapeSetClose(state->tapeset);
813
814 if (trace_sort)
815 {
816 if (state->tapeset)
817 elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
818 SERIAL(state) ? "external sort" : "parallel external sort",
819 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
820 else
821 elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
822 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
823 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
824 }
825
826 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
827
828 FREESTATE(state);
829 MemoryContextSwitchTo(oldcontext);
830
831 /*
832 * Free the per-sort memory context, thereby releasing all working memory.
833 */
834 MemoryContextReset(state->base.sortcontext);
835}

References elog, fb(), FREESTATE, LOG, LogicalTapeSetBlocks(), LogicalTapeSetClose(), MemoryContextReset(), MemoryContextSwitchTo(), pg_rusage_show(), SERIAL, and trace_sort.

Referenced by tuplesort_end(), and tuplesort_reset().

◆ tuplesort_get_stats()

void tuplesort_get_stats ( Tuplesortstate state,
TuplesortInstrumentation stats 
)

Definition at line 2395 of file tuplesort.c.

2397{
2398 /*
2399 * Note: it might seem we should provide both memory and disk usage for a
2400 * disk-based sort. However, the current code doesn't track memory space
2401 * accurately once we have begun to return tuples to the caller (since we
2402 * don't account for pfree's the caller is expected to do), so we cannot
2403 * rely on availMem in a disk sort. This does not seem worth the overhead
2404 * to fix. Is it worth creating an API for the memory context code to
2405 * tell us how much is actually used in sortcontext?
2406 */
2407 tuplesort_updatemax(state);
2408
2409 if (state->isMaxSpaceDisk)
2410 stats->spaceType = SORT_SPACE_TYPE_DISK;
2411 else
2412 stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2413 stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2414
2415 switch (state->maxSpaceStatus)
2416 {
2417 case TSS_SORTEDINMEM:
2418 if (state->boundUsed)
2419 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2420 else
2421 stats->sortMethod = SORT_TYPE_QUICKSORT;
2422 break;
2423 case TSS_SORTEDONTAPE:
2424 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2425 break;
2426 case TSS_FINALMERGE:
2427 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2428 break;
2429 default:
2430 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2431 break;
2432 }
2433}

References SORT_SPACE_TYPE_DISK, SORT_SPACE_TYPE_MEMORY, SORT_TYPE_EXTERNAL_MERGE, SORT_TYPE_EXTERNAL_SORT, SORT_TYPE_QUICKSORT, SORT_TYPE_STILL_IN_PROGRESS, SORT_TYPE_TOP_N_HEAPSORT, TuplesortInstrumentation::sortMethod, TuplesortInstrumentation::spaceType, TuplesortInstrumentation::spaceUsed, TSS_FINALMERGE, TSS_SORTEDINMEM, TSS_SORTEDONTAPE, and tuplesort_updatemax().

Referenced by ExecSort(), instrumentSortedGroup(), and show_sort_info().

◆ tuplesort_gettuple_common()

bool tuplesort_gettuple_common ( Tuplesortstate state,
bool  forward,
SortTuple stup 
)

Definition at line 1366 of file tuplesort.c.

1368{
1369 unsigned int tuplen;
1370 size_t nmoved;
1371
1372 Assert(!WORKER(state));
1373
1374 switch (state->status)
1375 {
1376 case TSS_SORTEDINMEM:
1377 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1378 Assert(!state->slabAllocatorUsed);
1379 if (forward)
1380 {
1381 if (state->current < state->memtupcount)
1382 {
1383 *stup = state->memtuples[state->current++];
1384 return true;
1385 }
1386 state->eof_reached = true;
1387
1388 /*
1389 * Complain if caller tries to retrieve more tuples than
1390 * originally asked for in a bounded sort. This is because
1391 * returning EOF here might be the wrong thing.
1392 */
1393 if (state->bounded && state->current >= state->bound)
1394 elog(ERROR, "retrieved too many tuples in a bounded sort");
1395
1396 return false;
1397 }
1398 else
1399 {
1400 if (state->current <= 0)
1401 return false;
1402
1403 /*
1404 * if all tuples are fetched already then we return last
1405 * tuple, else - tuple before last returned.
1406 */
1407 if (state->eof_reached)
1408 state->eof_reached = false;
1409 else
1410 {
1411 state->current--; /* last returned tuple */
1412 if (state->current <= 0)
1413 return false;
1414 }
1415 *stup = state->memtuples[state->current - 1];
1416 return true;
1417 }
1418 break;
1419
1420 case TSS_SORTEDONTAPE:
1421 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1422 Assert(state->slabAllocatorUsed);
1423
1424 /*
1425 * The slot that held the tuple that we returned in previous
1426 * gettuple call can now be reused.
1427 */
1428 if (state->lastReturnedTuple)
1429 {
1430 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1431 state->lastReturnedTuple = NULL;
1432 }
1433
1434 if (forward)
1435 {
1436 if (state->eof_reached)
1437 return false;
1438
1439 if ((tuplen = getlen(state->result_tape, true)) != 0)
1440 {
1441 READTUP(state, stup, state->result_tape, tuplen);
1442
1443 /*
1444 * Remember the tuple we return, so that we can recycle
1445 * its memory on next call. (This can be NULL, in the
1446 * !state->tuples case).
1447 */
1448 state->lastReturnedTuple = stup->tuple;
1449
1450 return true;
1451 }
1452 else
1453 {
1454 state->eof_reached = true;
1455 return false;
1456 }
1457 }
1458
1459 /*
1460 * Backward.
1461 *
1462 * if all tuples are fetched already then we return last tuple,
1463 * else - tuple before last returned.
1464 */
1465 if (state->eof_reached)
1466 {
1467 /*
1468 * Seek position is pointing just past the zero tuplen at the
1469 * end of file; back up to fetch last tuple's ending length
1470 * word. If seek fails we must have a completely empty file.
1471 */
1472 nmoved = LogicalTapeBackspace(state->result_tape,
1473 2 * sizeof(unsigned int));
1474 if (nmoved == 0)
1475 return false;
1476 else if (nmoved != 2 * sizeof(unsigned int))
1477 elog(ERROR, "unexpected tape position");
1478 state->eof_reached = false;
1479 }
1480 else
1481 {
1482 /*
1483 * Back up and fetch previously-returned tuple's ending length
1484 * word. If seek fails, assume we are at start of file.
1485 */
1486 nmoved = LogicalTapeBackspace(state->result_tape,
1487 sizeof(unsigned int));
1488 if (nmoved == 0)
1489 return false;
1490 else if (nmoved != sizeof(unsigned int))
1491 elog(ERROR, "unexpected tape position");
1492 tuplen = getlen(state->result_tape, false);
1493
1494 /*
1495 * Back up to get ending length word of tuple before it.
1496 */
1497 nmoved = LogicalTapeBackspace(state->result_tape,
1498 tuplen + 2 * sizeof(unsigned int));
1499 if (nmoved == tuplen + sizeof(unsigned int))
1500 {
1501 /*
1502 * We backed up over the previous tuple, but there was no
1503 * ending length word before it. That means that the prev
1504 * tuple is the first tuple in the file. It is now the
1505 * next to read in forward direction (not obviously right,
1506 * but that is what in-memory case does).
1507 */
1508 return false;
1509 }
1510 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1511 elog(ERROR, "bogus tuple length in backward scan");
1512 }
1513
1514 tuplen = getlen(state->result_tape, false);
1515
1516 /*
1517 * Now we have the length of the prior tuple, back up and read it.
1518 * Note: READTUP expects we are positioned after the initial
1519 * length word of the tuple, so back up to that point.
1520 */
1521 nmoved = LogicalTapeBackspace(state->result_tape,
1522 tuplen);
1523 if (nmoved != tuplen)
1524 elog(ERROR, "bogus tuple length in backward scan");
1525 READTUP(state, stup, state->result_tape, tuplen);
1526
1527 /*
1528 * Remember the tuple we return, so that we can recycle its memory
1529 * on next call. (This can be NULL, in the Datum case).
1530 */
1531 state->lastReturnedTuple = stup->tuple;
1532
1533 return true;
1534
1535 case TSS_FINALMERGE:
1536 Assert(forward);
1537 /* We are managing memory ourselves, with the slab allocator. */
1538 Assert(state->slabAllocatorUsed);
1539
1540 /*
1541 * The slab slot holding the tuple that we returned in previous
1542 * gettuple call can now be reused.
1543 */
1544 if (state->lastReturnedTuple)
1545 {
1546 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1547 state->lastReturnedTuple = NULL;
1548 }
1549
1550 /*
1551 * This code should match the inner loop of mergeonerun().
1552 */
1553 if (state->memtupcount > 0)
1554 {
1555 int srcTapeIndex = state->memtuples[0].srctape;
1556 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1557 SortTuple newtup;
1558
1559 *stup = state->memtuples[0];
1560
1561 /*
1562 * Remember the tuple we return, so that we can recycle its
1563 * memory on next call. (This can be NULL, in the Datum case).
1564 */
1565 state->lastReturnedTuple = stup->tuple;
1566
1567 /*
1568 * Pull next tuple from tape, and replace the returned tuple
1569 * at top of the heap with it.
1570 */
1571 if (!mergereadnext(state, srcTape, &newtup))
1572 {
1573 /*
1574 * If no more data, we've reached end of run on this tape.
1575 * Remove the top node from the heap.
1576 */
1577 tuplesort_heap_delete_top(state);
1578 state->nInputRuns--;
1579
1580 /*
1581 * Close the tape. It'd go away at the end of the sort
1582 * anyway, but better to release the memory early.
1583 */
1584 LogicalTapeClose(srcTape);
1585 return true;
1586 }
1587 newtup.srctape = srcTapeIndex;
1588 tuplesort_heap_replace_top(state, &newtup);
1589 return true;
1590 }
1591 return false;
1592
1593 default:
1594 elog(ERROR, "invalid tuplesort state");
1595 return false; /* keep compiler quiet */
1596 }
1597}

References Assert, elog, ERROR, fb(), getlen(), LogicalTapeBackspace(), LogicalTapeClose(), mergereadnext(), READTUP, RELEASE_SLAB_SLOT, TSS_FINALMERGE, TSS_SORTEDINMEM, TSS_SORTEDONTAPE, tuplesort_heap_delete_top(), tuplesort_heap_replace_top(), TUPLESORT_RANDOMACCESS, and WORKER.

Referenced by tuplesort_getbrintuple(), tuplesort_getdatum(), tuplesort_getgintuple(), tuplesort_getheaptuple(), tuplesort_getindextuple(), tuplesort_gettupleslot(), and tuplesort_skiptuples().
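
Callers never invoke this function directly; they go through the type-specific wrappers listed above. A minimal caller-side sketch draining an already-performed sort through the slot-based wrapper (tupdesc, sortstate and the processing step are assumed to exist):

    #include "executor/tuptable.h"
    #include "utils/tuplesort.h"

    TupleTableSlot *slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple);

    /* forward scan, no extra copy; tuplesort_gettupleslot() calls
     * tuplesort_gettuple_common() internally */
    while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
    {
        /* ... consume the tuple held in "slot" ... */
    }
    ExecDropSingleTupleTableSlot(slot);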

◆ tuplesort_heap_delete_top()

static void tuplesort_heap_delete_top ( Tuplesortstate * state)
static

Definition at line 3046 of file tuplesort.c.

3047{
3048 SortTuple *memtuples = state->memtuples;
3049 SortTuple *tuple;
3050
3051 if (--state->memtupcount <= 0)
3052 return;
3053
3054 /*
3055 * Remove the last tuple in the heap, and re-insert it, by replacing the
3056 * current top node with it.
3057 */
3058 tuple = &memtuples[state->memtupcount];
3059 tuplesort_heap_replace_top(state, tuple);
3060}

References tuplesort_heap_replace_top().

Referenced by mergeonerun(), sort_bounded_heap(), and tuplesort_gettuple_common().

◆ tuplesort_heap_insert()

static void tuplesort_heap_insert ( Tuplesortstate * state,
SortTuple * tuple 
)
static

Definition at line 3011 of file tuplesort.c.

3012{
3013 SortTuple *memtuples;
3014 int j;
3015
3016 memtuples = state->memtuples;
3017 Assert(state->memtupcount < state->memtupsize);
3018
3019 CHECK_FOR_INTERRUPTS();
3020
3021 /*
3022 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3023 * using 1-based array indexes, not 0-based.
3024 */
3025 j = state->memtupcount++;
3026 while (j > 0)
3027 {
3028 int i = (j - 1) >> 1;
3029
3030 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3031 break;
3032 memtuples[j] = memtuples[i];
3033 j = i;
3034 }
3035 memtuples[j] = *tuple;
3036}

References Assert, CHECK_FOR_INTERRUPTS, COMPARETUP, i, and j.

Referenced by beginmerge(), and make_bounded_heap().

◆ tuplesort_heap_replace_top()

static void tuplesort_heap_replace_top ( Tuplesortstate * state,
SortTuple * tuple 
)
static

Definition at line 3070 of file tuplesort.c.

3071{
3072 SortTuple *memtuples = state->memtuples;
3073 unsigned int i,
3074 n;
3075
3076 Assert(state->memtupcount >= 1);
3077
3078 CHECK_FOR_INTERRUPTS();
3079
3080 /*
3081 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3082 * This prevents overflow in the "2 * i + 1" calculation, since at the top
3083 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3084 */
3085 n = state->memtupcount;
3086 i = 0; /* i is where the "hole" is */
3087 for (;;)
3088 {
3089 unsigned int j = 2 * i + 1;
3090
3091 if (j >= n)
3092 break;
3093 if (j + 1 < n &&
3094 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3095 j++;
3096 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3097 break;
3098 memtuples[i] = memtuples[j];
3099 i = j;
3100 }
3101 memtuples[i] = *tuple;
3102}

References Assert, CHECK_FOR_INTERRUPTS, COMPARETUP, i, and j.

Referenced by make_bounded_heap(), mergeonerun(), tuplesort_gettuple_common(), tuplesort_heap_delete_top(), and tuplesort_puttuple_common().

◆ tuplesort_initialize_shared()

void tuplesort_initialize_shared ( Sharedsort * shared,
int  nWorkers,
dsm_segment * seg 
)

Definition at line 3210 of file tuplesort.c.

3211{
3212 int i;
3213
3214 Assert(nWorkers > 0);
3215
3216 SpinLockInit(&shared->mutex);
3217 shared->currentWorker = 0;
3218 shared->workersFinished = 0;
3219 SharedFileSetInit(&shared->fileset, seg);
3220 shared->nTapes = nWorkers;
3221 for (i = 0; i < nWorkers; i++)
3222 {
3223 shared->tapes[i].firstblocknumber = 0L;
3224 }
3225}

References Assert, Sharedsort::currentWorker, fb(), Sharedsort::fileset, TapeShare::firstblocknumber, i, Sharedsort::mutex, Sharedsort::nTapes, SharedFileSetInit(), SpinLockInit, Sharedsort::tapes, and Sharedsort::workersFinished.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), and _gin_begin_parallel().
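
Leader-side setup sketch, modeled loosely on the callers listed above. The toc key and helper name are hypothetical, and the estimate and initialize phases are shown in one function only for brevity; in real callers they happen in separate steps around InitializeParallelDSM(pcxt).

    #include "access/parallel.h"
    #include "storage/shm_toc.h"
    #include "utils/tuplesort.h"

    #define MY_SHARED_SORT_KEY  UINT64CONST(0xF000000000000001)   /* hypothetical */

    static Sharedsort *
    my_setup_shared_sort(ParallelContext *pcxt, int nworkers)
    {
        Size        estsort = tuplesort_estimate_shared(nworkers);
        Sharedsort *sharedsort;

        /* estimate phase (before InitializeParallelDSM) */
        shm_toc_estimate_chunk(&pcxt->estimator, estsort);
        shm_toc_estimate_keys(&pcxt->estimator, 1);

        /* ... InitializeParallelDSM(pcxt) runs between these two phases ... */

        /* initialize phase: carve out the space and publish it to workers */
        sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
        tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
        shm_toc_insert(pcxt->toc, MY_SHARED_SORT_KEY, sharedsort);

        return sharedsort;
    }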

◆ tuplesort_markpos()

void tuplesort_markpos ( Tuplesortstate * state)

Definition at line 2331 of file tuplesort.c.

2332{
2333 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2334
2335 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2336
2337 switch (state->status)
2338 {
2339 case TSS_SORTEDINMEM:
2340 state->markpos_offset = state->current;
2341 state->markpos_eof = state->eof_reached;
2342 break;
2343 case TSS_SORTEDONTAPE:
2344 LogicalTapeTell(state->result_tape,
2345 &state->markpos_block,
2346 &state->markpos_offset);
2347 state->markpos_eof = state->eof_reached;
2348 break;
2349 default:
2350 elog(ERROR, "invalid tuplesort state");
2351 break;
2352 }
2353
2354 MemoryContextSwitchTo(oldcontext);
2355}

References Assert, elog, ERROR, LogicalTapeTell(), MemoryContextSwitchTo(), TSS_SORTEDINMEM, TSS_SORTEDONTAPE, and TUPLESORT_RANDOMACCESS.

Referenced by ExecSortMarkPos().
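
Mark/restore sketch (caller-side; sortstate and slot are assumed, and the sort must have been begun with TUPLESORT_RANDOMACCESS and already performed, as in the ExecSortMarkPos()/ExecSortRestrPos() path):

    /* remember the current read position */
    tuplesort_markpos(sortstate);

    /* scan ahead from the mark */
    while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
    {
        /* ... peek at upcoming tuples ... */
    }

    /* rewind to the remembered position and continue from there */
    tuplesort_restorepos(sortstate);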

◆ tuplesort_merge_order()

int tuplesort_merge_order ( int64  allowedMem)

Definition at line 1674 of file tuplesort.c.

1675{
1676 int mOrder;
1677
1678 /*----------
1679 * In the merge phase, we need buffer space for each input and output tape.
1680 * Each pass in the balanced merge algorithm reads from M input tapes, and
1681 * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1682 * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1683 * input tape.
1684 *
1685 * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1686 * N * TAPE_BUFFER_OVERHEAD
1687 *
1688 * Except for the last and next-to-last merge passes, where there can be
1689 * fewer tapes left to process, M = N. We choose M so that we have the
1690 * desired amount of memory available for the input buffers
1691 * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1692 * available for the tape buffers (allowedMem).
1693 *
1694 * Note: you might be thinking we need to account for the memtuples[]
1695 * array in this calculation, but we effectively treat that as part of the
1696 * MERGE_BUFFER_SIZE workspace.
1697 *----------
1698 */
1699 mOrder = allowedMem /
1700 (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1701
1702 /*
1703 * Even in minimum memory, use at least a MINORDER merge. On the other
1704 * hand, even when we have lots of memory, do not use more than a MAXORDER
1705 * merge. Tapes are pretty cheap, but they're not entirely free. Each
1706 * additional tape reduces the amount of memory available to build runs,
1707 * which in turn can cause the same sort to need more runs, which makes
1708 * merging slower even if it can still be done in a single pass. Also,
1709 * high order merges are quite slow due to CPU cache effects; it can be
1710 * faster to pay the I/O cost of a multi-pass merge than to perform a
1711 * single merge pass across many hundreds of tapes.
1712 */
1713 mOrder = Max(mOrder, MINORDER);
1714 mOrder = Min(mOrder, MAXORDER);
1715
1716 return mOrder;
1717}

References fb(), Max, MAXORDER, MERGE_BUFFER_SIZE, Min, MINORDER, and TAPE_BUFFER_OVERHEAD.

Referenced by cost_tuplesort(), and inittapes().
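
As a worked example (values assumed, with the default 8 kB BLCKSZ, so TAPE_BUFFER_OVERHEAD is 8192 bytes and MERGE_BUFFER_SIZE is 262144 bytes): for allowedMem of 64 MB, mOrder = 67108864 / (2 * 8192 + 262144) = 67108864 / 278528 ≈ 240, which already lies inside the MINORDER..MAXORDER clamp, so a 64 MB sort can merge roughly 240 runs per pass.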

◆ tuplesort_method_name()

const char * tuplesort_method_name ( TuplesortMethod  m)

Definition at line 2439 of file tuplesort.c.

2440{
2441 switch (m)
2442 {
2443 case SORT_TYPE_STILL_IN_PROGRESS:
2444 return "still in progress";
2445 case SORT_TYPE_TOP_N_HEAPSORT:
2446 return "top-N heapsort";
2447 case SORT_TYPE_QUICKSORT:
2448 return "quicksort";
2449 case SORT_TYPE_EXTERNAL_SORT:
2450 return "external sort";
2451 case SORT_TYPE_EXTERNAL_MERGE:
2452 return "external merge";
2453 }
2454
2455 return "unknown";
2456}

References SORT_TYPE_EXTERNAL_MERGE, SORT_TYPE_EXTERNAL_SORT, SORT_TYPE_QUICKSORT, SORT_TYPE_STILL_IN_PROGRESS, and SORT_TYPE_TOP_N_HEAPSORT.

Referenced by show_incremental_sort_group_info(), and show_sort_info().

◆ tuplesort_performsort()

void tuplesort_performsort ( Tuplesortstate * state)

Definition at line 1259 of file tuplesort.c.

1260{
1261 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1262
1263 if (trace_sort)
1264 elog(LOG, "performsort of worker %d starting: %s",
1265 state->worker, pg_rusage_show(&state->ru_start));
1266
1267 switch (state->status)
1268 {
1269 case TSS_INITIAL:
1270
1271 /*
1272 * We were able to accumulate all the tuples within the allowed
1273 * amount of memory, or leader to take over worker tapes
1274 */
1275 if (SERIAL(state))
1276 {
1277 /* Sort in memory and we're done */
1278 tuplesort_sort_memtuples(state);
1279 state->status = TSS_SORTEDINMEM;
1280 }
1281 else if (WORKER(state))
1282 {
1283 /*
1284 * Parallel workers must still dump out tuples to tape. No
1285 * merge is required to produce single output run, though.
1286 */
1287 inittapes(state, false);
1288 dumptuples(state, true);
1289 worker_nomergeruns(state);
1290 state->status = TSS_SORTEDONTAPE;
1291 }
1292 else
1293 {
1294 /*
1295 * Leader will take over worker tapes and merge worker runs.
1296 * Note that mergeruns sets the correct state->status.
1297 */
1298 leader_takeover_tapes(state);
1299 mergeruns(state);
1300 }
1301 state->current = 0;
1302 state->eof_reached = false;
1303 state->markpos_block = 0L;
1304 state->markpos_offset = 0;
1305 state->markpos_eof = false;
1306 break;
1307
1308 case TSS_BOUNDED:
1309
1310 /*
1311 * We were able to accumulate all the tuples required for output
1312 * in memory, using a heap to eliminate excess tuples. Now we
1313 * have to transform the heap to a properly-sorted array. Note
1314 * that sort_bounded_heap sets the correct state->status.
1315 */
1316 sort_bounded_heap(state);
1317 state->current = 0;
1318 state->eof_reached = false;
1319 state->markpos_offset = 0;
1320 state->markpos_eof = false;
1321 break;
1322
1323 case TSS_BUILDRUNS:
1324
1325 /*
1326 * Finish tape-based sort. First, flush all tuples remaining in
1327 * memory out to tape; then merge until we have a single remaining
1328 * run (or, if !randomAccess and !WORKER(), one run per tape).
1329 * Note that mergeruns sets the correct state->status.
1330 */
1331 dumptuples(state, true);
1332 mergeruns(state);
1333 state->eof_reached = false;
1334 state->markpos_block = 0L;
1335 state->markpos_offset = 0;
1336 state->markpos_eof = false;
1337 break;
1338
1339 default:
1340 elog(ERROR, "invalid tuplesort state");
1341 break;
1342 }
1343
1344 if (trace_sort)
1345 {
1346 if (state->status == TSS_FINALMERGE)
1347 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1348 state->worker, state->nInputTapes,
1349 pg_rusage_show(&state->ru_start));
1350 else
1351 elog(LOG, "performsort of worker %d done: %s",
1352 state->worker, pg_rusage_show(&state->ru_start));
1353 }
1354
1355 MemoryContextSwitchTo(oldcontext);
1356}

References dumptuples(), elog, ERROR, fb(), inittapes(), leader_takeover_tapes(), LOG, MemoryContextSwitchTo(), mergeruns(), pg_rusage_show(), SERIAL, sort_bounded_heap(), trace_sort, TSS_BOUNDED, TSS_BUILDRUNS, TSS_FINALMERGE, TSS_INITIAL, TSS_SORTEDINMEM, TSS_SORTEDONTAPE, tuplesort_sort_memtuples(), WORKER, and worker_nomergeruns().

Referenced by _brin_parallel_merge(), _brin_parallel_scan_and_build(), _bt_leafbuild(), _bt_parallel_scan_and_sort(), _gin_parallel_merge(), _gin_parallel_scan_and_build(), _gin_process_worker_data(), _h_indexbuild(), array_sort_internal(), ExecIncrementalSort(), ExecSort(), gistbuild(), heapam_relation_copy_for_cluster(), hypothetical_dense_rank_final(), hypothetical_rank_common(), initialize_phase(), mode_final(), percentile_cont_final_common(), percentile_cont_multi_final_common(), percentile_disc_final(), percentile_disc_multi_final(), process_ordered_aggregate_multi(), process_ordered_aggregate_single(), switchToPresortedPrefixMode(), and validate_index().
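
The protocol the callers above follow is always load → perform → fetch. A minimal serial heap-tuple sketch (caller-side code, not from this file; the tuple descriptor, sort-key arrays, slot, and the hypothetical fetch_next_slot()/process_slot() routines are assumed to be provided by the caller):

    #include "utils/tuplesort.h"

    Tuplesortstate *sortstate;

    sortstate = tuplesort_begin_heap(tupdesc,
                                     nkeys, attNums,
                                     sortOperators, sortCollations,
                                     nullsFirstFlags,
                                     work_mem,
                                     NULL,              /* no parallel coordination */
                                     TUPLESORT_NONE);

    /* load phase: feed every input tuple */
    while (fetch_next_slot(slot))                       /* hypothetical producer */
        tuplesort_puttupleslot(sortstate, slot);

    /* no more input: sort and switch to the output phase */
    tuplesort_performsort(sortstate);

    /* fetch phase: read the tuples back in sorted order */
    while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
        process_slot(slot);                             /* hypothetical consumer */

    tuplesort_end(sortstate);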

◆ tuplesort_puttuple_common()

void tuplesort_puttuple_common ( Tuplesortstate * state,
SortTuple * tuple,
bool  useAbbrev,
Size  tuplen 
)

Definition at line 1065 of file tuplesort.c.

1067{
1068 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1069
1070 Assert(!LEADER(state));
1071
1072 /* account for the memory used for this tuple */
1073 USEMEM(state, tuplen);
1074 state->tupleMem += tuplen;
1075
1076 if (!useAbbrev)
1077 {
1078 /*
1079 * Leave ordinary Datum representation, or NULL value. If there is a
1080 * converter it won't expect NULL values, and cost model is not
1081 * required to account for NULL, so in that case we avoid calling
1082 * converter and just set datum1 to zeroed representation (to be
1083 * consistent, and to support cheap inequality tests for NULL
1084 * abbreviated keys).
1085 */
1086 }
1087 else if (!consider_abort_common(state))
1088 {
1089 /* Store abbreviated key representation */
1090 tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1091 state->base.sortKeys);
1092 }
1093 else
1094 {
1095 /*
1096 * Set state to be consistent with never trying abbreviation.
1097 *
1098 * Alter datum1 representation in already-copied tuples, so as to
1099 * ensure a consistent representation (current tuple was just
1100 * handled). It does not matter if some dumped tuples are already
1101 * sorted on tape, since serialized tuples lack abbreviated keys
1102 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1103 */
1104 REMOVEABBREV(state, state->memtuples, state->memtupcount);
1105 }
1106
1107 switch (state->status)
1108 {
1109 case TSS_INITIAL:
1110
1111 /*
1112 * Save the tuple into the unsorted array. First, grow the array
1113 * as needed. Note that we try to grow the array when there is
1114 * still one free slot remaining --- if we fail, there'll still be
1115 * room to store the incoming tuple, and then we'll switch to
1116 * tape-based operation.
1117 */
1118 if (state->memtupcount >= state->memtupsize - 1)
1119 {
1120 (void) grow_memtuples(state);
1121 Assert(state->memtupcount < state->memtupsize);
1122 }
1123 state->memtuples[state->memtupcount++] = *tuple;
1124
1125 /*
1126 * Check if it's time to switch over to a bounded heapsort. We do
1127 * so if the input tuple count exceeds twice the desired tuple
1128 * count (this is a heuristic for where heapsort becomes cheaper
1129 * than a quicksort), or if we've just filled workMem and have
1130 * enough tuples to meet the bound.
1131 *
1132 * Note that once we enter TSS_BOUNDED state we will always try to
1133 * complete the sort that way. In the worst case, if later input
1134 * tuples are larger than earlier ones, this might cause us to
1135 * exceed workMem significantly.
1136 */
1137 if (state->bounded &&
1138 (state->memtupcount > state->bound * 2 ||
1139 (state->memtupcount > state->bound && LACKMEM(state))))
1140 {
1141 if (trace_sort)
1142 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1143 state->memtupcount,
1144 pg_rusage_show(&state->ru_start));
1145 make_bounded_heap(state);
1146 MemoryContextSwitchTo(oldcontext);
1147 return;
1148 }
1149
1150 /*
1151 * Done if we still fit in available memory and have array slots.
1152 */
1153 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1154 {
1155 MemoryContextSwitchTo(oldcontext);
1156 return;
1157 }
1158
1159 /*
1160 * Nope; time to switch to tape-based operation.
1161 */
1162 inittapes(state, true);
1163
1164 /*
1165 * Dump all tuples.
1166 */
1167 dumptuples(state, false);
1168 break;
1169
1170 case TSS_BOUNDED:
1171
1172 /*
1173 * We don't want to grow the array here, so check whether the new
1174 * tuple can be discarded before putting it in. This should be a
1175 * good speed optimization, too, since when there are many more
1176 * input tuples than the bound, most input tuples can be discarded
1177 * with just this one comparison. Note that because we currently
1178 * have the sort direction reversed, we must check for <= not >=.
1179 */
1180 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1181 {
1182 /* new tuple <= top of the heap, so we can discard it */
1183 free_sort_tuple(state, tuple);
1184 CHECK_FOR_INTERRUPTS();
1185 }
1186 else
1187 {
1188 /* discard top of heap, replacing it with the new tuple */
1189 free_sort_tuple(state, &state->memtuples[0]);
1190 tuplesort_heap_replace_top(state, tuple);
1191 }
1192 break;
1193
1194 case TSS_BUILDRUNS:
1195
1196 /*
1197 * Save the tuple into the unsorted array (there must be space)
1198 */
1199 state->memtuples[state->memtupcount++] = *tuple;
1200
1201 /*
1202 * If we are over the memory limit, dump all tuples.
1203 */
1204 dumptuples(state, false);
1205 break;
1206
1207 default:
1208 elog(ERROR, "invalid tuplesort state");
1209 break;
1210 }
1211 MemoryContextSwitchTo(oldcontext);
1212}

References Assert, CHECK_FOR_INTERRUPTS, COMPARETUP, consider_abort_common(), SortTuple::datum1, dumptuples(), elog, ERROR, fb(), free_sort_tuple(), grow_memtuples(), inittapes(), LACKMEM, LEADER, LOG, make_bounded_heap(), MemoryContextSwitchTo(), pg_rusage_show(), REMOVEABBREV, trace_sort, TSS_BOUNDED, TSS_BUILDRUNS, TSS_INITIAL, tuplesort_heap_replace_top(), and USEMEM.

Referenced by tuplesort_putbrintuple(), tuplesort_putdatum(), tuplesort_putgintuple(), tuplesort_putheaptuple(), tuplesort_putindextuplevalues(), and tuplesort_puttupleslot().
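
As on the fetch side, callers reach this function only through the wrappers listed above. A caller-side sketch of the simplest one, a Datum sort over int4 values (the values array, nvalues and work_mem are assumed; error handling omitted):

    #include "catalog/pg_operator_d.h"      /* for Int4LessOperator */
    #include "catalog/pg_type_d.h"          /* for INT4OID */
    #include "utils/tuplesort.h"

    Tuplesortstate *sortstate;

    sortstate = tuplesort_begin_datum(INT4OID,
                                      Int4LessOperator, /* sort by "<" */
                                      InvalidOid,       /* default collation */
                                      false,            /* nulls sort last */
                                      work_mem,
                                      NULL,
                                      TUPLESORT_NONE);

    for (int i = 0; i < nvalues; i++)
        tuplesort_putdatum(sortstate, Int32GetDatum(values[i]), false);

    tuplesort_performsort(sortstate);
    /* ... drain with tuplesort_getdatum(), then tuplesort_end(sortstate) ... */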

◆ tuplesort_readtup_alloc()

void * tuplesort_readtup_alloc ( Tuplesortstate * state,
Size  tuplen 
)

Definition at line 3155 of file tuplesort.c.

3156{
3157 SlabSlot *buf;
3158
3159 /*
3160 * We pre-allocate enough slots in the slab arena that we should never run
3161 * out.
3162 */
3163 Assert(state->slabFreeHead);
3164
3165 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3166 return MemoryContextAlloc(state->base.sortcontext, tuplen);
3167 else
3168 {
3169 buf = state->slabFreeHead;
3170 /* Reuse this slot */
3171 state->slabFreeHead = buf->nextfree;
3172
3173 return buf;
3174 }
3175}

References Assert, buf, MemoryContextAlloc(), and SLAB_SLOT_SIZE.

Referenced by readtup_cluster(), readtup_datum(), readtup_heap(), readtup_index(), readtup_index_brin(), and readtup_index_gin().
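
Sketch of how a readtup_* callback typically uses this allocator (modeled on the callers listed above; the function name is illustrative, and the per-format length-word bookkeeping of the real callbacks is omitted):

    #include "utils/logtape.h"
    #include "utils/tuplesort.h"

    static void
    readtup_example(Tuplesortstate *state, SortTuple *stup,
                    LogicalTape *tape, unsigned int len)
    {
        /* slab slot if it fits, otherwise an ordinary palloc chunk */
        void       *tup = tuplesort_readtup_alloc(state, len);

        if (LogicalTapeRead(tape, tup, len) != len)
            elog(ERROR, "unexpected end of data");

        stup->tuple = tup;
        /* real callbacks also refill stup->datum1 / stup->isnull1 here */
    }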

◆ tuplesort_rescan()

void tuplesort_rescan ( Tuplesortstate * state)

Definition at line 2298 of file tuplesort.c.

2299{
2300 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2301
2302 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2303
2304 switch (state->status)
2305 {
2306 case TSS_SORTEDINMEM:
2307 state->current = 0;
2308 state->eof_reached = false;
2309 state->markpos_offset = 0;
2310 state->markpos_eof = false;
2311 break;
2312 case TSS_SORTEDONTAPE:
2313 LogicalTapeRewindForRead(state->result_tape, 0);
2314 state->eof_reached = false;
2315 state->markpos_block = 0L;
2316 state->markpos_offset = 0;
2317 state->markpos_eof = false;
2318 break;
2319 default:
2320 elog(ERROR, "invalid tuplesort state");
2321 break;
2322 }
2323
2324 MemoryContextSwitchTo(oldcontext);
2325}

References Assert, elog, ERROR, fb(), LogicalTapeRewindForRead(), MemoryContextSwitchTo(), TSS_SORTEDINMEM, TSS_SORTEDONTAPE, and TUPLESORT_RANDOMACCESS.

Referenced by ExecReScanSort(), mode_final(), percentile_cont_final_common(), percentile_cont_multi_final_common(), percentile_disc_final(), and percentile_disc_multi_final().

◆ tuplesort_reset()

void tuplesort_reset ( Tuplesortstate * state)

Definition at line 915 of file tuplesort.c.

916{
917 tuplesort_updatemax(state);
918 tuplesort_free(state);
919
920 /*
921 * After we've freed up per-batch memory, re-setup all of the state common
922 * to both the first batch and any subsequent batch.
923 */
924 tuplesort_begin_batch(state);
925
926 state->lastReturnedTuple = NULL;
927 state->slabMemoryBegin = NULL;
928 state->slabMemoryEnd = NULL;
929 state->slabFreeHead = NULL;
930}

References fb(), tuplesort_begin_batch(), tuplesort_free(), and tuplesort_updatemax().

Referenced by ExecIncrementalSort(), ExecReScanIncrementalSort(), and switchToPresortedPrefixMode().
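
Re-use sketch, in the spirit of the incremental-sort callers listed above: one Tuplesortstate is cycled through several batches, paying the full begin/end cost only once (the batch loader and consumer are hypothetical):

    for (int batch = 0; batch < nbatches; batch++)
    {
        load_batch(sortstate, batch);        /* hypothetical: feed this batch */
        tuplesort_performsort(sortstate);
        drain_batch(sortstate);              /* hypothetical: fetch results */
        tuplesort_reset(sortstate);          /* cheap: ready for the next batch */
    }
    tuplesort_end(sortstate);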

◆ tuplesort_restorepos()

void tuplesort_restorepos ( Tuplesortstate * state)

Definition at line 2362 of file tuplesort.c.

2363{
2364 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2365
2366 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2367
2368 switch (state->status)
2369 {
2370 case TSS_SORTEDINMEM:
2371 state->current = state->markpos_offset;
2372 state->eof_reached = state->markpos_eof;
2373 break;
2374 case TSS_SORTEDONTAPE:
2375 LogicalTapeSeek(state->result_tape,
2376 state->markpos_block,
2377 state->markpos_offset);
2378 state->eof_reached = state->markpos_eof;
2379 break;
2380 default:
2381 elog(ERROR, "invalid tuplesort state");
2382 break;
2383 }
2384
2385 MemoryContextSwitchTo(oldcontext);
2386}

References Assert, elog, ERROR, LogicalTapeSeek(), MemoryContextSwitchTo(), TSS_SORTEDINMEM, TSS_SORTEDONTAPE, and TUPLESORT_RANDOMACCESS.

Referenced by ExecSortRestrPos().

◆ tuplesort_set_bound()

void tuplesort_set_bound ( Tuplesortstate * state,
int64  bound 
)

Definition at line 734 of file tuplesort.c.

735{
736 /* Assert we're called before loading any tuples */
737 Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
738 /* Assert we allow bounded sorts */
739 Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
740 /* Can't set the bound twice, either */
741 Assert(!state->bounded);
742 /* Also, this shouldn't be called in a parallel worker */
743 Assert(!WORKER(state));
744
745 /* Parallel leader allows but ignores hint */
746 if (LEADER(state))
747 return;
748
749#ifdef DEBUG_BOUNDED_SORT
750 /* Honor GUC setting that disables the feature (for easy testing) */
751 if (!optimize_bounded_sort)
752 return;
753#endif
754
755 /* We want to be able to compute bound * 2, so limit the setting */
756 if (bound > (int64) (INT_MAX / 2))
757 return;
758
759 state->bounded = true;
760 state->bound = (int) bound;
761
762 /*
763 * Bounded sorts are not an effective target for abbreviated key
764 * optimization. Disable by setting state to be consistent with no
765 * abbreviation support.
766 */
767 state->base.sortKeys->abbrev_converter = NULL;
768 if (state->base.sortKeys->abbrev_full_comparator)
769 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
770
771 /* Not strictly necessary, but be tidy */
772 state->base.sortKeys->abbrev_abort = NULL;
773 state->base.sortKeys->abbrev_full_comparator = NULL;
774}

References Assert, fb(), LEADER, TSS_INITIAL, TUPLESORT_ALLOWBOUNDED, and WORKER.

Referenced by ExecIncrementalSort(), ExecSort(), and switchToPresortedPrefixMode().
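
Top-N sketch (caller-side, variables assumed as in the earlier lifecycle example): a LIMIT-aware caller opts into bounded sorting at begin time and passes the bound before loading any tuples, as ExecSort() does:

    sortstate = tuplesort_begin_heap(tupdesc, nkeys, attNums,
                                     sortOperators, sortCollations,
                                     nullsFirstFlags, work_mem, NULL,
                                     TUPLESORT_ALLOWBOUNDED);

    /* e.g. ORDER BY ... LIMIT 100: only the first 100 tuples must be kept */
    tuplesort_set_bound(sortstate, 100);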

◆ tuplesort_skiptuples()

bool tuplesort_skiptuples ( Tuplesortstate * state,
int64  ntuples,
bool  forward 
)

Definition at line 1606 of file tuplesort.c.

1607{
1608 MemoryContext oldcontext;
1609
1610 /*
1611 * We don't actually support backwards skip yet, because no callers need
1612 * it. The API is designed to allow for that later, though.
1613 */
1614 Assert(forward);
1615 Assert(ntuples >= 0);
1616 Assert(!WORKER(state));
1617
1618 switch (state->status)
1619 {
1620 case TSS_SORTEDINMEM:
1621 if (state->memtupcount - state->current >= ntuples)
1622 {
1623 state->current += ntuples;
1624 return true;
1625 }
1626 state->current = state->memtupcount;
1627 state->eof_reached = true;
1628
1629 /*
1630 * Complain if caller tries to retrieve more tuples than
1631 * originally asked for in a bounded sort. This is because
1632 * returning EOF here might be the wrong thing.
1633 */
1634 if (state->bounded && state->current >= state->bound)
1635 elog(ERROR, "retrieved too many tuples in a bounded sort");
1636
1637 return false;
1638
1639 case TSS_SORTEDONTAPE:
1640 case TSS_FINALMERGE:
1641
1642 /*
1643 * We could probably optimize these cases better, but for now it's
1644 * not worth the trouble.
1645 */
1646 oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1647 while (ntuples-- > 0)
1648 {
1649 SortTuple stup;
1650
1651 if (!tuplesort_gettuple_common(state, forward, &stup))
1652 {
1653 MemoryContextSwitchTo(oldcontext);
1654 return false;
1655 }
1656 CHECK_FOR_INTERRUPTS();
1657 }
1658 MemoryContextSwitchTo(oldcontext);
1659 return true;
1660
1661 default:
1662 elog(ERROR, "invalid tuplesort state");
1663 return false; /* keep compiler quiet */
1664 }
1665}

References Assert, CHECK_FOR_INTERRUPTS, elog, ERROR, fb(), MemoryContextSwitchTo(), TSS_FINALMERGE, TSS_SORTEDINMEM, TSS_SORTEDONTAPE, tuplesort_gettuple_common(), and WORKER.

Referenced by percentile_cont_final_common(), percentile_cont_multi_final_common(), percentile_disc_final(), and percentile_disc_multi_final().
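
Skip sketch (caller-side, in the style of the percentile aggregates listed above): position directly on the k'th output tuple instead of fetching and discarding the first k - 1 tuples (sortstate, slot and k are assumed; the error text is illustrative):

    if (k > 1 && !tuplesort_skiptuples(sortstate, k - 1, true))
        elog(ERROR, "missing row in sorted input");

    if (!tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
        elog(ERROR, "missing row in sorted input");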

◆ tuplesort_sort_memtuples()

static void tuplesort_sort_memtuples ( Tuplesortstate * state)
static

Definition at line 2958 of file tuplesort.c.

2959{
2960 Assert(!LEADER(state));
2961
2962 if (state->memtupcount > 1)
2963 {
2964 /*
2965 * Do we have the leading column's value or abbreviation in datum1?
2966 */
2967 if (state->base.haveDatum1 && state->base.sortKeys)
2968 {
2969 SortSupport ssup = &state->base.sortKeys[0];
2970
2971 /* Does it compare as an integer? */
2972 if (state->memtupcount >= QSORT_THRESHOLD &&
2973 (ssup->comparator == ssup_datum_unsigned_cmp ||
2974 ssup->comparator == ssup_datum_signed_cmp ||
2975 ssup->comparator == ssup_datum_int32_cmp))
2976 {
2977 radix_sort_tuple(state->memtuples,
2978 state->memtupcount,
2979 state);
2980 verify_memtuples_sorted(state);
2981 return;
2982 }
2983 }
2984
2985 /* Can we use the single-key sort function? */
2986 if (state->base.onlyKey != NULL)
2987 {
2988 qsort_ssup(state->memtuples, state->memtupcount,
2989 state->base.onlyKey);
2990 }
2991 else
2992 {
2993 qsort_tuple(state->memtuples,
2994 state->memtupcount,
2995 state->base.comparetup,
2996 state);
2997 }
2998 }
2999}

References Assert, SortSupportData::comparator, fb(), LEADER, QSORT_THRESHOLD, radix_sort_tuple(), ssup_datum_int32_cmp(), ssup_datum_signed_cmp(), ssup_datum_unsigned_cmp(), and verify_memtuples_sorted().

Referenced by dumptuples(), and tuplesort_performsort().

◆ tuplesort_space_type_name()

const char * tuplesort_space_type_name ( TuplesortSpaceType  t)

Definition at line 2462 of file tuplesort.c.

2463{
2464 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2465 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2466}

References Assert, SORT_SPACE_TYPE_DISK, and SORT_SPACE_TYPE_MEMORY.

Referenced by show_incremental_sort_group_info(), and show_sort_info().

◆ tuplesort_updatemax()

static void tuplesort_updatemax ( Tuplesortstate * state)
static

Definition at line 864 of file tuplesort.c.

865{
866 int64 spaceUsed;
867 bool isSpaceDisk;
868
869 /*
870 * Note: it might seem we should provide both memory and disk usage for a
871 * disk-based sort. However, the current code doesn't track memory space
872 * accurately once we have begun to return tuples to the caller (since we
873 * don't account for pfree's the caller is expected to do), so we cannot
874 * rely on availMem in a disk sort. This does not seem worth the overhead
875 * to fix. Is it worth creating an API for the memory context code to
876 * tell us how much is actually used in sortcontext?
877 */
878 if (state->tapeset)
879 {
880 isSpaceDisk = true;
881 spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
882 }
883 else
884 {
885 isSpaceDisk = false;
886 spaceUsed = state->allowedMem - state->availMem;
887 }
888
889 /*
890 * Sort evicts data to the disk when it wasn't able to fit that data into
891 * main memory. This is why we assume space used on the disk to be more
892 * important for tracking resource usage than space used in memory. Note
893 * that the amount of space occupied by some tupleset on the disk might be
894 * less than amount of space occupied by the same tupleset in memory due
895 * to more compact representation.
896 */
897 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
898 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
899 {
900 state->maxSpace = spaceUsed;
901 state->isMaxSpaceDisk = isSpaceDisk;
902 state->maxSpaceStatus = state->status;
903 }
904}

References fb(), and LogicalTapeSetBlocks().

Referenced by tuplesort_get_stats(), and tuplesort_reset().

◆ tuplesort_used_bound()

bool tuplesort_used_bound ( Tuplesortstate * state)

Definition at line 782 of file tuplesort.c.

783{
784 return state->boundUsed;
785}

Referenced by ExecIncrementalSort().

◆ verify_memtuples_sorted()

static void verify_memtuples_sorted ( Tuplesortstate * state)
static

Definition at line 2941 of file tuplesort.c.

2942{
2943#ifdef USE_ASSERT_CHECKING
2944 for (SortTuple *st = state->memtuples + 1;
2945 st < state->memtuples + state->memtupcount;
2946 st++)
2947 Assert(COMPARETUP(state, st - 1, st) <= 0);
2948#endif
2949}

References Assert, and COMPARETUP.

Referenced by tuplesort_sort_memtuples().

◆ worker_freeze_result_tape()

static void worker_freeze_result_tape ( Tuplesortstate * state)
static

Definition at line 3281 of file tuplesort.c.

3282{
3283 Sharedsort *shared = state->shared;
3284 TapeShare output;
3285
3286 Assert(WORKER(state));
3287 Assert(state->result_tape != NULL);
3288 Assert(state->memtupcount == 0);
3289
3290 /*
3291 * Free most remaining memory, in case caller is sensitive to our holding
3292 * on to it. memtuples may not be a tiny merge heap at this point.
3293 */
3294 pfree(state->memtuples);
3295 /* Be tidy */
3296 state->memtuples = NULL;
3297 state->memtupsize = 0;
3298
3299 /*
3300 * Parallel worker requires result tape metadata, which is to be stored in
3301 * shared memory for leader
3302 */
3303 LogicalTapeFreeze(state->result_tape, &output);
3304
3305 /* Store properties of output tape, and update finished worker count */
3306 SpinLockAcquire(&shared->mutex);
3307 shared->tapes[state->worker] = output;
3308 shared->workersFinished++;
3309 SpinLockRelease(&shared->mutex);
3310}

References Assert, fb(), LogicalTapeFreeze(), Sharedsort::mutex, output, pfree(), SpinLockAcquire, SpinLockRelease, Sharedsort::tapes, WORKER, and Sharedsort::workersFinished.

Referenced by mergeruns(), and worker_nomergeruns().

◆ worker_get_identifier()

static int worker_get_identifier ( Tuplesortstate * state)
static

Definition at line 3253 of file tuplesort.c.

3254{
3255 Sharedsort *shared = state->shared;
3256 int worker;
3257
3258 Assert(WORKER(state));
3259
3260 SpinLockAcquire(&shared->mutex);
3261 worker = shared->currentWorker++;
3262 SpinLockRelease(&shared->mutex);
3263
3264 return worker;
3265}

References Assert, Sharedsort::currentWorker, Sharedsort::mutex, SpinLockAcquire, SpinLockRelease, and WORKER.

Referenced by tuplesort_begin_common().

◆ worker_nomergeruns()

static void worker_nomergeruns ( Tuplesortstate * state)
static

Definition at line 3319 of file tuplesort.c.

3320{
3321 Assert(WORKER(state));
3322 Assert(state->result_tape == NULL);
3323 Assert(state->nOutputRuns == 1);
3324
3325 state->result_tape = state->destTape;
3326 worker_freeze_result_tape(state);
3327}

References Assert, fb(), WORKER, and worker_freeze_result_tape().

Referenced by tuplesort_performsort().

Variable Documentation

◆ trace_sort