PostgreSQL Source Code git master
tuplesort.c File Reference
#include "postgres.h"
#include <limits.h>
#include "commands/tablespace.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "storage/shmem.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/tuplesort.h"
#include "lib/sort_template.h"
Include dependency graph for tuplesort.c:


Data Structures

union  SlabSlot
 
struct  Tuplesortstate
 
struct  Sharedsort
 

Macros

#define INITIAL_MEMTUPSIZE
 
#define SLAB_SLOT_SIZE   1024
 
#define MINORDER   6 /* minimum merge order */
 
#define MAXORDER   500 /* maximum merge order */
 
#define TAPE_BUFFER_OVERHEAD   BLCKSZ
 
#define MERGE_BUFFER_SIZE   (BLCKSZ * 32)
 
#define IS_SLAB_SLOT(state, tuple)
 
#define RELEASE_SLAB_SLOT(state, tuple)
 
#define REMOVEABBREV(state, stup, count)   ((*(state)->base.removeabbrev) (state, stup, count))
 
#define COMPARETUP(state, a, b)   ((*(state)->base.comparetup) (a, b, state))
 
#define WRITETUP(state, tape, stup)   ((*(state)->base.writetup) (state, tape, stup))
 
#define READTUP(state, stup, tape, len)   ((*(state)->base.readtup) (state, stup, tape, len))
 
#define FREESTATE(state)   ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
 
#define LACKMEM(state)   ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
 
#define USEMEM(state, amt)   ((state)->availMem -= (amt))
 
#define FREEMEM(state, amt)   ((state)->availMem += (amt))
 
#define SERIAL(state)   ((state)->shared == NULL)
 
#define WORKER(state)   ((state)->shared && (state)->worker != -1)
 
#define LEADER(state)   ((state)->shared && (state)->worker == -1)
 
#define ST_SORT   qsort_tuple_unsigned
 
#define ST_ELEMENT_TYPE   SortTuple
 
#define ST_COMPARE(a, b, state)   qsort_tuple_unsigned_compare(a, b, state)
 
#define ST_COMPARE_ARG_TYPE   Tuplesortstate
 
#define ST_CHECK_FOR_INTERRUPTS
 
#define ST_SCOPE   static
 
#define ST_DEFINE
 
#define ST_SORT   qsort_tuple_signed
 
#define ST_ELEMENT_TYPE   SortTuple
 
#define ST_COMPARE(a, b, state)   qsort_tuple_signed_compare(a, b, state)
 
#define ST_COMPARE_ARG_TYPE   Tuplesortstate
 
#define ST_CHECK_FOR_INTERRUPTS
 
#define ST_SCOPE   static
 
#define ST_DEFINE
 
#define ST_SORT   qsort_tuple_int32
 
#define ST_ELEMENT_TYPE   SortTuple
 
#define ST_COMPARE(a, b, state)   qsort_tuple_int32_compare(a, b, state)
 
#define ST_COMPARE_ARG_TYPE   Tuplesortstate
 
#define ST_CHECK_FOR_INTERRUPTS
 
#define ST_SCOPE   static
 
#define ST_DEFINE
 
#define ST_SORT   qsort_tuple
 
#define ST_ELEMENT_TYPE   SortTuple
 
#define ST_COMPARE_RUNTIME_POINTER
 
#define ST_COMPARE_ARG_TYPE   Tuplesortstate
 
#define ST_CHECK_FOR_INTERRUPTS
 
#define ST_SCOPE   static
 
#define ST_DECLARE
 
#define ST_DEFINE
 
#define ST_SORT   qsort_ssup
 
#define ST_ELEMENT_TYPE   SortTuple
 
#define ST_COMPARE(a, b, ssup)
 
#define ST_COMPARE_ARG_TYPE   SortSupportData
 
#define ST_CHECK_FOR_INTERRUPTS
 
#define ST_SCOPE   static
 
#define ST_DEFINE
 

Typedefs

typedef union SlabSlot SlabSlot
 

Enumerations

enum  TupSortStatus {
  TSS_INITIAL , TSS_BOUNDED , TSS_BUILDRUNS , TSS_SORTEDINMEM ,
  TSS_SORTEDONTAPE , TSS_FINALMERGE
}
 

Functions

static void tuplesort_begin_batch (Tuplesortstate *state)
 
static bool consider_abort_common (Tuplesortstate *state)
 
static void inittapes (Tuplesortstate *state, bool mergeruns)
 
static void inittapestate (Tuplesortstate *state, int maxTapes)
 
static void selectnewtape (Tuplesortstate *state)
 
static void init_slab_allocator (Tuplesortstate *state, int numSlots)
 
static void mergeruns (Tuplesortstate *state)
 
static void mergeonerun (Tuplesortstate *state)
 
static void beginmerge (Tuplesortstate *state)
 
static bool mergereadnext (Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
 
static void dumptuples (Tuplesortstate *state, bool alltuples)
 
static void make_bounded_heap (Tuplesortstate *state)
 
static void sort_bounded_heap (Tuplesortstate *state)
 
static void tuplesort_sort_memtuples (Tuplesortstate *state)
 
static void tuplesort_heap_insert (Tuplesortstate *state, SortTuple *tuple)
 
static void tuplesort_heap_replace_top (Tuplesortstate *state, SortTuple *tuple)
 
static void tuplesort_heap_delete_top (Tuplesortstate *state)
 
static void reversedirection (Tuplesortstate *state)
 
static unsigned int getlen (LogicalTape *tape, bool eofOK)
 
static void markrunend (LogicalTape *tape)
 
static int worker_get_identifier (Tuplesortstate *state)
 
static void worker_freeze_result_tape (Tuplesortstate *state)
 
static void worker_nomergeruns (Tuplesortstate *state)
 
static void leader_takeover_tapes (Tuplesortstate *state)
 
static void free_sort_tuple (Tuplesortstate *state, SortTuple *stup)
 
static void tuplesort_free (Tuplesortstate *state)
 
static void tuplesort_updatemax (Tuplesortstate *state)
 
static pg_attribute_always_inline int qsort_tuple_unsigned_compare (SortTuple *a, SortTuple *b, Tuplesortstate *state)
 
static pg_attribute_always_inline int qsort_tuple_signed_compare (SortTuple *a, SortTuple *b, Tuplesortstate *state)
 
static pg_attribute_always_inline int qsort_tuple_int32_compare (SortTuple *a, SortTuple *b, Tuplesortstate *state)
 
Tuplesortstate * tuplesort_begin_common (int workMem, SortCoordinate coordinate, int sortopt)
 
void tuplesort_set_bound (Tuplesortstate *state, int64 bound)
 
bool tuplesort_used_bound (Tuplesortstate *state)
 
void tuplesort_end (Tuplesortstate *state)
 
void tuplesort_reset (Tuplesortstate *state)
 
static bool grow_memtuples (Tuplesortstate *state)
 
void tuplesort_puttuple_common (Tuplesortstate *state, SortTuple *tuple, bool useAbbrev, Size tuplen)
 
void tuplesort_performsort (Tuplesortstate *state)
 
bool tuplesort_gettuple_common (Tuplesortstate *state, bool forward, SortTuple *stup)
 
bool tuplesort_skiptuples (Tuplesortstate *state, int64 ntuples, bool forward)
 
int tuplesort_merge_order (int64 allowedMem)
 
static int64 merge_read_buffer_size (int64 avail_mem, int nInputTapes, int nInputRuns, int maxOutputTapes)
 
void tuplesort_rescan (Tuplesortstate *state)
 
void tuplesort_markpos (Tuplesortstate *state)
 
void tuplesort_restorepos (Tuplesortstate *state)
 
void tuplesort_get_stats (Tuplesortstate *state, TuplesortInstrumentation *stats)
 
const char * tuplesort_method_name (TuplesortMethod m)
 
const char * tuplesort_space_type_name (TuplesortSpaceType t)
 
void * tuplesort_readtup_alloc (Tuplesortstate *state, Size tuplen)
 
Size tuplesort_estimate_shared (int nWorkers)
 
void tuplesort_initialize_shared (Sharedsort *shared, int nWorkers, dsm_segment *seg)
 
void tuplesort_attach_shared (Sharedsort *shared, dsm_segment *seg)
 
int ssup_datum_unsigned_cmp (Datum x, Datum y, SortSupport ssup)
 
int ssup_datum_signed_cmp (Datum x, Datum y, SortSupport ssup)
 
int ssup_datum_int32_cmp (Datum x, Datum y, SortSupport ssup)
 

Variables

bool trace_sort = false
 

Macro Definition Documentation

◆ COMPARETUP

#define COMPARETUP(state, a, b)    ((*(state)->base.comparetup) (a, b, state))

Definition at line 393 of file tuplesort.c.

◆ FREEMEM

#define FREEMEM(state, amt)    ((state)->availMem += (amt))

Definition at line 399 of file tuplesort.c.

◆ FREESTATE

#define FREESTATE(state)    ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)

Definition at line 396 of file tuplesort.c.

◆ INITIAL_MEMTUPSIZE

#define INITIAL_MEMTUPSIZE
Value:
Max(1024, \
        ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)

Definition at line 118 of file tuplesort.c.
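
As a rough, hedged calibration of this initial sizing: ALLOCSET_SEPARATE_THRESHOLD is 8 kB, and a SortTuple is a few dozen bytes on 64-bit builds (about 24 bytes is assumed here), so ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1 comes to roughly 342, and the Max() therefore normally evaluates to 1024 initial memtuples slots.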

141typedef union SlabSlot
142{
143 union SlabSlot *nextfree;
144 char buf[SLAB_SLOT_SIZE];
145} SlabSlot;
146
147/*
148 * Possible states of a Tuplesort object. These denote the states that
149 * persist between calls of Tuplesort routines.
150 */
151typedef enum
152{
153 TSS_INITIAL, /* Loading tuples; still within memory limit */
154 TSS_BOUNDED, /* Loading tuples into bounded-size heap */
155 TSS_BUILDRUNS, /* Loading tuples; writing to tape */
156 TSS_SORTEDINMEM, /* Sort completed entirely in memory */
157 TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
158 TSS_FINALMERGE, /* Performing final merge on-the-fly */
159} TupSortStatus;
160
161/*
162 * Parameters for calculation of number of tapes to use --- see inittapes()
163 * and tuplesort_merge_order().
164 *
165 * In this calculation we assume that each tape will cost us about 1 block's
166 * worth of buffer space. This ignores the overhead of all the other data
167 * structures needed for each tape, but it's probably close enough.
168 *
169 * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
170 * input tape, for pre-reading (see discussion at top of file). This is *in
171 * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
172 */
173#define MINORDER 6 /* minimum merge order */
174#define MAXORDER 500 /* maximum merge order */
175#define TAPE_BUFFER_OVERHEAD BLCKSZ
176#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
177
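
As a worked example, assuming the default 8 kB BLCKSZ (BLCKSZ is configurable at build time): TAPE_BUFFER_OVERHEAD is 8 kB per tape and MERGE_BUFFER_SIZE is 256 kB per input tape, so each input tape costs about 264 kB. With 64 MB of allowedMem, tuplesort_merge_order() would therefore arrive at roughly 64 MB / 264 kB ≈ 248 input tapes, which is then clamped to the [MINORDER, MAXORDER] = [6, 500] range.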
178
179/*
180 * Private state of a Tuplesort operation.
181 */
182struct Tuplesortstate
183{
185 TupSortStatus status; /* enumerated value as shown above */
186 bool bounded; /* did caller specify a maximum number of
187 * tuples to return? */
188 bool boundUsed; /* true if we made use of a bounded heap */
189 int bound; /* if bounded, the maximum number of tuples */
190 int64 tupleMem; /* memory consumed by individual tuples.
191 * storing this separately from what we track
192 * in availMem allows us to subtract the
193 * memory consumed by all tuples when dumping
194 * tuples to tape */
195 int64 availMem; /* remaining memory available, in bytes */
196 int64 allowedMem; /* total memory allowed, in bytes */
197 int maxTapes; /* max number of input tapes to merge in each
198 * pass */
199 int64 maxSpace; /* maximum amount of space occupied among sort
200 * of groups, either in-memory or on-disk */
201 bool isMaxSpaceDisk; /* true when maxSpace tracks on-disk space,
202 * false means in-memory */
203 TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
204 LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
205
206 /*
207 * This array holds the tuples now in sort memory. If we are in state
208 * INITIAL, the tuples are in no particular order; if we are in state
209 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
210 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
211 * H. In state SORTEDONTAPE, the array is not used.
212 */
213 SortTuple *memtuples; /* array of SortTuple structs */
214 int memtupcount; /* number of tuples currently present */
215 int memtupsize; /* allocated length of memtuples array */
216 bool growmemtuples; /* memtuples' growth still underway? */
217
218 /*
219 * Memory for tuples is sometimes allocated using a simple slab allocator,
220 * rather than with palloc(). Currently, we switch to slab allocation
221 * when we start merging. Merging only needs to keep a small, fixed
222 * number of tuples in memory at any time, so we can avoid the
223 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
224 * to hold the tuples.
225 *
226 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
227 * slots. The allocation is sized to have one slot per tape, plus one
228 * additional slot. We need that many slots to hold all the tuples kept
229 * in the heap during merge, plus the one we have last returned from the
230 * sort, with tuplesort_gettuple.
231 *
232 * Initially, all the slots are kept in a linked list of free slots. When
233 * a tuple is read from a tape, it is put to the next available slot, if
234 * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
235 * instead.
236 *
237 * When we're done processing a tuple, we return the slot back to the free
238 * list, or pfree() if it was palloc'd. We know that a tuple was
239 * allocated from the slab, if its pointer value is between
240 * slabMemoryBegin and -End.
241 *
242 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
243 * tracking memory usage is not used.
244 */
245 bool slabAllocatorUsed;
246
247 char *slabMemoryBegin; /* beginning of slab memory arena */
248 char *slabMemoryEnd; /* end of slab memory arena */
249 SlabSlot *slabFreeHead; /* head of free list */
250
251 /* Memory used for input and output tape buffers. */
252 size_t tape_buffer_mem;
253
254 /*
255 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
256 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
257 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
258 * recycle the memory on next gettuple call.
259 */
260 void *lastReturnedTuple;
261
262 /*
263 * While building initial runs, this is the current output run number.
264 * Afterwards, it is the number of initial runs we made.
265 */
266 int currentRun;
267
268 /*
269 * Logical tapes, for merging.
270 *
271 * The initial runs are written in the output tapes. In each merge pass,
272 * the output tapes of the previous pass become the input tapes, and new
273 * output tapes are created as needed. When nInputTapes equals
274 * nInputRuns, there is only one merge pass left.
275 */
277 int nInputTapes;
278 int nInputRuns;
279
281 int nOutputTapes;
282 int nOutputRuns;
283
284 LogicalTape *destTape; /* current output tape */
285
286 /*
287 * These variables are used after completion of sorting to keep track of
288 * the next tuple to return. (In the tape case, the tape's current read
289 * position is also critical state.)
290 */
291 LogicalTape *result_tape; /* actual tape of finished output */
292 int current; /* array index (only used if SORTEDINMEM) */
293 bool eof_reached; /* reached EOF (needed for cursors) */
294
295 /* markpos_xxx holds marked position for mark and restore */
296 int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
297 int markpos_offset; /* saved "current", or offset in tape block */
298 bool markpos_eof; /* saved "eof_reached" */
299
300 /*
301 * These variables are used during parallel sorting.
302 *
303 * worker is our worker identifier. Follows the general convention that
304 * -1 value relates to a leader tuplesort, and values >= 0 worker
305 * tuplesorts. (-1 can also be a serial tuplesort.)
306 *
307 * shared is mutable shared memory state, which is used to coordinate
308 * parallel sorts.
309 *
310 * nParticipants is the number of worker Tuplesortstates known by the
311 * leader to have actually been launched, which implies that they must
312 * finish a run that the leader needs to merge. Typically includes a
313 * worker state held by the leader process itself. Set in the leader
314 * Tuplesortstate only.
315 */
316 int worker;
318 int nParticipants;
319
320 /*
321 * Additional state for managing "abbreviated key" sortsupport routines
322 * (which currently may be used by all cases except the hash index case).
323 * Tracks the intervals at which the optimization's effectiveness is
324 * tested.
325 */
326 int64 abbrevNext; /* Tuple # at which to next check
327 * applicability */
328
329 /*
330 * Resource snapshot for time of sort start.
331 */
333};
334
335/*
336 * Private mutable state of tuplesort-parallel-operation. This is allocated
337 * in shared memory.
338 */
339struct Sharedsort
340{
341 /* mutex protects all fields prior to tapes */
343
344 /*
345 * currentWorker generates ordinal identifier numbers for parallel sort
346 * workers. These start from 0, and are always gapless.
347 *
348 * Workers increment workersFinished to indicate having finished. If this
349 * is equal to state.nParticipants within the leader, leader is ready to
350 * merge worker runs.
351 */
352 int currentWorker;
353 int workersFinished;
354
355 /* Temporary file space */
357
358 /* Size of tapes flexible array */
359 int nTapes;
360
361 /*
362 * Tapes array used by workers to report back information needed by the
363 * leader to concatenate all worker tapes into one for merging
364 */
366};
367
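
To make the worker/leader convention described above concrete, here is a hedged sketch (not part of tuplesort.c) of how a caller is expected to wire up a parallel sort with the shared-memory helpers provided by this file. The toc, seg, nworkers and launched_workers variables are placeholders for the caller's own parallel-context plumbing; error handling and the actual feeding of tuples are omitted.

/* Leader, while setting up the parallel context: */
Sharedsort *shared = (Sharedsort *)
	shm_toc_allocate(toc, tuplesort_estimate_shared(nworkers));

tuplesort_initialize_shared(shared, nworkers, seg);

/* Each worker: attach to the shared state, then produce one sorted run. */
SortCoordinateData worker_coord = {0};

worker_coord.isWorker = true;
worker_coord.sharedsort = shared;
tuplesort_attach_shared(shared, seg);
/* ... tuplesort_begin_xxx(..., &worker_coord, TUPLESORT_NONE), put tuples,
 * tuplesort_performsort(), tuplesort_end() ... */

/* Leader, after all launched workers have finished: merge their runs. */
SortCoordinateData leader_coord = {0};

leader_coord.isWorker = false;
leader_coord.nParticipants = launched_workers;
leader_coord.sharedsort = shared;
/* ... tuplesort_begin_xxx(..., &leader_coord, TUPLESORT_NONE), then
 * tuplesort_performsort() takes over the worker tapes and merges ... */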
368/*
369 * Is the given tuple allocated from the slab memory arena?
370 */
371#define IS_SLAB_SLOT(state, tuple) \
372 ((char *) (tuple) >= (state)->slabMemoryBegin && \
373 (char *) (tuple) < (state)->slabMemoryEnd)
374
375/*
376 * Return the given tuple to the slab memory free list, or free it
377 * if it was palloc'd.
378 */
379#define RELEASE_SLAB_SLOT(state, tuple) \
380 do { \
381 SlabSlot *buf = (SlabSlot *) tuple; \
382 \
383 if (IS_SLAB_SLOT((state), buf)) \
384 { \
385 buf->nextfree = (state)->slabFreeHead; \
386 (state)->slabFreeHead = buf; \
387 } else \
388 pfree(buf); \
389 } while(0)
390
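
For illustration, a hedged sketch of the read-side counterpart to RELEASE_SLAB_SLOT, along the lines of tuplesort_readtup_alloc() (defined later in this file): a tuple read from a tape takes the next slot off the free list if it fits, and falls back to an ordinary allocation otherwise.

void *
readtup_alloc_sketch(Tuplesortstate *state, Size tuplen)
{
	SlabSlot   *buf;

	/* Oversized tuples (or an exhausted free list) fall back to palloc. */
	if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
		return MemoryContextAlloc(state->base.sortcontext, tuplen);

	/* Otherwise pop a fixed-size slot off the free list and reuse it. */
	buf = state->slabFreeHead;
	state->slabFreeHead = buf->nextfree;
	return buf;
}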
391#define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
392#define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
393#define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
394#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
395#define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
396#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
397#define USEMEM(state,amt) ((state)->availMem -= (amt))
398#define FREEMEM(state,amt) ((state)->availMem += (amt))
399#define SERIAL(state) ((state)->shared == NULL)
400#define WORKER(state) ((state)->shared && (state)->worker != -1)
401#define LEADER(state) ((state)->shared && (state)->worker == -1)
402
403/*
404 * NOTES about on-tape representation of tuples:
405 *
406 * We require the first "unsigned int" of a stored tuple to be the total size
407 * on-tape of the tuple, including itself (so it is never zero; an all-zero
408 * unsigned int is used to delimit runs). The remainder of the stored tuple
409 * may or may not match the in-memory representation of the tuple ---
410 * any conversion needed is the job of the writetup and readtup routines.
411 *
412 * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
413 * representation of the tuple must be followed by another "unsigned int" that
414 * is a copy of the length --- so the total tape space used is actually
415 * sizeof(unsigned int) more than the stored length value. This allows
416 * read-backwards. When the random access flag was not specified, the
417 * write/read routines may omit the extra length word.
418 *
419 * writetup is expected to write both length words as well as the tuple
420 * data. When readtup is called, the tape is positioned just after the
421 * front length word; readtup must read the tuple data and advance past
422 * the back length word (if present).
423 *
424 * The write/read routines can make use of the tuple description data
425 * stored in the Tuplesortstate record, if needed. They are also expected
426 * to adjust state->availMem by the amount of memory space (not tape space!)
427 * released or consumed. There is no error return from either writetup
428 * or readtup; they should ereport() on failure.
429 *
430 *
431 * NOTES about memory consumption calculations:
432 *
433 * We count space allocated for tuples against the workMem limit, plus
434 * the space used by the variable-size memtuples array. Fixed-size space
435 * is not counted; it's small enough to not be interesting.
436 *
437 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
438 * rather than the originally-requested size. This is important since
439 * palloc can add substantial overhead. It's not a complete answer since
440 * we won't count any wasted space in palloc allocation blocks, but it's
441 * a lot better than what we were doing before 7.3. As of 9.6, a
442 * separate memory context is used for caller passed tuples. Resetting
443 * it at certain key increments significantly ameliorates fragmentation.
444 * readtup routines use the slab allocator (they cannot use
445 * the reset context because it gets deleted at the point that merging
446 * begins).
447 */
448
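
A hedged sketch of the framing a writetup routine is expected to produce; the real per-format routines are supplied through the sort's base.writetup callback, and 'data'/'datalen' here are placeholders for the already-flattened tuple body.

static void
writetup_sketch(Tuplesortstate *state, LogicalTape *tape,
				void *data, unsigned int datalen)
{
	/* leading length word counts itself plus the tuple body */
	unsigned int tuplen = datalen + sizeof(unsigned int);

	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
	LogicalTapeWrite(tape, data, datalen);

	/* trailing copy of the length word, so the tuple can be read backwards */
	if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
}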
449
452static void inittapes(Tuplesortstate *state, bool mergeruns);
453static void inittapestate(Tuplesortstate *state, int maxTapes);
456static void mergeruns(Tuplesortstate *state);
457static void mergeonerun(Tuplesortstate *state);
458static void beginmerge(Tuplesortstate *state);
460static void dumptuples(Tuplesortstate *state, bool alltuples);
468static unsigned int getlen(LogicalTape *tape, bool eofOK);
469static void markrunend(LogicalTape *tape);
477
478/*
479 * Specialized comparators that we can inline into specialized sorts. The goal
480 * is to try to sort two tuples without having to follow the pointers to the
481 * comparator or the tuple.
482 *
483 * XXX: For now, there is no specialization for cases where datum1 is
484 * authoritative and we don't even need to fall back to a callback at all (that
485 * would be true for types like int4/int8/timestamp/date, but not true for
486 * abbreviations of text or multi-key sorts. There could be! Is it worth it?
487 */
488
489/* Used if first key's comparator is ssup_datum_unsigned_cmp */
492{
493 int compare;
494
495 compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
496 b->datum1, b->isnull1,
497 &state->base.sortKeys[0]);
498 if (compare != 0)
499 return compare;
500
501 /*
502 * No need to waste effort calling the tiebreak function when there are no
503 * other keys to sort on.
504 */
505 if (state->base.onlyKey != NULL)
506 return 0;
507
508 return state->base.comparetup_tiebreak(a, b, state);
509}
510
511/* Used if first key's comparator is ssup_datum_signed_cmp */
514{
515 int compare;
516
517 compare = ApplySignedSortComparator(a->datum1, a->isnull1,
518 b->datum1, b->isnull1,
519 &state->base.sortKeys[0]);
520
521 if (compare != 0)
522 return compare;
523
524 /*
525 * No need to waste effort calling the tiebreak function when there are no
526 * other keys to sort on.
527 */
528 if (state->base.onlyKey != NULL)
529 return 0;
530
531 return state->base.comparetup_tiebreak(a, b, state);
532}
533
534/* Used if first key's comparator is ssup_datum_int32_cmp */
537{
538 int compare;
539
540 compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
541 b->datum1, b->isnull1,
542 &state->base.sortKeys[0]);
543
544 if (compare != 0)
545 return compare;
546
547 /*
548 * No need to waste effort calling the tiebreak function when there are no
549 * other keys to sort on.
550 */
551 if (state->base.onlyKey != NULL)
552 return 0;
553
554 return state->base.comparetup_tiebreak(a, b, state);
555}
556
557/*
558 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
559 * any variant of SortTuples, using the appropriate comparetup function.
560 * qsort_ssup() is specialized for the case where the comparetup function
561 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
562 * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for
563 * common comparison functions on pass-by-value leading datums.
564 */
565
566#define ST_SORT qsort_tuple_unsigned
567#define ST_ELEMENT_TYPE SortTuple
568#define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
569#define ST_COMPARE_ARG_TYPE Tuplesortstate
570#define ST_CHECK_FOR_INTERRUPTS
571#define ST_SCOPE static
572#define ST_DEFINE
573#include "lib/sort_template.h"
574
575#define ST_SORT qsort_tuple_signed
576#define ST_ELEMENT_TYPE SortTuple
577#define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
578#define ST_COMPARE_ARG_TYPE Tuplesortstate
579#define ST_CHECK_FOR_INTERRUPTS
580#define ST_SCOPE static
581#define ST_DEFINE
582#include "lib/sort_template.h"
583
584#define ST_SORT qsort_tuple_int32
585#define ST_ELEMENT_TYPE SortTuple
586#define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
587#define ST_COMPARE_ARG_TYPE Tuplesortstate
588#define ST_CHECK_FOR_INTERRUPTS
589#define ST_SCOPE static
590#define ST_DEFINE
591#include "lib/sort_template.h"
592
593#define ST_SORT qsort_tuple
594#define ST_ELEMENT_TYPE SortTuple
595#define ST_COMPARE_RUNTIME_POINTER
596#define ST_COMPARE_ARG_TYPE Tuplesortstate
597#define ST_CHECK_FOR_INTERRUPTS
598#define ST_SCOPE static
599#define ST_DECLARE
600#define ST_DEFINE
601#include "lib/sort_template.h"
602
603#define ST_SORT qsort_ssup
604#define ST_ELEMENT_TYPE SortTuple
605#define ST_COMPARE(a, b, ssup) \
606 ApplySortComparator((a)->datum1, (a)->isnull1, \
607 (b)->datum1, (b)->isnull1, (ssup))
608#define ST_COMPARE_ARG_TYPE SortSupportData
609#define ST_CHECK_FOR_INTERRUPTS
610#define ST_SCOPE static
611#define ST_DEFINE
612#include "lib/sort_template.h"
613
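
For reference, a hedged sketch of how the generated sorters end up being invoked; the actual dispatch on the leading key's comparator happens in tuplesort_sort_memtuples(), later in this file.

/* single-key MinimalTuple or Datum sort: comparator applied directly */
qsort_ssup(state->memtuples, state->memtupcount, state->base.onlyKey);

/* leading key uses ssup_datum_unsigned_cmp: specialized variant */
qsort_tuple_unsigned(state->memtuples, state->memtupcount, state);

/* generic fallback: comparetup supplied as a runtime function pointer */
qsort_tuple(state->memtuples, state->memtupcount,
			state->base.comparetup, state);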
614/*
615 * tuplesort_begin_xxx
616 *
617 * Initialize for a tuple sort operation.
618 *
619 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
620 * zero or more times, then call tuplesort_performsort when all the tuples
621 * have been supplied. After performsort, retrieve the tuples in sorted
622 * order by calling tuplesort_getXXX until it returns false/NULL. (If random
623 * access was requested, rescan, markpos, and restorepos can also be called.)
624 * Call tuplesort_end to terminate the operation and release memory/disk space.
625 *
626 * Each variant of tuplesort_begin has a workMem parameter specifying the
627 * maximum number of kilobytes of RAM to use before spilling data to disk.
628 * (The normal value of this parameter is work_mem, but some callers use
629 * other values.) Each variant also has a sortopt which is a bitmask of
630 * sort options. See TUPLESORT_* definitions in tuplesort.h
631 */
632
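
As a concrete, hedged illustration of the calling sequence described above, using the heap-tuple variant declared in tuplesort.h; tupDesc, the sort-key arrays, slot and fetch_next_slot() are placeholders supplied by the caller.

Tuplesortstate *sortstate = tuplesort_begin_heap(tupDesc, nkeys, attNums,
												 sortOperators, sortCollations,
												 nullsFirstFlags, work_mem,
												 NULL /* serial */ ,
												 TUPLESORT_NONE);

while (fetch_next_slot(slot))		/* caller-supplied input loop */
	tuplesort_puttupleslot(sortstate, slot);

tuplesort_performsort(sortstate);

while (tuplesort_gettupleslot(sortstate, true /* forward */ , false, slot, NULL))
{
	/* consume one sorted tuple from 'slot' */
}

tuplesort_end(sortstate);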
635{
637 MemoryContext maincontext;
638 MemoryContext sortcontext;
639 MemoryContext oldcontext;
640
641 /* See leader_takeover_tapes() remarks on random access support */
642 if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
643 elog(ERROR, "random access disallowed under parallel sort");
644
645 /*
646 * Memory context surviving tuplesort_reset. This memory context holds
647 * data which is useful to keep while sorting multiple similar batches.
648 */
650 "TupleSort main",
652
653 /*
654 * Create a working memory context for one sort operation. The content of
655 * this context is deleted by tuplesort_reset.
656 */
657 sortcontext = AllocSetContextCreate(maincontext,
658 "TupleSort sort",
660
661 /*
662 * Additionally a working memory context for tuples is setup in
663 * tuplesort_begin_batch.
664 */
665
666 /*
667 * Make the Tuplesortstate within the per-sortstate context. This way, we
668 * don't need a separate pfree() operation for it at shutdown.
669 */
670 oldcontext = MemoryContextSwitchTo(maincontext);
671
673
674 if (trace_sort)
675 pg_rusage_init(&state->ru_start);
676
677 state->base.sortopt = sortopt;
678 state->base.tuples = true;
679 state->abbrevNext = 10;
680
681 /*
682 * workMem is forced to be at least 64KB, the current minimum valid value
683 * for the work_mem GUC. This is a defense against parallel sort callers
684 * that divide out memory among many workers in a way that leaves each
685 * with very little memory.
686 */
687 state->allowedMem = Max(workMem, 64) * (int64) 1024;
688 state->base.sortcontext = sortcontext;
689 state->base.maincontext = maincontext;
690
691 state->memtupsize = INITIAL_MEMTUPSIZE;
692 state->memtuples = NULL;
693
694 /*
695 * After all of the other non-parallel-related state, we setup all of the
696 * state needed for each batch.
697 */
699
700 /*
701 * Initialize parallel-related state based on coordination information
702 * from caller
703 */
704 if (!coordinate)
705 {
706 /* Serial sort */
707 state->shared = NULL;
708 state->worker = -1;
709 state->nParticipants = -1;
710 }
711 else if (coordinate->isWorker)
712 {
713 /* Parallel worker produces exactly one final run from all input */
714 state->shared = coordinate->sharedsort;
716 state->nParticipants = -1;
717 }
718 else
719 {
720 /* Parallel leader state only used for final merge */
721 state->shared = coordinate->sharedsort;
722 state->worker = -1;
723 state->nParticipants = coordinate->nParticipants;
724 Assert(state->nParticipants >= 1);
725 }
726
727 MemoryContextSwitchTo(oldcontext);
728
729 return state;
730}
731
732/*
733 * tuplesort_begin_batch
734 *
 735 * Setup, or reset, all state needed for processing a new set of tuples with this
736 * sort state. Called both from tuplesort_begin_common (the first time sorting
737 * with this sort state) and tuplesort_reset (for subsequent usages).
738 */
739static void
741{
742 MemoryContext oldcontext;
743
744 oldcontext = MemoryContextSwitchTo(state->base.maincontext);
745
746 /*
747 * Caller tuple (e.g. IndexTuple) memory context.
748 *
749 * A dedicated child context used exclusively for caller passed tuples
750 * eases memory management. Resetting at key points reduces
751 * fragmentation. Note that the memtuples array of SortTuples is allocated
752 * in the parent context, not this context, because there is no need to
753 * free memtuples early. For bounded sorts, tuples may be pfreed in any
754 * order, so we use a regular aset.c context so that it can make use of
755 * free'd memory. When the sort is not bounded, we make use of a bump.c
756 * context as this keeps allocations more compact with less wastage.
757 * Allocations are also slightly more CPU efficient.
758 */
759 if (TupleSortUseBumpTupleCxt(state->base.sortopt))
760 state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
761 "Caller tuples",
763 else
764 state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
765 "Caller tuples",
767
768
769 state->status = TSS_INITIAL;
770 state->bounded = false;
771 state->boundUsed = false;
772
773 state->availMem = state->allowedMem;
774
775 state->tapeset = NULL;
776
777 state->memtupcount = 0;
778
779 state->growmemtuples = true;
780 state->slabAllocatorUsed = false;
781 if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
782 {
783 pfree(state->memtuples);
784 state->memtuples = NULL;
785 state->memtupsize = INITIAL_MEMTUPSIZE;
786 }
787 if (state->memtuples == NULL)
788 {
789 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
791 }
792
793 /* workMem must be large enough for the minimal memtuples array */
794 if (LACKMEM(state))
795 elog(ERROR, "insufficient memory allowed for sort");
796
797 state->currentRun = 0;
798
799 /*
800 * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
801 * inittapes(), if needed.
802 */
803
804 state->result_tape = NULL; /* flag that result tape has not been formed */
805
806 MemoryContextSwitchTo(oldcontext);
807}
808
809/*
810 * tuplesort_set_bound
811 *
812 * Advise tuplesort that at most the first N result tuples are required.
813 *
814 * Must be called before inserting any tuples. (Actually, we could allow it
815 * as long as the sort hasn't spilled to disk, but there seems no need for
816 * delayed calls at the moment.)
817 *
818 * This is a hint only. The tuplesort may still return more tuples than
819 * requested. Parallel leader tuplesorts will always ignore the hint.
820 */
821void
823{
824 /* Assert we're called before loading any tuples */
825 Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
826 /* Assert we allow bounded sorts */
827 Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
828 /* Can't set the bound twice, either */
829 Assert(!state->bounded);
830 /* Also, this shouldn't be called in a parallel worker */
832
833 /* Parallel leader allows but ignores hint */
834 if (LEADER(state))
835 return;
836
837#ifdef DEBUG_BOUNDED_SORT
838 /* Honor GUC setting that disables the feature (for easy testing) */
840 return;
841#endif
842
843 /* We want to be able to compute bound * 2, so limit the setting */
844 if (bound > (int64) (INT_MAX / 2))
845 return;
846
847 state->bounded = true;
848 state->bound = (int) bound;
849
850 /*
851 * Bounded sorts are not an effective target for abbreviated key
852 * optimization. Disable by setting state to be consistent with no
853 * abbreviation support.
854 */
855 state->base.sortKeys->abbrev_converter = NULL;
856 if (state->base.sortKeys->abbrev_full_comparator)
857 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
858
859 /* Not strictly necessary, but be tidy */
860 state->base.sortKeys->abbrev_abort = NULL;
861 state->base.sortKeys->abbrev_full_comparator = NULL;
862}
863
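
A hedged example of the intended call order for a top-N sort (say, a "LIMIT 100" query); the sort must have been begun with TUPLESORT_ALLOWBOUNDED for the hint to be legal, and the begin_heap arguments are the same placeholders as in the earlier usage sketch.

sortstate = tuplesort_begin_heap(tupDesc, nkeys, attNums, sortOperators,
								 sortCollations, nullsFirstFlags, work_mem,
								 NULL, TUPLESORT_ALLOWBOUNDED);
tuplesort_set_bound(sortstate, 100);	/* before any tuples are inserted */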
864/*
865 * tuplesort_used_bound
866 *
867 * Allow callers to find out if the sort state was able to use a bound.
868 */
869bool
871{
872 return state->boundUsed;
873}
874
875/*
876 * tuplesort_free
877 *
878 * Internal routine for freeing resources of tuplesort.
879 */
880static void
882{
883 /* context swap probably not needed, but let's be safe */
884 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
885 int64 spaceUsed;
886
887 if (state->tapeset)
888 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
889 else
890 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
891
892 /*
893 * Delete temporary "tape" files, if any.
894 *
895 * We don't bother to destroy the individual tapes here. They will go away
896 * with the sortcontext. (In TSS_FINALMERGE state, we have closed
897 * finished tapes already.)
898 */
899 if (state->tapeset)
900 LogicalTapeSetClose(state->tapeset);
901
902 if (trace_sort)
903 {
904 if (state->tapeset)
905 elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
906 SERIAL(state) ? "external sort" : "parallel external sort",
907 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
908 else
909 elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
910 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
911 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
912 }
913
914 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
915
917 MemoryContextSwitchTo(oldcontext);
918
919 /*
920 * Free the per-sort memory context, thereby releasing all working memory.
921 */
922 MemoryContextReset(state->base.sortcontext);
923}
924
925/*
926 * tuplesort_end
927 *
928 * Release resources and clean up.
929 *
930 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
931 * pointing to garbage. Be careful not to attempt to use or free such
932 * pointers afterwards!
933 */
934void
936{
938
939 /*
940 * Free the main memory context, including the Tuplesortstate struct
941 * itself.
942 */
943 MemoryContextDelete(state->base.maincontext);
944}
945
946/*
947 * tuplesort_updatemax
948 *
949 * Update maximum resource usage statistics.
950 */
951static void
953{
954 int64 spaceUsed;
955 bool isSpaceDisk;
956
957 /*
958 * Note: it might seem we should provide both memory and disk usage for a
959 * disk-based sort. However, the current code doesn't track memory space
960 * accurately once we have begun to return tuples to the caller (since we
961 * don't account for pfree's the caller is expected to do), so we cannot
962 * rely on availMem in a disk sort. This does not seem worth the overhead
963 * to fix. Is it worth creating an API for the memory context code to
964 * tell us how much is actually used in sortcontext?
965 */
966 if (state->tapeset)
967 {
968 isSpaceDisk = true;
969 spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
970 }
971 else
972 {
973 isSpaceDisk = false;
974 spaceUsed = state->allowedMem - state->availMem;
975 }
976
977 /*
978 * Sort evicts data to the disk when it wasn't able to fit that data into
979 * main memory. This is why we assume space used on the disk to be more
980 * important for tracking resource usage than space used in memory. Note
981 * that the amount of space occupied by some tupleset on the disk might be
 982 * less than the amount of space occupied by the same tupleset in memory due
983 * to more compact representation.
984 */
985 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
986 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
987 {
988 state->maxSpace = spaceUsed;
989 state->isMaxSpaceDisk = isSpaceDisk;
990 state->maxSpaceStatus = state->status;
991 }
992}
993
994/*
995 * tuplesort_reset
996 *
997 * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
998 * meta-information in. After tuplesort_reset, tuplesort is ready to start
 999 * a new sort. This allows avoiding recreation of tuple sort states (and
1000 * saving resources) when sorting multiple small batches.
1001 */
1002void
1004{
1007
1008 /*
1009 * After we've freed up per-batch memory, re-setup all of the state common
1010 * to both the first batch and any subsequent batch.
1011 */
1013
1014 state->lastReturnedTuple = NULL;
1015 state->slabMemoryBegin = NULL;
1016 state->slabMemoryEnd = NULL;
1017 state->slabFreeHead = NULL;
1018}
1019
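
A hedged sketch of the multi-batch pattern this enables; nbatches and the per-batch feeding/draining are placeholders, and sortstate is a Tuplesortstate created once with one of the tuplesort_begin_xxx variants.

for (int i = 0; i < nbatches; i++)
{
	/* feed this batch's tuples with tuplesort_puttupleslot() ... */
	tuplesort_performsort(sortstate);
	/* ... drain the sorted output with tuplesort_gettupleslot() ... */

	/* drop per-batch memory but keep the state for the next batch */
	tuplesort_reset(sortstate);
}
tuplesort_end(sortstate);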
1020/*
1021 * Grow the memtuples[] array, if possible within our memory constraint. We
1022 * must not exceed INT_MAX tuples in memory or the caller-provided memory
1023 * limit. Return true if we were able to enlarge the array, false if not.
1024 *
1025 * Normally, at each increment we double the size of the array. When doing
1026 * that would exceed a limit, we attempt one last, smaller increase (and then
1027 * clear the growmemtuples flag so we don't try any more). That allows us to
1028 * use memory as fully as permitted; sticking to the pure doubling rule could
1029 * result in almost half going unused. Because availMem moves around with
1030 * tuple addition/removal, we need some rule to prevent making repeated small
1031 * increases in memtupsize, which would just be useless thrashing. The
1032 * growmemtuples flag accomplishes that and also prevents useless
1033 * recalculations in this function.
1034 */
1035static bool
1037{
1038 int newmemtupsize;
1039 int memtupsize = state->memtupsize;
1040 int64 memNowUsed = state->allowedMem - state->availMem;
1041
1042 /* Forget it if we've already maxed out memtuples, per comment above */
1043 if (!state->growmemtuples)
1044 return false;
1045
1046 /* Select new value of memtupsize */
1047 if (memNowUsed <= state->availMem)
1048 {
1049 /*
1050 * We've used no more than half of allowedMem; double our usage,
1051 * clamping at INT_MAX tuples.
1052 */
1053 if (memtupsize < INT_MAX / 2)
1054 newmemtupsize = memtupsize * 2;
1055 else
1056 {
1058 state->growmemtuples = false;
1059 }
1060 }
1061 else
1062 {
1063 /*
1064 * This will be the last increment of memtupsize. Abandon doubling
1065 * strategy and instead increase as much as we safely can.
1066 *
1067 * To stay within allowedMem, we can't increase memtupsize by more
1068 * than availMem / sizeof(SortTuple) elements. In practice, we want
1069 * to increase it by considerably less, because we need to leave some
1070 * space for the tuples to which the new array slots will refer. We
1071 * assume the new tuples will be about the same size as the tuples
1072 * we've already seen, and thus we can extrapolate from the space
1073 * consumption so far to estimate an appropriate new size for the
1074 * memtuples array. The optimal value might be higher or lower than
1075 * this estimate, but it's hard to know that in advance. We again
1076 * clamp at INT_MAX tuples.
1077 *
1078 * This calculation is safe against enlarging the array so much that
1079 * LACKMEM becomes true, because the memory currently used includes
1080 * the present array; thus, there would be enough allowedMem for the
1081 * new array elements even if no other memory were currently used.
1082 *
1083 * We do the arithmetic in float8, because otherwise the product of
1084 * memtupsize and allowedMem could overflow. Any inaccuracy in the
1085 * result should be insignificant; but even if we computed a
1086 * completely insane result, the checks below will prevent anything
1087 * really bad from happening.
1088 */
1089 double grow_ratio;
1090
1091 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1092 if (memtupsize * grow_ratio < INT_MAX)
1093 newmemtupsize = (int) (memtupsize * grow_ratio);
1094 else
1096
1097 /* We won't make any further enlargement attempts */
1098 state->growmemtuples = false;
1099 }
1100
1101 /* Must enlarge array by at least one element, else report failure */
1102 if (newmemtupsize <= memtupsize)
1103 goto noalloc;
1104
1105 /*
1106 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1107 * to ensure our request won't be rejected. Note that we can easily
1108 * exhaust address space before facing this outcome. (This is presently
1109 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1110 * don't rely on that at this distance.)
1111 */
1112 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1113 {
1115 state->growmemtuples = false; /* can't grow any more */
1116 }
1117
1118 /*
1119 * We need to be sure that we do not cause LACKMEM to become true, else
1120 * the space management algorithm will go nuts. The code above should
1121 * never generate a dangerous request, but to be safe, check explicitly
1122 * that the array growth fits within availMem. (We could still cause
1123 * LACKMEM if the memory chunk overhead associated with the memtuples
1124 * array were to increase. That shouldn't happen because we chose the
1125 * initial array size large enough to ensure that palloc will be treating
1126 * both old and new arrays as separate chunks. But we'll check LACKMEM
1127 * explicitly below just in case.)
1128 */
1129 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1130 goto noalloc;
1131
1132 /* OK, do it */
1133 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1134 state->memtupsize = newmemtupsize;
1135 state->memtuples = (SortTuple *)
1136 repalloc_huge(state->memtuples,
1137 state->memtupsize * sizeof(SortTuple));
1138 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1139 if (LACKMEM(state))
1140 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1141 return true;
1142
1143noalloc:
1144 /* If for any reason we didn't realloc, shut off future attempts */
1145 state->growmemtuples = false;
1146 return false;
1147}
1148
1149/*
1150 * Shared code for tuple and datum cases.
1151 */
1152void
1154 bool useAbbrev, Size tuplen)
1155{
1156 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1157
1158 Assert(!LEADER(state));
1159
1160 /* account for the memory used for this tuple */
1161 USEMEM(state, tuplen);
1162 state->tupleMem += tuplen;
1163
1164 if (!useAbbrev)
1165 {
1166 /*
1167 * Leave ordinary Datum representation, or NULL value. If there is a
1168 * converter it won't expect NULL values, and cost model is not
1169 * required to account for NULL, so in that case we avoid calling
1170 * converter and just set datum1 to zeroed representation (to be
1171 * consistent, and to support cheap inequality tests for NULL
1172 * abbreviated keys).
1173 */
1174 }
1175 else if (!consider_abort_common(state))
1176 {
1177 /* Store abbreviated key representation */
1178 tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1179 state->base.sortKeys);
1180 }
1181 else
1182 {
1183 /*
1184 * Set state to be consistent with never trying abbreviation.
1185 *
1186 * Alter datum1 representation in already-copied tuples, so as to
1187 * ensure a consistent representation (current tuple was just
1188 * handled). It does not matter if some dumped tuples are already
1189 * sorted on tape, since serialized tuples lack abbreviated keys
1190 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1191 */
1192 REMOVEABBREV(state, state->memtuples, state->memtupcount);
1193 }
1194
1195 switch (state->status)
1196 {
1197 case TSS_INITIAL:
1198
1199 /*
1200 * Save the tuple into the unsorted array. First, grow the array
1201 * as needed. Note that we try to grow the array when there is
1202 * still one free slot remaining --- if we fail, there'll still be
1203 * room to store the incoming tuple, and then we'll switch to
1204 * tape-based operation.
1205 */
1206 if (state->memtupcount >= state->memtupsize - 1)
1207 {
1209 Assert(state->memtupcount < state->memtupsize);
1210 }
1211 state->memtuples[state->memtupcount++] = *tuple;
1212
1213 /*
1214 * Check if it's time to switch over to a bounded heapsort. We do
1215 * so if the input tuple count exceeds twice the desired tuple
1216 * count (this is a heuristic for where heapsort becomes cheaper
1217 * than a quicksort), or if we've just filled workMem and have
1218 * enough tuples to meet the bound.
1219 *
1220 * Note that once we enter TSS_BOUNDED state we will always try to
1221 * complete the sort that way. In the worst case, if later input
1222 * tuples are larger than earlier ones, this might cause us to
1223 * exceed workMem significantly.
1224 */
1225 if (state->bounded &&
1226 (state->memtupcount > state->bound * 2 ||
1227 (state->memtupcount > state->bound && LACKMEM(state))))
1228 {
1229 if (trace_sort)
1230 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1231 state->memtupcount,
1232 pg_rusage_show(&state->ru_start));
1234 MemoryContextSwitchTo(oldcontext);
1235 return;
1236 }
1237
1238 /*
1239 * Done if we still fit in available memory and have array slots.
1240 */
1241 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1242 {
1243 MemoryContextSwitchTo(oldcontext);
1244 return;
1245 }
1246
1247 /*
1248 * Nope; time to switch to tape-based operation.
1249 */
1250 inittapes(state, true);
1251
1252 /*
1253 * Dump all tuples.
1254 */
1255 dumptuples(state, false);
1256 break;
1257
1258 case TSS_BOUNDED:
1259
1260 /*
1261 * We don't want to grow the array here, so check whether the new
1262 * tuple can be discarded before putting it in. This should be a
1263 * good speed optimization, too, since when there are many more
1264 * input tuples than the bound, most input tuples can be discarded
1265 * with just this one comparison. Note that because we currently
1266 * have the sort direction reversed, we must check for <= not >=.
1267 */
1268 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1269 {
1270 /* new tuple <= top of the heap, so we can discard it */
1271 free_sort_tuple(state, tuple);
1273 }
1274 else
1275 {
1276 /* discard top of heap, replacing it with the new tuple */
1277 free_sort_tuple(state, &state->memtuples[0]);
1279 }
1280 break;
1281
1282 case TSS_BUILDRUNS:
1283
1284 /*
1285 * Save the tuple into the unsorted array (there must be space)
1286 */
1287 state->memtuples[state->memtupcount++] = *tuple;
1288
1289 /*
1290 * If we are over the memory limit, dump all tuples.
1291 */
1292 dumptuples(state, false);
1293 break;
1294
1295 default:
1296 elog(ERROR, "invalid tuplesort state");
1297 break;
1298 }
1299 MemoryContextSwitchTo(oldcontext);
1300}
1301
1302static bool
1304{
1305 Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1306 Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1307 Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1308
1309 /*
1310 * Check effectiveness of abbreviation optimization. Consider aborting
1311 * when still within memory limit.
1312 */
1313 if (state->status == TSS_INITIAL &&
1314 state->memtupcount >= state->abbrevNext)
1315 {
1316 state->abbrevNext *= 2;
1317
1318 /*
1319 * Check opclass-supplied abbreviation abort routine. It may indicate
1320 * that abbreviation should not proceed.
1321 */
1322 if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1323 state->base.sortKeys))
1324 return false;
1325
1326 /*
1327 * Finally, restore authoritative comparator, and indicate that
1328 * abbreviation is not in play by setting abbrev_converter to NULL
1329 */
1330 state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1331 state->base.sortKeys[0].abbrev_converter = NULL;
1332 /* Not strictly necessary, but be tidy */
1333 state->base.sortKeys[0].abbrev_abort = NULL;
1334 state->base.sortKeys[0].abbrev_full_comparator = NULL;
1335
1336 /* Give up - expect original pass-by-value representation */
1337 return true;
1338 }
1339
1340 return false;
1341}
1342
1343/*
1344 * All tuples have been provided; finish the sort.
1345 */
1346void
1348{
1349 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1350
1351 if (trace_sort)
1352 elog(LOG, "performsort of worker %d starting: %s",
1353 state->worker, pg_rusage_show(&state->ru_start));
1354
1355 switch (state->status)
1356 {
1357 case TSS_INITIAL:
1358
1359 /*
1360 * We were able to accumulate all the tuples within the allowed
1361 * amount of memory, or leader to take over worker tapes
1362 */
1363 if (SERIAL(state))
1364 {
1365 /* Just qsort 'em and we're done */
1367 state->status = TSS_SORTEDINMEM;
1368 }
1369 else if (WORKER(state))
1370 {
1371 /*
1372 * Parallel workers must still dump out tuples to tape. No
1373 * merge is required to produce single output run, though.
1374 */
1375 inittapes(state, false);
1376 dumptuples(state, true);
1378 state->status = TSS_SORTEDONTAPE;
1379 }
1380 else
1381 {
1382 /*
1383 * Leader will take over worker tapes and merge worker runs.
1384 * Note that mergeruns sets the correct state->status.
1385 */
1388 }
1389 state->current = 0;
1390 state->eof_reached = false;
1391 state->markpos_block = 0L;
1392 state->markpos_offset = 0;
1393 state->markpos_eof = false;
1394 break;
1395
1396 case TSS_BOUNDED:
1397
1398 /*
1399 * We were able to accumulate all the tuples required for output
1400 * in memory, using a heap to eliminate excess tuples. Now we
1401 * have to transform the heap to a properly-sorted array. Note
1402 * that sort_bounded_heap sets the correct state->status.
1403 */
1405 state->current = 0;
1406 state->eof_reached = false;
1407 state->markpos_offset = 0;
1408 state->markpos_eof = false;
1409 break;
1410
1411 case TSS_BUILDRUNS:
1412
1413 /*
1414 * Finish tape-based sort. First, flush all tuples remaining in
1415 * memory out to tape; then merge until we have a single remaining
1416 * run (or, if !randomAccess and !WORKER(), one run per tape).
1417 * Note that mergeruns sets the correct state->status.
1418 */
1419 dumptuples(state, true);
1421 state->eof_reached = false;
1422 state->markpos_block = 0L;
1423 state->markpos_offset = 0;
1424 state->markpos_eof = false;
1425 break;
1426
1427 default:
1428 elog(ERROR, "invalid tuplesort state");
1429 break;
1430 }
1431
1432 if (trace_sort)
1433 {
1434 if (state->status == TSS_FINALMERGE)
1435 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1436 state->worker, state->nInputTapes,
1437 pg_rusage_show(&state->ru_start));
1438 else
1439 elog(LOG, "performsort of worker %d done: %s",
1440 state->worker, pg_rusage_show(&state->ru_start));
1441 }
1442
1443 MemoryContextSwitchTo(oldcontext);
1444}
1445
1446/*
1447 * Internal routine to fetch the next tuple in either forward or back
1448 * direction into *stup. Returns false if no more tuples.
1449 * Returned tuple belongs to tuplesort memory context, and must not be freed
1450 * by caller. Note that fetched tuple is stored in memory that may be
1451 * recycled by any future fetch.
1452 */
1453bool
1455 SortTuple *stup)
1456{
1457 unsigned int tuplen;
1458 size_t nmoved;
1459
1460 Assert(!WORKER(state));
1461
1462 switch (state->status)
1463 {
1464 case TSS_SORTEDINMEM:
1465 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1466 Assert(!state->slabAllocatorUsed);
1467 if (forward)
1468 {
1469 if (state->current < state->memtupcount)
1470 {
1471 *stup = state->memtuples[state->current++];
1472 return true;
1473 }
1474 state->eof_reached = true;
1475
1476 /*
1477 * Complain if caller tries to retrieve more tuples than
1478 * originally asked for in a bounded sort. This is because
1479 * returning EOF here might be the wrong thing.
1480 */
1481 if (state->bounded && state->current >= state->bound)
1482 elog(ERROR, "retrieved too many tuples in a bounded sort");
1483
1484 return false;
1485 }
1486 else
1487 {
1488 if (state->current <= 0)
1489 return false;
1490
1491 /*
1492 * if all tuples are fetched already then we return last
1493 * tuple, else - tuple before last returned.
1494 */
1495 if (state->eof_reached)
1496 state->eof_reached = false;
1497 else
1498 {
1499 state->current--; /* last returned tuple */
1500 if (state->current <= 0)
1501 return false;
1502 }
1503 *stup = state->memtuples[state->current - 1];
1504 return true;
1505 }
1506 break;
1507
1508 case TSS_SORTEDONTAPE:
1509 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1510 Assert(state->slabAllocatorUsed);
1511
1512 /*
1513 * The slot that held the tuple that we returned in previous
1514 * gettuple call can now be reused.
1515 */
1516 if (state->lastReturnedTuple)
1517 {
1518 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1519 state->lastReturnedTuple = NULL;
1520 }
1521
1522 if (forward)
1523 {
1524 if (state->eof_reached)
1525 return false;
1526
1527 if ((tuplen = getlen(state->result_tape, true)) != 0)
1528 {
1529 READTUP(state, stup, state->result_tape, tuplen);
1530
1531 /*
1532 * Remember the tuple we return, so that we can recycle
1533 * its memory on next call. (This can be NULL, in the
1534 * !state->tuples case).
1535 */
1536 state->lastReturnedTuple = stup->tuple;
1537
1538 return true;
1539 }
1540 else
1541 {
1542 state->eof_reached = true;
1543 return false;
1544 }
1545 }
1546
1547 /*
1548 * Backward.
1549 *
1550 * if all tuples are fetched already then we return last tuple,
1551 * else - tuple before last returned.
1552 */
1553 if (state->eof_reached)
1554 {
1555 /*
1556 * Seek position is pointing just past the zero tuplen at the
1557 * end of file; back up to fetch last tuple's ending length
1558 * word. If seek fails we must have a completely empty file.
1559 */
1560 nmoved = LogicalTapeBackspace(state->result_tape,
1561 2 * sizeof(unsigned int));
1562 if (nmoved == 0)
1563 return false;
1564 else if (nmoved != 2 * sizeof(unsigned int))
1565 elog(ERROR, "unexpected tape position");
1566 state->eof_reached = false;
1567 }
1568 else
1569 {
1570 /*
1571 * Back up and fetch previously-returned tuple's ending length
1572 * word. If seek fails, assume we are at start of file.
1573 */
1574 nmoved = LogicalTapeBackspace(state->result_tape,
1575 sizeof(unsigned int));
1576 if (nmoved == 0)
1577 return false;
1578 else if (nmoved != sizeof(unsigned int))
1579 elog(ERROR, "unexpected tape position");
1580 tuplen = getlen(state->result_tape, false);
1581
1582 /*
1583 * Back up to get ending length word of tuple before it.
1584 */
1585 nmoved = LogicalTapeBackspace(state->result_tape,
1586 tuplen + 2 * sizeof(unsigned int));
1587 if (nmoved == tuplen + sizeof(unsigned int))
1588 {
1589 /*
1590 * We backed up over the previous tuple, but there was no
1591 * ending length word before it. That means that the prev
1592 * tuple is the first tuple in the file. It is now the
1593 * next to read in forward direction (not obviously right,
1594 * but that is what in-memory case does).
1595 */
1596 return false;
1597 }
1598 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1599 elog(ERROR, "bogus tuple length in backward scan");
1600 }
1601
1602 tuplen = getlen(state->result_tape, false);
1603
1604 /*
1605 * Now we have the length of the prior tuple, back up and read it.
1606 * Note: READTUP expects we are positioned after the initial
1607 * length word of the tuple, so back up to that point.
1608 */
1609 nmoved = LogicalTapeBackspace(state->result_tape,
1610 tuplen);
1611 if (nmoved != tuplen)
1612 elog(ERROR, "bogus tuple length in backward scan");
1613 READTUP(state, stup, state->result_tape, tuplen);
1614
1615 /*
1616 * Remember the tuple we return, so that we can recycle its memory
1617 * on next call. (This can be NULL, in the Datum case).
1618 */
1619 state->lastReturnedTuple = stup->tuple;
1620
1621 return true;
1622
1623 case TSS_FINALMERGE:
1624 Assert(forward);
1625 /* We are managing memory ourselves, with the slab allocator. */
1626 Assert(state->slabAllocatorUsed);
1627
1628 /*
1629 * The slab slot holding the tuple that we returned in previous
1630 * gettuple call can now be reused.
1631 */
1632 if (state->lastReturnedTuple)
1633 {
1634 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1635 state->lastReturnedTuple = NULL;
1636 }
1637
1638 /*
1639 * This code should match the inner loop of mergeonerun().
1640 */
1641 if (state->memtupcount > 0)
1642 {
1643 int srcTapeIndex = state->memtuples[0].srctape;
1644 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1646
1647 *stup = state->memtuples[0];
1648
1649 /*
1650 * Remember the tuple we return, so that we can recycle its
1651 * memory on next call. (This can be NULL, in the Datum case).
1652 */
1653 state->lastReturnedTuple = stup->tuple;
1654
1655 /*
1656 * Pull next tuple from tape, and replace the returned tuple
1657 * at top of the heap with it.
1658 */
1660 {
1661 /*
1662 * If no more data, we've reached end of run on this tape.
1663 * Remove the top node from the heap.
1664 */
1665 tuplesort_heap_delete_top(state);
1666 state->nInputRuns--;
1667
1668 /*
1669 * Close the tape. It'd go away at the end of the sort
1670 * anyway, but better to release the memory early.
1671 */
1672 LogicalTapeClose(srcTape);
1673 return true;
1674 }
1675 newtup.srctape = srcTapeIndex;
1676 tuplesort_heap_replace_top(state, &newtup);
1677 return true;
1678 }
1679 return false;
1680
1681 default:
1682 elog(ERROR, "invalid tuplesort state");
1683 return false; /* keep compiler quiet */
1684 }
1685}
1686
1687
1688/*
1689 * Advance over N tuples in either forward or back direction,
1690 * without returning any data. N==0 is a no-op.
1691 * Returns true if successful, false if ran out of tuples.
1692 */
1693bool
1694tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1695{
1696 MemoryContext oldcontext;
1697
1698 /*
1699 * We don't actually support backwards skip yet, because no callers need
1700 * it. The API is designed to allow for that later, though.
1701 */
1702 Assert(forward);
1703 Assert(ntuples >= 0);
1704 Assert(!WORKER(state));
1705
1706 switch (state->status)
1707 {
1708 case TSS_SORTEDINMEM:
1709 if (state->memtupcount - state->current >= ntuples)
1710 {
1711 state->current += ntuples;
1712 return true;
1713 }
1714 state->current = state->memtupcount;
1715 state->eof_reached = true;
1716
1717 /*
1718 * Complain if caller tries to retrieve more tuples than
1719 * originally asked for in a bounded sort. This is because
1720 * returning EOF here might be the wrong thing.
1721 */
1722 if (state->bounded && state->current >= state->bound)
1723 elog(ERROR, "retrieved too many tuples in a bounded sort");
1724
1725 return false;
1726
1727 case TSS_SORTEDONTAPE:
1728 case TSS_FINALMERGE:
1729
1730 /*
1731 * We could probably optimize these cases better, but for now it's
1732 * not worth the trouble.
1733 */
1734 oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1735 while (ntuples-- > 0)
1736 {
1737 SortTuple stup;
1738
1739 if (!tuplesort_gettuple_common(state, forward, &stup))
1740 {
1741 MemoryContextSwitchTo(oldcontext);
1742 return false;
1743 }
1744 CHECK_FOR_INTERRUPTS();
1745 }
1746 MemoryContextSwitchTo(oldcontext);
1747 return true;
1748
1749 default:
1750 elog(ERROR, "invalid tuplesort state");
1751 return false; /* keep compiler quiet */
1752 }
1753}
1754
1755/*
1756 * tuplesort_merge_order - report merge order we'll use for given memory
1757 * (note: "merge order" just means the number of input tapes in the merge).
1758 *
1759 * This is exported for use by the planner. allowedMem is in bytes.
1760 */
1761int
1762tuplesort_merge_order(int64 allowedMem)
1763{
1764 int mOrder;
1765
1766 /*----------
1767 * In the merge phase, we need buffer space for each input and output tape.
1768 * Each pass in the balanced merge algorithm reads from M input tapes, and
1769 * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1770 * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1771 * input tape.
1772 *
1773 * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1774 * N * TAPE_BUFFER_OVERHEAD
1775 *
1776 * Except for the last and next-to-last merge passes, where there can be
1777 * fewer tapes left to process, M = N. We choose M so that we have the
1778 * desired amount of memory available for the input buffers
1779 * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1780 * available for the tape buffers (allowedMem).
1781 *
1782 * Note: you might be thinking we need to account for the memtuples[]
1783 * array in this calculation, but we effectively treat that as part of the
1784 * MERGE_BUFFER_SIZE workspace.
1785 *----------
1786 */
1787 mOrder = allowedMem /
1788 (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1789
1790 /*
1791 * Even in minimum memory, use at least a MINORDER merge. On the other
1792 * hand, even when we have lots of memory, do not use more than a MAXORDER
1793 * merge. Tapes are pretty cheap, but they're not entirely free. Each
1794 * additional tape reduces the amount of memory available to build runs,
1795 * which in turn can cause the same sort to need more runs, which makes
1796 * merging slower even if it can still be done in a single pass. Also,
1797 * high order merges are quite slow due to CPU cache effects; it can be
1798 * faster to pay the I/O cost of a multi-pass merge than to perform a
1799 * single merge pass across many hundreds of tapes.
1800 */
1801 mOrder = Max(mOrder, MINORDER);
1802 mOrder = Min(mOrder, MAXORDER);
1803
1804 return mOrder;
1805}
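As a rough worked example of the formula above (illustrative only, not part of tuplesort.c, and assuming the default BLCKSZ of 8192, so TAPE_BUFFER_OVERHEAD is 8 kB and MERGE_BUFFER_SIZE is 256 kB), 4 MB of allowedMem yields a merge order of 15:

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    const int64_t tape_buffer_overhead = 8192;      /* assumed BLCKSZ */
    const int64_t merge_buffer_size = 8192 * 32;    /* BLCKSZ * 32 */
    int64_t     allowedMem = 4 * 1024 * 1024;       /* e.g. 4 MB of work_mem */
    int64_t     mOrder;

    mOrder = allowedMem / (tape_buffer_overhead + merge_buffer_size);
    if (mOrder < 6)             /* MINORDER */
        mOrder = 6;
    if (mOrder > 500)           /* MAXORDER */
        mOrder = 500;
    printf("merge order = %lld\n", (long long) mOrder);     /* prints 15 */
    return 0;
}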
1806
1807/*
1808 * Helper function to calculate how much memory to allocate for the read buffer
1809 * of each input tape in a merge pass.
1810 *
1811 * 'avail_mem' is the amount of memory available for the buffers of all the
1812 * tapes, both input and output.
1813 * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1814 * 'maxOutputTapes' is the max. number of output tapes we should produce.
1815 */
1816static int64
1817merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1818 int maxOutputTapes)
1819{
1820 int nOutputRuns;
1821 int nOutputTapes;
1822
1823 /*
1824 * How many output tapes will we produce in this pass?
1825 *
1826 * This is nInputRuns / nInputTapes, rounded up.
1827 */
1828 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1829
1830 nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1831
1832 /*
1833 * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1834 * remaining memory is divided evenly between the input tapes.
1835 *
1836 * This also follows from the formula in tuplesort_merge_order, but here
1837 * we derive the input buffer size from the amount of memory available,
1838 * and M and N.
1839 */
1840 return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1841}
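As a concrete example (again assuming BLCKSZ = 8192, i.e. TAPE_BUFFER_OVERHEAD = 8 kB): with 3 MB of tape buffer memory, 6 input tapes holding 10 input runs, and at most 6 output tapes, the pass produces ceil(10 / 6) = 2 output runs and therefore 2 output tapes, leaving (3 MB - 2 * 8 kB) / 6, roughly 509 kB, of read buffer for each input tape.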
1842
1843/*
1844 * inittapes - initialize for tape sorting.
1845 *
1846 * This is called only if we have found we won't sort in memory.
1847 */
1848static void
1849inittapes(Tuplesortstate *state, bool mergeruns)
1850{
1851 Assert(!LEADER(state));
1852
1853 if (mergeruns)
1854 {
1855 /* Compute number of input tapes to use when merging */
1856 state->maxTapes = tuplesort_merge_order(state->allowedMem);
1857 }
1858 else
1859 {
1860 /* Workers can sometimes produce a single run, output without merge */
1862 state->maxTapes = MINORDER;
1863 }
1864
1865 if (trace_sort)
1866 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1867 state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1868
1869 /* Create the tape set */
1870 inittapestate(state, state->maxTapes);
1871 state->tapeset =
1872 LogicalTapeSetCreate(false,
1873 state->shared ? &state->shared->fileset : NULL,
1874 state->worker);
1875
1876 state->currentRun = 0;
1877
1878 /*
1879 * Initialize logical tape arrays.
1880 */
1881 state->inputTapes = NULL;
1882 state->nInputTapes = 0;
1883 state->nInputRuns = 0;
1884
1885 state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1886 state->nOutputTapes = 0;
1887 state->nOutputRuns = 0;
1888
1889 state->status = TSS_BUILDRUNS;
1890
1890
1891 selectnewtape(state);
1892}
1893
1894/*
1895 * inittapestate - initialize generic tape management state
1896 */
1897static void
1898inittapestate(Tuplesortstate *state, int maxTapes)
1899{
1900 int64 tapeSpace;
1901
1902 /*
1903 * Decrease availMem to reflect the space needed for tape buffers; but
1904 * don't decrease it to the point that we have no room for tuples. (That
1905 * case is only likely to occur if sorting pass-by-value Datums; in all
1906 * other scenarios the memtuples[] array is unlikely to occupy more than
1907 * half of allowedMem. In the pass-by-value case it's not important to
1908 * account for tuple space, so we don't care if LACKMEM becomes
1909 * inaccurate.)
1910 */
1911 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1912
1913 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1914 USEMEM(state, tapeSpace);
1915
1916 /*
1917 * Make sure that the temp file(s) underlying the tape set are created in
1918 * suitable temp tablespaces. For parallel sorts, this should have been
1919 * called already, but it doesn't matter if it is called a second time.
1920 */
1921 PrepareTempTablespaces();
1922}
1923
1924/*
1925 * selectnewtape -- select next tape to output to.
1926 *
1927 * This is called after finishing a run when we know another run
1928 * must be started. This is used both when building the initial
1929 * runs, and during merge passes.
1930 */
1931static void
1932selectnewtape(Tuplesortstate *state)
1933{
1934 /*
1935 * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1936 * both zero. On each call, we create a new output tape to hold the next
1937 * run, until maxTapes is reached. After that, we assign new runs to the
1938 * existing tapes in a round robin fashion.
1939 */
1940 if (state->nOutputTapes < state->maxTapes)
1941 {
1942 /* Create a new tape to hold the next run */
1943 Assert(state->outputTapes[state->nOutputRuns] == NULL);
1944 Assert(state->nOutputRuns == state->nOutputTapes);
1945 state->destTape = LogicalTapeCreate(state->tapeset);
1946 state->outputTapes[state->nOutputTapes] = state->destTape;
1947 state->nOutputTapes++;
1948 state->nOutputRuns++;
1949 }
1950 else
1951 {
1952 /*
1953 * We have reached the max number of tapes. Append to an existing
1954 * tape.
1955 */
1956 state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1957 state->nOutputRuns++;
1958 }
1959}
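For example, with maxTapes = 6 the first six runs each get a brand-new tape (tapes 0 through 5); when the seventh run starts, nOutputRuns is 6 and nOutputTapes is 6, so the run is appended to tape 6 % 6 = 0, the eighth run to tape 1, and so on round robin.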
1960
1961/*
1962 * Initialize the slab allocation arena, for the given number of slots.
1963 */
1964static void
1965init_slab_allocator(Tuplesortstate *state, int numSlots)
1966{
1967 if (numSlots > 0)
1968 {
1969 char *p;
1970 int i;
1971
1972 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1973 state->slabMemoryEnd = state->slabMemoryBegin +
1974 numSlots * SLAB_SLOT_SIZE;
1975 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1976 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1977
1978 p = state->slabMemoryBegin;
1979 for (i = 0; i < numSlots - 1; i++)
1980 {
1981 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1982 p += SLAB_SLOT_SIZE;
1983 }
1984 ((SlabSlot *) p)->nextfree = NULL;
1985 }
1986 else
1987 {
1988 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
1989 state->slabFreeHead = NULL;
1990 }
1991 state->slabAllocatorUsed = true;
1992}
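The intrusive free list built here can be seen in isolation in the following minimal sketch (illustrative only, not PostgreSQL code): each free slot's first bytes hold a pointer to the next free slot, allocating pops the list head, and releasing pushes the slot back, which is what tuplesort_readtup_alloc() and RELEASE_SLAB_SLOT() do with slabFreeHead.

#include <stdlib.h>

#define SLOT_SIZE 1024

typedef union DemoSlot
{
    union DemoSlot *nextfree;
    char        buffer[SLOT_SIZE];
} DemoSlot;

int
main(void)
{
    int         nslots = 4;
    char       *arena = malloc(nslots * SLOT_SIZE);
    DemoSlot   *freehead = (DemoSlot *) arena;
    DemoSlot   *slot;
    int         i;

    /* chain all slots into a singly linked free list, as init_slab_allocator does */
    for (i = 0; i < nslots - 1; i++)
        ((DemoSlot *) (arena + i * SLOT_SIZE))->nextfree =
            (DemoSlot *) (arena + (i + 1) * SLOT_SIZE);
    ((DemoSlot *) (arena + (nslots - 1) * SLOT_SIZE))->nextfree = NULL;

    /* allocate one slot by popping the head ... */
    slot = freehead;
    freehead = slot->nextfree;

    /* ... and release it again by pushing it back */
    slot->nextfree = freehead;
    freehead = slot;

    free(arena);
    return 0;
}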
1993
1994/*
1995 * mergeruns -- merge all the completed initial runs.
1996 *
1997 * This implements the Balanced k-Way Merge Algorithm. All input data has
1998 * already been written to initial runs on tape (see dumptuples).
1999 */
2000static void
2001mergeruns(Tuplesortstate *state)
2002{
2003 int tapenum;
2004
2005 Assert(state->status == TSS_BUILDRUNS);
2006 Assert(state->memtupcount == 0);
2007
2008 if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2009 {
2010 /*
2011 * If there are multiple runs to be merged, when we go to read back
2012 * tuples from disk, abbreviated keys will not have been stored, and
2013 * we don't care to regenerate them. Disable abbreviation from this
2014 * point on.
2015 */
2016 state->base.sortKeys->abbrev_converter = NULL;
2017 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2018
2019 /* Not strictly necessary, but be tidy */
2020 state->base.sortKeys->abbrev_abort = NULL;
2021 state->base.sortKeys->abbrev_full_comparator = NULL;
2022 }
2023
2024 /*
2025 * Reset tuple memory. We've freed all the tuples that we previously
2026 * allocated. We will use the slab allocator from now on.
2027 */
2028 MemoryContextResetOnly(state->base.tuplecontext);
2029
2030 /*
2031 * We no longer need a large memtuples array. (We will allocate a smaller
2032 * one for the heap later.)
2033 */
2034 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2035 pfree(state->memtuples);
2036 state->memtuples = NULL;
2037
2038 /*
2039 * Initialize the slab allocator. We need one slab slot per input tape,
2040 * for the tuples in the heap, plus one to hold the tuple last returned
2041 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2042 * however, we don't need to allocate anything.)
2043 *
2044 * In a multi-pass merge, we could shrink this allocation for the last
2045 * merge pass, if it has fewer tapes than previous passes, but we don't
2046 * bother.
2047 *
2048 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2049 * to track memory usage of individual tuples.
2050 */
2051 if (state->base.tuples)
2052 init_slab_allocator(state, state->nOutputTapes + 1);
2053 else
2054 init_slab_allocator(state, 0);
2055
2056 /*
2057 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2058 * from each input tape.
2059 *
2060 * We could shrink this, too, between passes in a multi-pass merge, but we
2061 * don't bother. (The initial input tapes are still in outputTapes. The
2062 * number of input tapes will not increase between passes.)
2063 */
2064 state->memtupsize = state->nOutputTapes;
2065 state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
2066 state->nOutputTapes * sizeof(SortTuple));
2067 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2068
2069 /*
2070 * Use all the remaining memory we have available for tape buffers among
2071 * all the input tapes. At the beginning of each merge pass, we will
2072 * divide this memory between the input and output tapes in the pass.
2073 */
2074 state->tape_buffer_mem = state->availMem;
2075 USEMEM(state, state->tape_buffer_mem);
2076 if (trace_sort)
2077 elog(LOG, "worker %d using %zu KB of memory for tape buffers",
2078 state->worker, state->tape_buffer_mem / 1024);
2079
2080 for (;;)
2081 {
2082 /*
2083 * On the first iteration, or if we have read all the runs from the
2084 * input tapes in a multi-pass merge, it's time to start a new pass.
2085 * Rewind all the output tapes, and make them inputs for the next
2086 * pass.
2087 */
2088 if (state->nInputRuns == 0)
2089 {
2090 int64 input_buffer_size;
2091
2092 /* Close the old, emptied, input tapes */
2093 if (state->nInputTapes > 0)
2094 {
2095 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2096 LogicalTapeClose(state->inputTapes[tapenum]);
2097 pfree(state->inputTapes);
2098 }
2099
2100 /* Previous pass's outputs become next pass's inputs. */
2101 state->inputTapes = state->outputTapes;
2102 state->nInputTapes = state->nOutputTapes;
2103 state->nInputRuns = state->nOutputRuns;
2104
2105 /*
2106 * Reset output tape variables. The actual LogicalTapes will be
2107 * created as needed, here we only allocate the array to hold
2108 * them.
2109 */
2110 state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2111 state->nOutputTapes = 0;
2112 state->nOutputRuns = 0;
2113
2114 /*
2115 * Redistribute the memory allocated for tape buffers, among the
2116 * new input and output tapes.
2117 */
2118 input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2119 state->nInputTapes,
2120 state->nInputRuns,
2121 state->maxTapes);
2122
2123 if (trace_sort)
2124 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2125 state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2126 pg_rusage_show(&state->ru_start));
2127
2128 /* Prepare the new input tapes for merge pass. */
2129 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2130 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2131
2132 /*
2133 * If there's just one run left on each input tape, then only one
2134 * merge pass remains. If we don't have to produce a materialized
2135 * sorted tape, we can stop at this point and do the final merge
2136 * on-the-fly.
2137 */
2138 if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2139 && state->nInputRuns <= state->nInputTapes
2140 && !WORKER(state))
2141 {
2142 /* Tell logtape.c we won't be writing anymore */
2143 LogicalTapeSetForgetFreeSpace(state->tapeset);
2144 /* Initialize for the final merge pass */
2145 beginmerge(state);
2146 state->status = TSS_FINALMERGE;
2147 return;
2148 }
2149 }
2150
2151 /* Select an output tape */
2152 selectnewtape(state);
2153
2154 /* Merge one run from each input tape. */
2155 mergeonerun(state);
2156
2157 /*
2158 * If the input tapes are empty, and we output only one output run,
2159 * we're done. The current output tape contains the final result.
2160 */
2161 if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2162 break;
2163 }
2164
2165 /*
2166 * Done. The result is on a single run on a single tape.
2167 */
2168 state->result_tape = state->outputTapes[0];
2169 if (!WORKER(state))
2170 LogicalTapeFreeze(state->result_tape, NULL);
2171 else
2172 worker_freeze_result_tape(state);
2173 state->status = TSS_SORTEDONTAPE;
2174
2175 /* Close all the now-empty input tapes, to release their read buffers. */
2176 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2177 LogicalTapeClose(state->inputTapes[tapenum]);
2178}
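As a worked example: suppose run generation left 20 runs spread over 6 tapes. The first pass makes those 6 tapes the input tapes with nInputRuns = 20; each mergeonerun() call combines one run from each input tape that still has one, so four calls (merging 6 + 6 + 6 + 2 runs) produce 4 output runs. At the start of the next pass those 4 runs fit on the input tapes, so a sort that doesn't need TUPLESORT_RANDOMACCESS stops there in TSS_FINALMERGE and merges them on the fly, while a random-access sort runs one more pass to leave a single run on the result tape.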
2179
2180/*
2181 * Merge one run from each input tape.
2182 */
2183static void
2184mergeonerun(Tuplesortstate *state)
2185{
2186 int srcTapeIndex;
2187 LogicalTape *srcTape;
2188
2189 /*
2190 * Start the merge by loading one tuple from each active source tape into
2191 * the heap.
2192 */
2193 beginmerge(state);
2194
2195 Assert(state->slabAllocatorUsed);
2196
2197 /*
2198 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2199 * out, and replacing it with next tuple from same tape (if there is
2200 * another one).
2201 */
2202 while (state->memtupcount > 0)
2203 {
2204 SortTuple stup;
2205
2206 /* write the tuple to destTape */
2207 srcTapeIndex = state->memtuples[0].srctape;
2208 srcTape = state->inputTapes[srcTapeIndex];
2209 WRITETUP(state, state->destTape, &state->memtuples[0]);
2210
2211 /* recycle the slot of the tuple we just wrote out, for the next read */
2212 if (state->memtuples[0].tuple)
2213 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2214
2215 /*
2216 * pull next tuple from the tape, and replace the written-out tuple in
2217 * the heap with it.
2218 */
2219 if (mergereadnext(state, srcTape, &stup))
2220 {
2221 stup.srctape = srcTapeIndex;
2222 tuplesort_heap_replace_top(state, &stup);
2223 }
2224 else
2225 {
2226 tuplesort_heap_delete_top(state);
2227 state->nInputRuns--;
2228 }
2229 }
2230
2231 /*
2232 * When the heap empties, we're done. Write an end-of-run marker on the
2233 * output tape.
2234 */
2235 markrunend(state->destTape);
2236}
2237
2238/*
2239 * beginmerge - initialize for a merge pass
2240 *
2241 * Fill the merge heap with the first tuple from each input tape.
2242 */
2243static void
2244beginmerge(Tuplesortstate *state)
2245{
2246 int activeTapes;
2247 int srcTapeIndex;
2248
2249 /* Heap should be empty here */
2250 Assert(state->memtupcount == 0);
2251
2252 activeTapes = Min(state->nInputTapes, state->nInputRuns);
2253
2254 for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2255 {
2256 SortTuple tup;
2257
2258 if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2259 {
2260 tup.srctape = srcTapeIndex;
2261 tuplesort_heap_insert(state, &tup);
2262 }
2263 }
2264}
2265
2266/*
2267 * mergereadnext - read next tuple from one merge input tape
2268 *
2269 * Returns false on EOF.
2270 */
2271static bool
2272mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2273{
2274 unsigned int tuplen;
2275
2276 /* read next tuple, if any */
2277 if ((tuplen = getlen(srcTape, true)) == 0)
2278 return false;
2279 READTUP(state, stup, srcTape, tuplen);
2280
2281 return true;
2282}
2283
2284/*
2285 * dumptuples - remove tuples from memtuples and write initial run to tape
2286 *
2287 * When alltuples = true, dump everything currently in memory. (This case is
2288 * only used at end of input data.)
2289 */
2290static void
2291dumptuples(Tuplesortstate *state, bool alltuples)
2292{
2293 int memtupwrite;
2294 int i;
2295
2296 /*
2297 * Nothing to do if we still fit in available memory and have array slots,
2298 * unless this is the final call during initial run generation.
2299 */
2300 if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2301 !alltuples)
2302 return;
2303
2304 /*
2305 * Final call might require no sorting, in rare cases where we just so
2306 * happen to have previously LACKMEM()'d at the point where exactly all
2307 * remaining tuples are loaded into memory, just before input was
2308 * exhausted. In general, short final runs are quite possible, but avoid
2309 * creating a completely empty run. In a worker, though, we must produce
2310 * at least one tape, even if it's empty.
2311 */
2312 if (state->memtupcount == 0 && state->currentRun > 0)
2313 return;
2314
2315 Assert(state->status == TSS_BUILDRUNS);
2316
2317 /*
2318 * It seems unlikely that this limit will ever be exceeded, but take no
2319 * chances
2320 */
2321 if (state->currentRun == INT_MAX)
2322 ereport(ERROR,
2323 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2324 errmsg("cannot have more than %d runs for an external sort",
2325 INT_MAX)));
2326
2327 if (state->currentRun > 0)
2328 selectnewtape(state);
2329
2330 state->currentRun++;
2331
2332 if (trace_sort)
2333 elog(LOG, "worker %d starting quicksort of run %d: %s",
2334 state->worker, state->currentRun,
2335 pg_rusage_show(&state->ru_start));
2336
2337 /*
2338 * Sort all tuples accumulated within the allowed amount of memory for
2339 * this run using quicksort
2340 */
2341 tuplesort_sort_memtuples(state);
2342
2343 if (trace_sort)
2344 elog(LOG, "worker %d finished quicksort of run %d: %s",
2345 state->worker, state->currentRun,
2346 pg_rusage_show(&state->ru_start));
2347
2348 memtupwrite = state->memtupcount;
2349 for (i = 0; i < memtupwrite; i++)
2350 {
2351 SortTuple *stup = &state->memtuples[i];
2352
2353 WRITETUP(state, state->destTape, stup);
2354 }
2355
2356 state->memtupcount = 0;
2357
2358 /*
2359 * Reset tuple memory. We've freed all of the tuples that we previously
2360 * allocated. It's important to avoid fragmentation when there is a stark
2361 * change in the sizes of incoming tuples. In bounded sorts,
2362 * fragmentation due to AllocSetFree's bucketing by size class might be
2363 * particularly bad if this step wasn't taken.
2364 */
2365 MemoryContextReset(state->base.tuplecontext);
2366
2367 /*
2368 * Now update the memory accounting to subtract the memory used by the
2369 * tuple.
2370 */
2371 FREEMEM(state, state->tupleMem);
2372 state->tupleMem = 0;
2373
2374 markrunend(state->destTape);
2375
2376 if (trace_sort)
2377 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2378 state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2379 pg_rusage_show(&state->ru_start));
2380}
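Each call therefore writes one initial run that is exactly the quicksorted contents of memtuples at that moment. For example, if workMem only accommodates on the order of 10,000 tuples at a time, sorting a million input tuples produces roughly 100 such runs, which mergeruns() subsequently combines.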
2381
2382/*
2383 * tuplesort_rescan - rewind and replay the scan
2384 */
2385void
2386tuplesort_rescan(Tuplesortstate *state)
2387{
2388 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2389
2390 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2391
2392 switch (state->status)
2393 {
2394 case TSS_SORTEDINMEM:
2395 state->current = 0;
2396 state->eof_reached = false;
2397 state->markpos_offset = 0;
2398 state->markpos_eof = false;
2399 break;
2400 case TSS_SORTEDONTAPE:
2401 LogicalTapeRewindForRead(state->result_tape, 0);
2402 state->eof_reached = false;
2403 state->markpos_block = 0L;
2404 state->markpos_offset = 0;
2405 state->markpos_eof = false;
2406 break;
2407 default:
2408 elog(ERROR, "invalid tuplesort state");
2409 break;
2410 }
2411
2412 MemoryContextSwitchTo(oldcontext);
2413}
2414
2415/*
2416 * tuplesort_markpos - saves current position in the merged sort file
2417 */
2418void
2419tuplesort_markpos(Tuplesortstate *state)
2420{
2421 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2422
2423 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2424
2425 switch (state->status)
2426 {
2427 case TSS_SORTEDINMEM:
2428 state->markpos_offset = state->current;
2429 state->markpos_eof = state->eof_reached;
2430 break;
2431 case TSS_SORTEDONTAPE:
2432 LogicalTapeTell(state->result_tape,
2433 &state->markpos_block,
2434 &state->markpos_offset);
2435 state->markpos_eof = state->eof_reached;
2436 break;
2437 default:
2438 elog(ERROR, "invalid tuplesort state");
2439 break;
2440 }
2441
2442 MemoryContextSwitchTo(oldcontext);
2443}
2444
2445/*
2446 * tuplesort_restorepos - restores current position in merged sort file to
2447 * last saved position
2448 */
2449void
2450tuplesort_restorepos(Tuplesortstate *state)
2451{
2452 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2453
2454 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2455
2456 switch (state->status)
2457 {
2458 case TSS_SORTEDINMEM:
2459 state->current = state->markpos_offset;
2460 state->eof_reached = state->markpos_eof;
2461 break;
2462 case TSS_SORTEDONTAPE:
2463 LogicalTapeSeek(state->result_tape,
2464 state->markpos_block,
2465 state->markpos_offset);
2466 state->eof_reached = state->markpos_eof;
2467 break;
2468 default:
2469 elog(ERROR, "invalid tuplesort state");
2470 break;
2471 }
2472
2473 MemoryContextSwitchTo(oldcontext);
2474}
2475
2476/*
2477 * tuplesort_get_stats - extract summary statistics
2478 *
2479 * This can be called after tuplesort_performsort() finishes to obtain
2480 * printable summary information about how the sort was performed.
2481 */
2482void
2483tuplesort_get_stats(Tuplesortstate *state,
2484 TuplesortInstrumentation *stats)
2485{
2486 /*
2487 * Note: it might seem we should provide both memory and disk usage for a
2488 * disk-based sort. However, the current code doesn't track memory space
2489 * accurately once we have begun to return tuples to the caller (since we
2490 * don't account for pfree's the caller is expected to do), so we cannot
2491 * rely on availMem in a disk sort. This does not seem worth the overhead
2492 * to fix. Is it worth creating an API for the memory context code to
2493 * tell us how much is actually used in sortcontext?
2494 */
2495 tuplesort_updatemax(state);
2496
2497 if (state->isMaxSpaceDisk)
2498 stats->spaceType = SORT_SPACE_TYPE_DISK;
2499 else
2500 stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2501 stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2502
2503 switch (state->maxSpaceStatus)
2504 {
2505 case TSS_SORTEDINMEM:
2506 if (state->boundUsed)
2507 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2508 else
2509 stats->sortMethod = SORT_TYPE_QUICKSORT;
2510 break;
2511 case TSS_SORTEDONTAPE:
2512 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2513 break;
2514 case TSS_FINALMERGE:
2515 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2516 break;
2517 default:
2518 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2519 break;
2520 }
2521}
2522
2523/*
2524 * Convert TuplesortMethod to a string.
2525 */
2526const char *
2527tuplesort_method_name(TuplesortMethod m)
2528{
2529 switch (m)
2530 {
2531 case SORT_TYPE_STILL_IN_PROGRESS:
2532 return "still in progress";
2533 case SORT_TYPE_TOP_N_HEAPSORT:
2534 return "top-N heapsort";
2535 case SORT_TYPE_QUICKSORT:
2536 return "quicksort";
2537 case SORT_TYPE_EXTERNAL_SORT:
2538 return "external sort";
2539 case SORT_TYPE_EXTERNAL_MERGE:
2540 return "external merge";
2541 }
2542
2543 return "unknown";
2544}
2545
2546/*
2547 * Convert TuplesortSpaceType to a string.
2548 */
2549const char *
2550tuplesort_space_type_name(TuplesortSpaceType t)
2551{
2552 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2553 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2554}
2555
2556
2557/*
2558 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2559 */
2560
2561/*
2562 * Convert the existing unordered array of SortTuples to a bounded heap,
2563 * discarding all but the smallest "state->bound" tuples.
2564 *
2565 * When working with a bounded heap, we want to keep the largest entry
2566 * at the root (array entry zero), instead of the smallest as in the normal
2567 * sort case. This allows us to discard the largest entry cheaply.
2568 * Therefore, we temporarily reverse the sort direction.
2569 */
2570static void
2571make_bounded_heap(Tuplesortstate *state)
2572{
2573 int tupcount = state->memtupcount;
2574 int i;
2575
2576 Assert(state->status == TSS_INITIAL);
2577 Assert(state->bounded);
2578 Assert(tupcount >= state->bound);
2579 Assert(SERIAL(state));
2580
2581 /* Reverse sort direction so largest entry will be at root */
2582 reversedirection(state);
2583
2584 state->memtupcount = 0; /* make the heap empty */
2585 for (i = 0; i < tupcount; i++)
2586 {
2587 if (state->memtupcount < state->bound)
2588 {
2589 /* Insert next tuple into heap */
2590 /* Must copy source tuple to avoid possible overwrite */
2591 SortTuple stup = state->memtuples[i];
2592
2593 tuplesort_heap_insert(state, &stup);
2594 }
2595 else
2596 {
2597 /*
2598 * The heap is full. Replace the largest entry with the new
2599 * tuple, or just discard it, if it's larger than anything already
2600 * in the heap.
2601 */
2602 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2603 {
2604 free_sort_tuple(state, &state->memtuples[i]);
2605 CHECK_FOR_INTERRUPTS();
2606 }
2607 else
2608 tuplesort_heap_replace_top(state, &state->memtuples[i]);
2609 }
2610 }
2611
2612 Assert(state->memtupcount == state->bound);
2613 state->status = TSS_BOUNDED;
2614}
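For example, with bound = 3 and input values 5, 1, 4, 2, 8 arriving in that order (ascending sort), the first three values fill the heap with 5 at the root; 2 then replaces the root 5 via tuplesort_heap_replace_top(), while 8 compares larger than the current root (by then 4) and is simply discarded, so the heap ends up holding the three smallest values 1, 2 and 4.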
2615
2616/*
2617 * Convert the bounded heap to a properly-sorted array
2618 */
2619static void
2620sort_bounded_heap(Tuplesortstate *state)
2621{
2622 int tupcount = state->memtupcount;
2623
2624 Assert(state->status == TSS_BOUNDED);
2625 Assert(state->bounded);
2626 Assert(tupcount == state->bound);
2627 Assert(SERIAL(state));
2628
2629 /*
2630 * We can unheapify in place because each delete-top call will remove the
2631 * largest entry, which we can promptly store in the newly freed slot at
2632 * the end. Once we're down to a single-entry heap, we're done.
2633 */
2634 while (state->memtupcount > 1)
2635 {
2636 SortTuple stup = state->memtuples[0];
2637
2638 /* this sifts-up the next-largest entry and decreases memtupcount */
2639 tuplesort_heap_delete_top(state);
2640 state->memtuples[state->memtupcount] = stup;
2641 }
2642 state->memtupcount = tupcount;
2643
2644 /*
2645 * Reverse sort direction back to the original state. This is not
2646 * actually necessary but seems like a good idea for tidiness.
2647 */
2648 reversedirection(state);
2649
2650 state->status = TSS_SORTEDINMEM;
2651 state->boundUsed = true;
2652}
2653
2654/*
2655 * Sort all memtuples using specialized qsort() routines.
2656 *
2657 * Quicksort is used for small in-memory sorts, and external sort runs.
2658 */
2659static void
2660tuplesort_sort_memtuples(Tuplesortstate *state)
2661{
2662 Assert(!LEADER(state));
2663
2664 if (state->memtupcount > 1)
2665 {
2666 /*
2667 * Do we have the leading column's value or abbreviation in datum1,
2668 * and is there a specialization for its comparator?
2669 */
2670 if (state->base.haveDatum1 && state->base.sortKeys)
2671 {
2672 if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2673 {
2674 qsort_tuple_unsigned(state->memtuples,
2675 state->memtupcount,
2676 state);
2677 return;
2678 }
2679 else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2680 {
2681 qsort_tuple_signed(state->memtuples,
2682 state->memtupcount,
2683 state);
2684 return;
2685 }
2686 else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2687 {
2688 qsort_tuple_int32(state->memtuples,
2689 state->memtupcount,
2690 state);
2691 return;
2692 }
2693 }
2694
2695 /* Can we use the single-key sort function? */
2696 if (state->base.onlyKey != NULL)
2697 {
2698 qsort_ssup(state->memtuples, state->memtupcount,
2699 state->base.onlyKey);
2700 }
2701 else
2702 {
2703 qsort_tuple(state->memtuples,
2704 state->memtupcount,
2705 state->base.comparetup,
2706 state);
2707 }
2708 }
2709}
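For example, a single-key sort on an int4 column whose sort support installs ssup_datum_int32_cmp dispatches to the qsort_tuple_int32 specialization; a sort with only one key but no specialized comparator uses qsort_ssup; and a multi-key heap-tuple sort with no applicable specialization falls back to qsort_tuple with the comparetup function pointer.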
2710
2711/*
2712 * Insert a new tuple into an empty or existing heap, maintaining the
2713 * heap invariant. Caller is responsible for ensuring there's room.
2714 *
2715 * Note: For some callers, tuple points to a memtuples[] entry above the
2716 * end of the heap. This is safe as long as it's not immediately adjacent
2717 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2718 * is, it might get overwritten before being moved into the heap!
2719 */
2720static void
2721tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
2722{
2723 SortTuple *memtuples;
2724 int j;
2725
2726 memtuples = state->memtuples;
2727 Assert(state->memtupcount < state->memtupsize);
2728
2728
2729 CHECK_FOR_INTERRUPTS();
2730
2731 /*
2732 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2733 * using 1-based array indexes, not 0-based.
2734 */
2735 j = state->memtupcount++;
2736 while (j > 0)
2737 {
2738 int i = (j - 1) >> 1;
2739
2740 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
2741 break;
2742 memtuples[j] = memtuples[i];
2743 j = i;
2744 }
2745 memtuples[j] = *tuple;
2746}
2747
2748/*
2749 * Remove the tuple at state->memtuples[0] from the heap. Decrement
2750 * memtupcount, and sift up to maintain the heap invariant.
2751 *
2752 * The caller has already free'd the tuple the top node points to,
2753 * if necessary.
2754 */
2755static void
2756tuplesort_heap_delete_top(Tuplesortstate *state)
2757{
2758 SortTuple *memtuples = state->memtuples;
2759 SortTuple *tuple;
2760
2761 if (--state->memtupcount <= 0)
2762 return;
2763
2764 /*
2765 * Remove the last tuple in the heap, and re-insert it, by replacing the
2766 * current top node with it.
2767 */
2768 tuple = &memtuples[state->memtupcount];
2769 tuplesort_heap_replace_top(state, tuple);
2770}
2771
2772/*
2773 * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
2774 * maintain the heap invariant.
2775 *
2776 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
2777 * Heapsort, steps H3-H8).
2778 */
2779static void
2780tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
2781{
2782 SortTuple *memtuples = state->memtuples;
2783 unsigned int i,
2784 n;
2785
2786 Assert(state->memtupcount >= 1);
2787
2787
2788 CHECK_FOR_INTERRUPTS();
2789
2790 /*
2791 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2792 * This prevents overflow in the "2 * i + 1" calculation, since at the top
2793 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2794 */
2795 n = state->memtupcount;
2796 i = 0; /* i is where the "hole" is */
2797 for (;;)
2798 {
2799 unsigned int j = 2 * i + 1;
2800
2801 if (j >= n)
2802 break;
2803 if (j + 1 < n &&
2804 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
2805 j++;
2806 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
2807 break;
2808 memtuples[i] = memtuples[j];
2809 i = j;
2810 }
2811 memtuples[i] = *tuple;
2812}
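The sift-down is easier to see on a plain array of ints; the following standalone sketch (illustrative only, not PostgreSQL code) applies the same replace-top logic to a five-element min-heap:

#include <stdio.h>

static void
replace_top(int *heap, unsigned int n, int newval)
{
    unsigned int i = 0;

    for (;;)
    {
        unsigned int j = 2 * i + 1;

        if (j >= n)
            break;
        if (j + 1 < n && heap[j] > heap[j + 1])
            j++;                /* descend toward the smaller child */
        if (newval <= heap[j])
            break;
        heap[i] = heap[j];      /* pull the child up into the hole */
        i = j;
    }
    heap[i] = newval;
}

int
main(void)
{
    int         heap[5] = {1, 3, 2, 7, 4};
    int         k;

    replace_top(heap, 5, 9);    /* replace the root 1 with 9 */
    for (k = 0; k < 5; k++)
        printf("%d ", heap[k]); /* prints: 2 3 9 7 4 */
    printf("\n");
    return 0;
}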
2813
2814/*
2815 * Function to reverse the sort direction from its current state
2816 *
2817 * It is not safe to call this when performing hash tuplesorts
2818 */
2819static void
2820reversedirection(Tuplesortstate *state)
2821{
2822 SortSupport sortKey = state->base.sortKeys;
2823 int nkey;
2824
2825 for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2826 {
2827 sortKey->ssup_reverse = !sortKey->ssup_reverse;
2828 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2829 }
2830}
2831
2832
2833/*
2834 * Tape interface routines
2835 */
2836
2837static unsigned int
2838getlen(LogicalTape *tape, bool eofOK)
2839{
2840 unsigned int len;
2841
2842 if (LogicalTapeRead(tape,
2843 &len, sizeof(len)) != sizeof(len))
2844 elog(ERROR, "unexpected end of tape");
2845 if (len == 0 && !eofOK)
2846 elog(ERROR, "unexpected end of data");
2847 return len;
2848}
2849
2850static void
2851markrunend(LogicalTape *tape)
2852{
2853 unsigned int len = 0;
2854
2855 LogicalTapeWrite(tape, &len, sizeof(len));
2856}
2857
2858/*
2859 * Get memory for tuple from within READTUP() routine.
2860 *
2861 * We use next free slot from the slab allocator, or palloc() if the tuple
2862 * is too large for that.
2863 */
2864void *
2865tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
2866{
2867 SlabSlot *buf;
2868
2869 /*
2870 * We pre-allocate enough slots in the slab arena that we should never run
2871 * out.
2872 */
2873 Assert(state->slabFreeHead);
2874
2875 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
2876 return MemoryContextAlloc(state->base.sortcontext, tuplen);
2877 else
2878 {
2879 buf = state->slabFreeHead;
2880 /* Reuse this slot */
2881 state->slabFreeHead = buf->nextfree;
2882
2883 return buf;
2884 }
2885}
2886
2887
2888/*
2889 * Parallel sort routines
2890 */
2891
2892/*
2893 * tuplesort_estimate_shared - estimate required shared memory allocation
2894 *
2895 * nWorkers is an estimate of the number of workers (it's the number that
2896 * will be requested).
2897 */
2898Size
2899tuplesort_estimate_shared(int nWorkers)
2900{
2901 Size tapesSize;
2902
2903 Assert(nWorkers > 0);
2904
2905 /* Make sure that BufFile shared state is MAXALIGN'd */
2906 tapesSize = mul_size(sizeof(TapeShare), nWorkers);
2907 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2908
2909 return tapesSize;
2910}
2911
2912/*
2913 * tuplesort_initialize_shared - initialize shared tuplesort state
2914 *
2915 * Must be called from leader process before workers are launched, to
2916 * establish state needed up-front for worker tuplesortstates. nWorkers
2917 * should match the argument passed to tuplesort_estimate_shared().
2918 */
2919void
2920tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
2921{
2922 int i;
2923
2924 Assert(nWorkers > 0);
2925
2926 SpinLockInit(&shared->mutex);
2927 shared->currentWorker = 0;
2928 shared->workersFinished = 0;
2929 SharedFileSetInit(&shared->fileset, seg);
2930 shared->nTapes = nWorkers;
2931 for (i = 0; i < nWorkers; i++)
2932 {
2933 shared->tapes[i].firstblocknumber = 0L;
2934 }
2935}
2936
2937/*
2938 * tuplesort_attach_shared - attach to shared tuplesort state
2939 *
2940 * Must be called by all worker processes.
2941 */
2942void
2943tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
2944{
2945 /* Attach to SharedFileSet */
2946 SharedFileSetAttach(&shared->fileset, seg);
2947}
2948
2949/*
2950 * worker_get_identifier - Assign and return ordinal identifier for worker
2951 *
2952 * The order in which these are assigned is not well defined, and should not
2953 * matter; worker numbers across parallel sort participants need only be
2954 * distinct and gapless. logtape.c requires this.
2955 *
2956 * Note that the identifiers assigned from here have no relation to
2957 * ParallelWorkerNumber number, to avoid making any assumption about
2958 * caller's requirements. However, we do follow the ParallelWorkerNumber
2959 * convention of representing a non-worker with worker number -1. This
2960 * includes the leader, as well as serial Tuplesort processes.
2961 */
2962static int
2963worker_get_identifier(Tuplesortstate *state)
2964{
2965 Sharedsort *shared = state->shared;
2966 int worker;
2967
2968 Assert(WORKER(state));
2969
2970 SpinLockAcquire(&shared->mutex);
2971 worker = shared->currentWorker++;
2972 SpinLockRelease(&shared->mutex);
2973
2974 return worker;
2975}
2976
2977/*
2978 * worker_freeze_result_tape - freeze worker's result tape for leader
2979 *
2980 * This is called by workers just after the result tape has been determined,
2981 * instead of calling LogicalTapeFreeze() directly. They do so because
2982 * workers require a few additional steps over similar serial
2983 * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
2985 * steps are around freeing now-unneeded resources, and representing to the
2986 * leader that the worker's input run is available for its merge.
2986 *
2987 * There should only be one final output run for each worker, which consists
2988 * of all tuples that were originally input into worker.
2989 */
2990static void
2991worker_freeze_result_tape(Tuplesortstate *state)
2992{
2993 Sharedsort *shared = state->shared;
2994 TapeShare output;
2995
2996 Assert(WORKER(state));
2997 Assert(state->result_tape != NULL);
2998 Assert(state->memtupcount == 0);
2999
3000 /*
3001 * Free most remaining memory, in case caller is sensitive to our holding
3002 * on to it. memtuples may not be a tiny merge heap at this point.
3003 */
3004 pfree(state->memtuples);
3005 /* Be tidy */
3006 state->memtuples = NULL;
3007 state->memtupsize = 0;
3008
3009 /*
3010 * Parallel worker requires result tape metadata, which is to be stored in
3011 * shared memory for leader
3012 */
3013 LogicalTapeFreeze(state->result_tape, &output);
3014
3015 /* Store properties of output tape, and update finished worker count */
3016 SpinLockAcquire(&shared->mutex);
3017 shared->tapes[state->worker] = output;
3018 shared->workersFinished++;
3019 SpinLockRelease(&shared->mutex);
3020}
3021
3022/*
3023 * worker_nomergeruns - dump memtuples in worker, without merging
3024 *
3025 * This is called as an alternative to mergeruns() with a worker when no
3026 * merging is required.
3027 */
3028static void
3029worker_nomergeruns(Tuplesortstate *state)
3030{
3031 Assert(WORKER(state));
3032 Assert(state->result_tape == NULL);
3033 Assert(state->nOutputRuns == 1);
3034
3035 state->result_tape = state->destTape;
3036 worker_freeze_result_tape(state);
3037}
3038
3039/*
3040 * leader_takeover_tapes - create tapeset for leader from worker tapes
3041 *
3042 * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3043 * sorting has occurred in workers, all of which must have already returned
3044 * from tuplesort_performsort().
3045 *
3046 * When this returns, leader process is left in a state that is virtually
3047 * indistinguishable from it having generated runs as a serial external sort
3048 * might have.
3049 */
3050static void
3051leader_takeover_tapes(Tuplesortstate *state)
3052{
3053 Sharedsort *shared = state->shared;
3054 int nParticipants = state->nParticipants;
3055 int workersFinished;
3056 int j;
3057
3058 Assert(LEADER(state));
3059 Assert(nParticipants >= 1);
3060
3061 SpinLockAcquire(&shared->mutex);
3062 workersFinished = shared->workersFinished;
3063 SpinLockRelease(&shared->mutex);
3064
3065 if (nParticipants != workersFinished)
3066 elog(ERROR, "cannot take over tapes before all workers finish");
3067
3068 /*
3069 * Create the tapeset from worker tapes, including a leader-owned tape at
3070 * the end. Parallel workers are far more expensive than logical tapes,
3071 * so the number of tapes allocated here should never be excessive.
3072 */
3073 inittapestate(state, nParticipants);
3074 state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3075
3076 /*
3077 * Set currentRun to reflect the number of runs we will merge (it's not
3078 * used for anything, this is just pro forma)
3079 */
3080 state->currentRun = nParticipants;
3081
3082 /*
3083 * Initialize the state to look the same as after building the initial
3084 * runs.
3085 *
3086 * There will always be exactly 1 run per worker, and exactly one input
3087 * tape per run, because workers always output exactly 1 run, even when
3088 * there were no input tuples for workers to sort.
3089 */
3090 state->inputTapes = NULL;
3091 state->nInputTapes = 0;
3092 state->nInputRuns = 0;
3093
3094 state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3095 state->nOutputTapes = nParticipants;
3096 state->nOutputRuns = nParticipants;
3097
3098 for (j = 0; j < nParticipants; j++)
3099 {
3100 state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3101 }
3102
3103 state->status = TSS_BUILDRUNS;
3104}
3105
3106/*
3107 * Convenience routine to free a tuple previously loaded into sort memory
3108 */
3109static void
3110free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3111{
3112 if (stup->tuple)
3113 {
3114 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3115 pfree(stup->tuple);
3116 stup->tuple = NULL;
3117 }
3118}
3119
3120int
3121ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3122{
3123 if (x < y)
3124 return -1;
3125 else if (x > y)
3126 return 1;
3127 else
3128 return 0;
3129}
3130
3131int
3132ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3133{
3134 int64 xx = DatumGetInt64(x);
3135 int64 yy = DatumGetInt64(y);
3136
3137 if (xx < yy)
3138 return -1;
3139 else if (xx > yy)
3140 return 1;
3141 else
3142 return 0;
3143}
3144
3145int
3146ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3147{
3148 int32 xx = DatumGetInt32(x);
3149 int32 yy = DatumGetInt32(y);
3150
3151 if (xx < yy)
3152 return -1;
3153 else if (xx > yy)
3154 return 1;
3155 else
3156 return 0;
3157}

◆ IS_SLAB_SLOT

#define IS_SLAB_SLOT (   state,
  tuple 
)
Value:
((char *) (tuple) >= (state)->slabMemoryBegin && \
(char *) (tuple) < (state)->slabMemoryEnd)

Definition at line 372 of file tuplesort.c.

◆ LACKMEM

#define LACKMEM (   state)    ((state)->availMem < 0 && !(state)->slabAllocatorUsed)

Definition at line 397 of file tuplesort.c.

◆ LEADER

#define LEADER (   state)    ((state)->shared && (state)->worker == -1)

Definition at line 402 of file tuplesort.c.

◆ MAXORDER

#define MAXORDER   500 /* maximum merge order */

Definition at line 175 of file tuplesort.c.

◆ MERGE_BUFFER_SIZE

#define MERGE_BUFFER_SIZE   (BLCKSZ * 32)

Definition at line 177 of file tuplesort.c.

◆ MINORDER

#define MINORDER   6 /* minimum merge order */

Definition at line 174 of file tuplesort.c.

◆ READTUP

#define READTUP (   state,
  stup,
  tape,
  len 
)    ((*(state)->base.readtup) (state, stup, tape, len))

Definition at line 395 of file tuplesort.c.

◆ RELEASE_SLAB_SLOT

#define RELEASE_SLAB_SLOT (   state,
  tuple 
)
Value:
do { \
SlabSlot *buf = (SlabSlot *) tuple; \
\
if (IS_SLAB_SLOT((state), buf)) \
{ \
buf->nextfree = (state)->slabFreeHead; \
(state)->slabFreeHead = buf; \
} else \
pfree(buf); \
} while(0)

Definition at line 380 of file tuplesort.c.

381 { \
382 SlabSlot *buf = (SlabSlot *) tuple; \
383 \
384 if (IS_SLAB_SLOT((state), buf)) \
385 { \
386 buf->nextfree = (state)->slabFreeHead; \
387 (state)->slabFreeHead = buf; \
388 } else \
389 pfree(buf); \
390 } while(0)

◆ REMOVEABBREV

#define REMOVEABBREV (   state,
  stup,
  count 
)    ((*(state)->base.removeabbrev) (state, stup, count))

Definition at line 392 of file tuplesort.c.

◆ SERIAL

#define SERIAL (   state)    ((state)->shared == NULL)

Definition at line 400 of file tuplesort.c.

◆ SLAB_SLOT_SIZE

#define SLAB_SLOT_SIZE   1024

Definition at line 140 of file tuplesort.c.

◆ ST_CHECK_FOR_INTERRUPTS [1/5]

#define ST_CHECK_FOR_INTERRUPTS

Definition at line 571 of file tuplesort.c.

◆ ST_CHECK_FOR_INTERRUPTS [2/5]

#define ST_CHECK_FOR_INTERRUPTS

Definition at line 571 of file tuplesort.c.

◆ ST_CHECK_FOR_INTERRUPTS [3/5]

#define ST_CHECK_FOR_INTERRUPTS

Definition at line 571 of file tuplesort.c.

◆ ST_CHECK_FOR_INTERRUPTS [4/5]

#define ST_CHECK_FOR_INTERRUPTS

Definition at line 571 of file tuplesort.c.

◆ ST_CHECK_FOR_INTERRUPTS [5/5]

#define ST_CHECK_FOR_INTERRUPTS

Definition at line 571 of file tuplesort.c.

◆ ST_COMPARE [1/4]

#define ST_COMPARE (   a,
  b,
  ssup 
)
Value:
ApplySortComparator((a)->datum1, (a)->isnull1, \
(b)->datum1, (b)->isnull1, (ssup))

Definition at line 569 of file tuplesort.c.

◆ ST_COMPARE [2/4]

#define ST_COMPARE (   a,
  b,
  state 
)    qsort_tuple_unsigned_compare(a, b, state)

Definition at line 569 of file tuplesort.c.

◆ ST_COMPARE [3/4]

#define ST_COMPARE (   a,
  b,
  state 
)    qsort_tuple_signed_compare(a, b, state)

Definition at line 569 of file tuplesort.c.

◆ ST_COMPARE [4/4]

#define ST_COMPARE (   a,
  b,
  state 
)    qsort_tuple_int32_compare(a, b, state)

Definition at line 569 of file tuplesort.c.

◆ ST_COMPARE_ARG_TYPE [1/5]

#define ST_COMPARE_ARG_TYPE   Tuplesortstate

Definition at line 570 of file tuplesort.c.

◆ ST_COMPARE_ARG_TYPE [2/5]

#define ST_COMPARE_ARG_TYPE   Tuplesortstate

Definition at line 570 of file tuplesort.c.

◆ ST_COMPARE_ARG_TYPE [3/5]

#define ST_COMPARE_ARG_TYPE   Tuplesortstate

Definition at line 570 of file tuplesort.c.

◆ ST_COMPARE_ARG_TYPE [4/5]

#define ST_COMPARE_ARG_TYPE   Tuplesortstate

Definition at line 570 of file tuplesort.c.

◆ ST_COMPARE_ARG_TYPE [5/5]

#define ST_COMPARE_ARG_TYPE   SortSupportData

Definition at line 570 of file tuplesort.c.

◆ ST_COMPARE_RUNTIME_POINTER

#define ST_COMPARE_RUNTIME_POINTER

Definition at line 596 of file tuplesort.c.

◆ ST_DECLARE

#define ST_DECLARE

Definition at line 600 of file tuplesort.c.

◆ ST_DEFINE [1/5]

#define ST_DEFINE

Definition at line 573 of file tuplesort.c.

◆ ST_DEFINE [2/5]

#define ST_DEFINE

Definition at line 573 of file tuplesort.c.

◆ ST_DEFINE [3/5]

#define ST_DEFINE

Definition at line 573 of file tuplesort.c.

◆ ST_DEFINE [4/5]

#define ST_DEFINE

Definition at line 573 of file tuplesort.c.

◆ ST_DEFINE [5/5]

#define ST_DEFINE

Definition at line 573 of file tuplesort.c.

◆ ST_ELEMENT_TYPE [1/5]

#define ST_ELEMENT_TYPE   SortTuple

Definition at line 568 of file tuplesort.c.

◆ ST_ELEMENT_TYPE [2/5]

#define ST_ELEMENT_TYPE   SortTuple

Definition at line 568 of file tuplesort.c.

◆ ST_ELEMENT_TYPE [3/5]

#define ST_ELEMENT_TYPE   SortTuple

Definition at line 568 of file tuplesort.c.

◆ ST_ELEMENT_TYPE [4/5]

#define ST_ELEMENT_TYPE   SortTuple

Definition at line 568 of file tuplesort.c.

◆ ST_ELEMENT_TYPE [5/5]

#define ST_ELEMENT_TYPE   SortTuple

Definition at line 568 of file tuplesort.c.

◆ ST_SCOPE [1/5]

#define ST_SCOPE   static

Definition at line 572 of file tuplesort.c.

◆ ST_SCOPE [2/5]

#define ST_SCOPE   static

Definition at line 572 of file tuplesort.c.

◆ ST_SCOPE [3/5]

#define ST_SCOPE   static

Definition at line 572 of file tuplesort.c.

◆ ST_SCOPE [4/5]

#define ST_SCOPE   static

Definition at line 572 of file tuplesort.c.

◆ ST_SCOPE [5/5]

#define ST_SCOPE   static

Definition at line 572 of file tuplesort.c.

◆ ST_SORT [1/5]

#define ST_SORT   qsort_tuple_unsigned

Definition at line 567 of file tuplesort.c.

◆ ST_SORT [2/5]

#define ST_SORT   qsort_tuple_signed

Definition at line 567 of file tuplesort.c.

◆ ST_SORT [3/5]

#define ST_SORT   qsort_tuple_int32

Definition at line 567 of file tuplesort.c.

◆ ST_SORT [4/5]

#define ST_SORT   qsort_tuple

Definition at line 567 of file tuplesort.c.

◆ ST_SORT [5/5]

#define ST_SORT   qsort_ssup

Definition at line 567 of file tuplesort.c.

◆ TAPE_BUFFER_OVERHEAD

#define TAPE_BUFFER_OVERHEAD   BLCKSZ

Definition at line 176 of file tuplesort.c.

◆ USEMEM

#define USEMEM (   state,
  amt 
)    ((state)->availMem -= (amt))

Definition at line 398 of file tuplesort.c.

◆ WORKER

#define WORKER (   state)    ((state)->shared && (state)->worker != -1)

Definition at line 401 of file tuplesort.c.

◆ WRITETUP

#define WRITETUP (   state,
  tape,
  stup 
)    ((*(state)->base.writetup) (state, tape, stup))

Definition at line 394 of file tuplesort.c.

Typedef Documentation

◆ SlabSlot

Enumeration Type Documentation

◆ TupSortStatus

Enumerator
TSS_INITIAL 
TSS_BOUNDED 
TSS_BUILDRUNS 
TSS_SORTEDINMEM 
TSS_SORTEDONTAPE 
TSS_FINALMERGE 

Definition at line 152 of file tuplesort.c.

153{
154 TSS_INITIAL, /* Loading tuples; still within memory limit */
155 TSS_BOUNDED, /* Loading tuples into bounded-size heap */
156 TSS_BUILDRUNS, /* Loading tuples; writing to tape */
157 TSS_SORTEDINMEM, /* Sort completed entirely in memory */
158 TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
159 TSS_FINALMERGE, /* Performing final merge on-the-fly */
160 } TupSortStatus;

Function Documentation

◆ beginmerge()

static void beginmerge ( Tuplesortstate state)
static

Definition at line 2245 of file tuplesort.c.

2246{
2247 int activeTapes;
2248 int srcTapeIndex;
2249
2250 /* Heap should be empty here */
2251 Assert(state->memtupcount == 0);
2252
2253 activeTapes = Min(state->nInputTapes, state->nInputRuns);
2254
2255 for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2256 {
2257 SortTuple tup;
2258
2259 if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2260 {
2261 tup.srctape = srcTapeIndex;
2262 tuplesort_heap_insert(state, &tup);
2263 }
2264 }
2265}

References Assert, fb(), mergereadnext(), Min, SortTuple::srctape, and tuplesort_heap_insert().

Referenced by mergeonerun(), and mergeruns().

◆ consider_abort_common()

static bool consider_abort_common ( Tuplesortstate state)
static

Definition at line 1304 of file tuplesort.c.

1305{
1306 Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1307 Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1308 Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1309
1310 /*
1311 * Check effectiveness of abbreviation optimization. Consider aborting
1312 * when still within memory limit.
1313 */
1314 if (state->status == TSS_INITIAL &&
1315 state->memtupcount >= state->abbrevNext)
1316 {
1317 state->abbrevNext *= 2;
1318
1319 /*
1320 * Check opclass-supplied abbreviation abort routine. It may indicate
1321 * that abbreviation should not proceed.
1322 */
1323 if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1324 state->base.sortKeys))
1325 return false;
1326
1327 /*
1328 * Finally, restore authoritative comparator, and indicate that
1329 * abbreviation is not in play by setting abbrev_converter to NULL
1330 */
1331 state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1332 state->base.sortKeys[0].abbrev_converter = NULL;
1333 /* Not strictly necessary, but be tidy */
1334 state->base.sortKeys[0].abbrev_abort = NULL;
1335 state->base.sortKeys[0].abbrev_full_comparator = NULL;
1336
1337 /* Give up - expect original pass-by-value representation */
1338 return true;
1339 }
1340
1341 return false;
1342}

References Assert, fb(), and TSS_INITIAL.

Referenced by tuplesort_puttuple_common().

◆ dumptuples()

static void dumptuples ( Tuplesortstate state,
bool  alltuples 
)
static

Definition at line 2292 of file tuplesort.c.

2293{
2294 int memtupwrite;
2295 int i;
2296
2297 /*
2298 * Nothing to do if we still fit in available memory and have array slots,
2299 * unless this is the final call during initial run generation.
2300 */
2301 if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2302 !alltuples)
2303 return;
2304
2305 /*
2306 * Final call might require no sorting, in rare cases where we just so
2307 * happen to have previously LACKMEM()'d at the point where exactly all
2308 * remaining tuples are loaded into memory, just before input was
2309 * exhausted. In general, short final runs are quite possible, but avoid
2310 * creating a completely empty run. In a worker, though, we must produce
2311 * at least one tape, even if it's empty.
2312 */
2313 if (state->memtupcount == 0 && state->currentRun > 0)
2314 return;
2315
2316 Assert(state->status == TSS_BUILDRUNS);
2317
2318 /*
2319 * It seems unlikely that this limit will ever be exceeded, but take no
2320 * chances
2321 */
2322 if (state->currentRun == INT_MAX)
2323 ereport(ERROR,
2324 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2325 errmsg("cannot have more than %d runs for an external sort",
2326 INT_MAX)));
2327
2328 if (state->currentRun > 0)
2329 selectnewtape(state);
2330
2331 state->currentRun++;
2332
2333 if (trace_sort)
2334 elog(LOG, "worker %d starting quicksort of run %d: %s",
2335 state->worker, state->currentRun,
2336 pg_rusage_show(&state->ru_start));
2337
2338 /*
2339 * Sort all tuples accumulated within the allowed amount of memory for
2340 * this run using quicksort
2341 */
2342 tuplesort_sort_memtuples(state);
2343
2344 if (trace_sort)
2345 elog(LOG, "worker %d finished quicksort of run %d: %s",
2346 state->worker, state->currentRun,
2347 pg_rusage_show(&state->ru_start));
2348
2349 memtupwrite = state->memtupcount;
2350 for (i = 0; i < memtupwrite; i++)
2351 {
2352 SortTuple *stup = &state->memtuples[i];
2353
2354 WRITETUP(state, state->destTape, stup);
2355 }
2356
2357 state->memtupcount = 0;
2358
2359 /*
2360 * Reset tuple memory. We've freed all of the tuples that we previously
2361 * allocated. It's important to avoid fragmentation when there is a stark
2362 * change in the sizes of incoming tuples. In bounded sorts,
2363 * fragmentation due to AllocSetFree's bucketing by size class might be
2364 * particularly bad if this step wasn't taken.
2365 */
2366 MemoryContextReset(state->base.tuplecontext);
2367
2368 /*
2369 * Now update the memory accounting to subtract the memory used by the
2370 * tuple.
2371 */
2372 FREEMEM(state, state->tupleMem);
2373 state->tupleMem = 0;
2374
2375 markrunend(state->destTape);
2376
2377 if (trace_sort)
2378 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2379 state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2380 pg_rusage_show(&state->ru_start));
2381}

References Assert, elog, ereport, errcode(), errmsg(), ERROR, fb(), FREEMEM, i, LACKMEM, LOG, markrunend(), MemoryContextReset(), pg_rusage_show(), selectnewtape(), trace_sort, TSS_BUILDRUNS, tuplesort_sort_memtuples(), and WRITETUP.

Referenced by tuplesort_performsort(), and tuplesort_puttuple_common().

◆ free_sort_tuple()

static void free_sort_tuple ( Tuplesortstate state,
SortTuple stup 
)
static

Definition at line 3111 of file tuplesort.c.

3112{
3113 if (stup->tuple)
3114 {
3115 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3116 pfree(stup->tuple);
3117 stup->tuple = NULL;
3118 }
3119}

References fb(), FREEMEM, GetMemoryChunkSpace(), and pfree().

Referenced by make_bounded_heap(), and tuplesort_puttuple_common().

◆ getlen()

static unsigned int getlen ( LogicalTape tape,
bool  eofOK 
)
static

Definition at line 2839 of file tuplesort.c.

2840{
2841 unsigned int len;
2842
2843 if (LogicalTapeRead(tape,
2844 &len, sizeof(len)) != sizeof(len))
2845 elog(ERROR, "unexpected end of tape");
2846 if (len == 0 && !eofOK)
2847 elog(ERROR, "unexpected end of data");
2848 return len;
2849}

References elog, ERROR, fb(), len, and LogicalTapeRead().

Referenced by mergereadnext(), and tuplesort_gettuple_common().

◆ grow_memtuples()

static bool grow_memtuples ( Tuplesortstate state)
static

Definition at line 1037 of file tuplesort.c.

1038{
1039 int newmemtupsize;
1040 int memtupsize = state->memtupsize;
1041 int64 memNowUsed = state->allowedMem - state->availMem;
1042
1043 /* Forget it if we've already maxed out memtuples, per comment above */
1044 if (!state->growmemtuples)
1045 return false;
1046
1047 /* Select new value of memtupsize */
1048 if (memNowUsed <= state->availMem)
1049 {
1050 /*
1051 * We've used no more than half of allowedMem; double our usage,
1052 * clamping at INT_MAX tuples.
1053 */
1054 if (memtupsize < INT_MAX / 2)
1055 newmemtupsize = memtupsize * 2;
1056 else
1057 {
1058 newmemtupsize = INT_MAX;
1059 state->growmemtuples = false;
1060 }
1061 }
1062 else
1063 {
1064 /*
1065 * This will be the last increment of memtupsize. Abandon doubling
1066 * strategy and instead increase as much as we safely can.
1067 *
1068 * To stay within allowedMem, we can't increase memtupsize by more
1069 * than availMem / sizeof(SortTuple) elements. In practice, we want
1070 * to increase it by considerably less, because we need to leave some
1071 * space for the tuples to which the new array slots will refer. We
1072 * assume the new tuples will be about the same size as the tuples
1073 * we've already seen, and thus we can extrapolate from the space
1074 * consumption so far to estimate an appropriate new size for the
1075 * memtuples array. The optimal value might be higher or lower than
1076 * this estimate, but it's hard to know that in advance. We again
1077 * clamp at INT_MAX tuples.
1078 *
1079 * This calculation is safe against enlarging the array so much that
1080 * LACKMEM becomes true, because the memory currently used includes
1081 * the present array; thus, there would be enough allowedMem for the
1082 * new array elements even if no other memory were currently used.
1083 *
1084 * We do the arithmetic in float8, because otherwise the product of
1085 * memtupsize and allowedMem could overflow. Any inaccuracy in the
1086 * result should be insignificant; but even if we computed a
1087 * completely insane result, the checks below will prevent anything
1088 * really bad from happening.
1089 */
1090 double grow_ratio;
1091
1092 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1093 if (memtupsize * grow_ratio < INT_MAX)
1094 newmemtupsize = (int) (memtupsize * grow_ratio);
1095 else
1096 newmemtupsize = INT_MAX;
1097
1098 /* We won't make any further enlargement attempts */
1099 state->growmemtuples = false;
1100 }
1101
1102 /* Must enlarge array by at least one element, else report failure */
1103 if (newmemtupsize <= memtupsize)
1104 goto noalloc;
1105
1106 /*
1107 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1108 * to ensure our request won't be rejected. Note that we can easily
1109 * exhaust address space before facing this outcome. (This is presently
1110 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1111 * don't rely on that at this distance.)
1112 */
1113 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1114 {
1115 newmemtupsize = MaxAllocHugeSize / sizeof(SortTuple);
1116 state->growmemtuples = false; /* can't grow any more */
1117 }
1118
1119 /*
1120 * We need to be sure that we do not cause LACKMEM to become true, else
1121 * the space management algorithm will go nuts. The code above should
1122 * never generate a dangerous request, but to be safe, check explicitly
1123 * that the array growth fits within availMem. (We could still cause
1124 * LACKMEM if the memory chunk overhead associated with the memtuples
1125 * array were to increase. That shouldn't happen because we chose the
1126 * initial array size large enough to ensure that palloc will be treating
1127 * both old and new arrays as separate chunks. But we'll check LACKMEM
1128 * explicitly below just in case.)
1129 */
1130 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1131 goto noalloc;
1132
1133 /* OK, do it */
1134 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1135 state->memtupsize = newmemtupsize;
1136 state->memtuples = (SortTuple *)
1137 repalloc_huge(state->memtuples,
1138 state->memtupsize * sizeof(SortTuple));
1139 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1140 if (LACKMEM(state))
1141 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1142 return true;
1143
1144noalloc:
1145 /* If for any reason we didn't realloc, shut off future attempts */
1146 state->growmemtuples = false;
1147 return false;
1148}

References elog, ERROR, fb(), FREEMEM, GetMemoryChunkSpace(), LACKMEM, MaxAllocHugeSize, repalloc_huge(), and USEMEM.

Referenced by tuplesort_puttuple_common().
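
For illustration only (not part of tuplesort.c): a standalone sketch of the growth policy described above, using plain C stand-ins for the state fields. The clamping against INT_MAX and MaxAllocHugeSize and the final LACKMEM checks are simplified away.

#include <stdio.h>
#include <stdint.h>
#include <limits.h>

/* Toy model of the memtuples growth policy: double while no more than
 * half of allowedMem is in use, otherwise extrapolate from current usage. */
static int
next_memtupsize(int memtupsize, int64_t allowedMem, int64_t availMem)
{
    int64_t memNowUsed = allowedMem - availMem;

    if (memNowUsed <= availMem)        /* used no more than half of allowedMem */
        return (memtupsize < INT_MAX / 2) ? memtupsize * 2 : INT_MAX;

    /* Last enlargement: grow in proportion to the total memory budget. */
    double grow_ratio = (double) allowedMem / (double) memNowUsed;
    double target = memtupsize * grow_ratio;
    return (target < INT_MAX) ? (int) target : INT_MAX;
}

int
main(void)
{
    /* 1024 slots, 4 MB budget, 3 MB still available -> doubles to 2048 */
    printf("%d\n", next_memtupsize(1024, 4 << 20, 3 << 20));
    /* 1024 slots, 4 MB budget, only 1 MB available -> grows by ~4/3 to 1365 */
    printf("%d\n", next_memtupsize(1024, 4 << 20, 1 << 20));
    return 0;
}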

◆ init_slab_allocator()

static void init_slab_allocator ( Tuplesortstate state,
int  numSlots 
)
static

Definition at line 1966 of file tuplesort.c.

1967{
1968 if (numSlots > 0)
1969 {
1970 char *p;
1971 int i;
1972
1973 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1974 state->slabMemoryEnd = state->slabMemoryBegin +
1975 numSlots * SLAB_SLOT_SIZE;
1976 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1977 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1978
1979 p = state->slabMemoryBegin;
1980 for (i = 0; i < numSlots - 1; i++)
1981 {
1982 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1983 p += SLAB_SLOT_SIZE;
1984 }
1985 ((SlabSlot *) p)->nextfree = NULL;
1986 }
1987 else
1988 {
1989 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
1990 state->slabFreeHead = NULL;
1991 }
1992 state->slabAllocatorUsed = true;
1993}

References fb(), i, palloc(), SLAB_SLOT_SIZE, and USEMEM.

Referenced by mergeruns().

◆ inittapes()

static void inittapes ( Tuplesortstate state,
bool  mergeruns 
)
static

Definition at line 1850 of file tuplesort.c.

1851{
1852 Assert(!LEADER(state));
1853
1854 if (mergeruns)
1855 {
1856 /* Compute number of input tapes to use when merging */
1857 state->maxTapes = tuplesort_merge_order(state->allowedMem);
1858 }
1859 else
1860 {
1861 /* Workers can sometimes produce single run, output without merge */
1862 Assert(WORKER(state));
1863 state->maxTapes = MINORDER;
1864 }
1865
1866 if (trace_sort)
1867 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1868 state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1869
1870 /* Create the tape set */
1871 inittapestate(state, state->maxTapes);
1872 state->tapeset =
1873 LogicalTapeSetCreate(false,
1874 state->shared ? &state->shared->fileset : NULL,
1875 state->worker);
1876
1877 state->currentRun = 0;
1878
1879 /*
1880 * Initialize logical tape arrays.
1881 */
1882 state->inputTapes = NULL;
1883 state->nInputTapes = 0;
1884 state->nInputRuns = 0;
1885
1886 state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1887 state->nOutputTapes = 0;
1888 state->nOutputRuns = 0;
1889
1890 state->status = TSS_BUILDRUNS;
1891
1892 selectnewtape(state);
1893}

References Assert, elog, fb(), inittapestate(), LEADER, LOG, LogicalTapeSetCreate(), mergeruns(), MINORDER, palloc0(), pg_rusage_show(), selectnewtape(), trace_sort, TSS_BUILDRUNS, tuplesort_merge_order(), and WORKER.

Referenced by tuplesort_performsort(), and tuplesort_puttuple_common().

◆ inittapestate()

static void inittapestate ( Tuplesortstate state,
int  maxTapes 
)
static

Definition at line 1899 of file tuplesort.c.

1900{
1901 int64 tapeSpace;
1902
1903 /*
1904 * Decrease availMem to reflect the space needed for tape buffers; but
1905 * don't decrease it to the point that we have no room for tuples. (That
1906 * case is only likely to occur if sorting pass-by-value Datums; in all
1907 * other scenarios the memtuples[] array is unlikely to occupy more than
1908 * half of allowedMem. In the pass-by-value case it's not important to
1909 * account for tuple space, so we don't care if LACKMEM becomes
1910 * inaccurate.)
1911 */
1912 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1913
1914 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1915 USEMEM(state, tapeSpace);
1916
1917 /*
1918 * Make sure that the temp file(s) underlying the tape set are created in
1919 * suitable temp tablespaces. For parallel sorts, this should have been
1920 * called already, but it doesn't matter if it is called a second time.
1921 */
1922 PrepareTempTablespaces();
1923}

References fb(), GetMemoryChunkSpace(), PrepareTempTablespaces(), TAPE_BUFFER_OVERHEAD, and USEMEM.

Referenced by inittapes(), and leader_takeover_tapes().

◆ leader_takeover_tapes()

static void leader_takeover_tapes ( Tuplesortstate state)
static

Definition at line 3052 of file tuplesort.c.

3053{
3054 Sharedsort *shared = state->shared;
3055 int nParticipants = state->nParticipants;
3056 int workersFinished;
3057 int j;
3058
3059 Assert(LEADER(state));
3060 Assert(nParticipants >= 1);
3061
3062 SpinLockAcquire(&shared->mutex);
3063 workersFinished = shared->workersFinished;
3064 SpinLockRelease(&shared->mutex);
3065
3066 if (nParticipants != workersFinished)
3067 elog(ERROR, "cannot take over tapes before all workers finish");
3068
3069 /*
3070 * Create the tapeset from worker tapes, including a leader-owned tape at
3071 * the end. Parallel workers are far more expensive than logical tapes,
3072 * so the number of tapes allocated here should never be excessive.
3073 */
3074 inittapestate(state, nParticipants);
3075 state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3076
3077 /*
3078 * Set currentRun to reflect the number of runs we will merge (it's not
3079 * used for anything, this is just pro forma)
3080 */
3081 state->currentRun = nParticipants;
3082
3083 /*
3084 * Initialize the state to look the same as after building the initial
3085 * runs.
3086 *
3087 * There will always be exactly 1 run per worker, and exactly one input
3088 * tape per run, because workers always output exactly 1 run, even when
3089 * there were no input tuples for workers to sort.
3090 */
3091 state->inputTapes = NULL;
3092 state->nInputTapes = 0;
3093 state->nInputRuns = 0;
3094
3095 state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3096 state->nOutputTapes = nParticipants;
3097 state->nOutputRuns = nParticipants;
3098
3099 for (j = 0; j < nParticipants; j++)
3100 {
3101 state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3102 }
3103
3104 state->status = TSS_BUILDRUNS;
3105}

References Assert, elog, ERROR, fb(), Sharedsort::fileset, inittapestate(), j, LEADER, LogicalTapeImport(), LogicalTapeSetCreate(), Sharedsort::mutex, palloc0(), SpinLockAcquire, SpinLockRelease, Sharedsort::tapes, TSS_BUILDRUNS, and Sharedsort::workersFinished.

Referenced by tuplesort_performsort().

◆ make_bounded_heap()

static void make_bounded_heap ( Tuplesortstate state)
static

Definition at line 2572 of file tuplesort.c.

2573{
2574 int tupcount = state->memtupcount;
2575 int i;
2576
2577 Assert(state->status == TSS_INITIAL);
2578 Assert(state->bounded);
2579 Assert(tupcount >= state->bound);
2580 Assert(SERIAL(state));
2581
2582 /* Reverse sort direction so largest entry will be at root */
2583 reversedirection(state);
2584
2585 state->memtupcount = 0; /* make the heap empty */
2586 for (i = 0; i < tupcount; i++)
2587 {
2588 if (state->memtupcount < state->bound)
2589 {
2590 /* Insert next tuple into heap */
2591 /* Must copy source tuple to avoid possible overwrite */
2592 SortTuple stup = state->memtuples[i];
2593
2594 tuplesort_heap_insert(state, &stup);
2595 }
2596 else
2597 {
2598 /*
2599 * The heap is full. Replace the largest entry with the new
2600 * tuple, or just discard it, if it's larger than anything already
2601 * in the heap.
2602 */
2603 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2604 {
2605 free_sort_tuple(state, &state->memtuples[i]);
2606 CHECK_FOR_INTERRUPTS();
2607 }
2608 else
2609 tuplesort_heap_replace_top(state, &state->memtuples[i]);
2610 }
2611 }
2612
2613 Assert(state->memtupcount == state->bound);
2614 state->status = TSS_BOUNDED;
2615}

References Assert, CHECK_FOR_INTERRUPTS, COMPARETUP, fb(), free_sort_tuple(), i, reversedirection(), SERIAL, TSS_BOUNDED, TSS_INITIAL, tuplesort_heap_insert(), and tuplesort_heap_replace_top().

Referenced by tuplesort_puttuple_common().

◆ markrunend()

static void markrunend ( LogicalTape tape)
static

Definition at line 2852 of file tuplesort.c.

2853{
2854 unsigned int len = 0;
2855
2856 LogicalTapeWrite(tape, &len, sizeof(len));
2857}

References fb(), len, and LogicalTapeWrite().

Referenced by dumptuples(), and mergeonerun().
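
Sketch only (assumes a backend context and the static helpers of this file): how a run written by WRITETUP() and terminated by markrunend() is consumed. Each tuple on tape is preceded by a length word, and the zero length word written here marks the end of the run, which is exactly what getlen() with eofOK and mergereadnext() rely on. drain_one_run is a hypothetical helper name.

static void
drain_one_run(Tuplesortstate *state, LogicalTape *tape)
{
    unsigned int len;

    /* a zero length word means end of run; eofOK = true accepts it */
    while ((len = getlen(tape, true)) != 0)
    {
        SortTuple   stup;

        READTUP(state, &stup, tape, len);   /* re-create the tuple in memory */
        /* ... consume stup ... */
    }
}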

◆ merge_read_buffer_size()

static int64 merge_read_buffer_size ( int64  avail_mem,
int  nInputTapes,
int  nInputRuns,
int  maxOutputTapes 
)
static

Definition at line 1818 of file tuplesort.c.

1820{
1821 int nOutputRuns;
1822 int nOutputTapes;
1823
1824 /*
1825 * How many output tapes will we produce in this pass?
1826 *
1827 * This is nInputRuns / nInputTapes, rounded up.
1828 */
1829 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1830
1831 nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1832
1833 /*
1834 * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1835 * remaining memory is divided evenly between the input tapes.
1836 *
1837 * This also follows from the formula in tuplesort_merge_order, but here
1838 * we derive the input buffer size from the amount of memory available,
1839 * and M and N.
1840 */
1841 return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1842}

References fb(), Max, Min, and TAPE_BUFFER_OVERHEAD.

Referenced by mergeruns().
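
A worked example of the formula above, assuming the default BLCKSZ of 8192 (so TAPE_BUFFER_OVERHEAD = 8192) and 4 MB of memory left for tape buffers; this is a standalone illustration, not code from tuplesort.c.

#include <stdio.h>
#include <stdint.h>

#define TAPE_BUFFER_OVERHEAD 8192            /* assumes BLCKSZ = 8192 */

int
main(void)
{
    int64_t avail_mem = 4 * 1024 * 1024;     /* memory left for tape buffers */
    int     nInputTapes = 15, nInputRuns = 15, maxOutputTapes = 15;

    int nOutputRuns  = (nInputRuns + nInputTapes - 1) / nInputTapes;          /* 1 */
    int nOutputTapes = nOutputRuns < maxOutputTapes ? nOutputRuns : maxOutputTapes;
    int64_t per_input = (avail_mem - (int64_t) TAPE_BUFFER_OVERHEAD * nOutputTapes)
                        / nInputTapes;

    printf("%lld bytes of read buffer per input tape\n", (long long) per_input);
    return 0;                                /* prints 279074 with these numbers */
}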

◆ mergeonerun()

static void mergeonerun ( Tuplesortstate state)
static

Definition at line 2185 of file tuplesort.c.

2186{
2187 int srcTapeIndex;
2188 LogicalTape *srcTape;
2189
2190 /*
2191 * Start the merge by loading one tuple from each active source tape into
2192 * the heap.
2193 */
2194 beginmerge(state);
2195
2196 Assert(state->slabAllocatorUsed);
2197
2198 /*
2199 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2200 * out, and replacing it with next tuple from same tape (if there is
2201 * another one).
2202 */
2203 while (state->memtupcount > 0)
2204 {
2205 SortTuple stup;
2206
2207 /* write the tuple to destTape */
2208 srcTapeIndex = state->memtuples[0].srctape;
2209 srcTape = state->inputTapes[srcTapeIndex];
2210 WRITETUP(state, state->destTape, &state->memtuples[0]);
2211
2212 /* recycle the slot of the tuple we just wrote out, for the next read */
2213 if (state->memtuples[0].tuple)
2214 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2215
2216 /*
2217 * pull next tuple from the tape, and replace the written-out tuple in
2218 * the heap with it.
2219 */
2220 if (mergereadnext(state, srcTape, &stup))
2221 {
2222 stup.srctape = srcTapeIndex;
2223 tuplesort_heap_replace_top(state, &stup);
2224 }
2225 else
2226 {
2227 tuplesort_heap_delete_top(state);
2228 state->nInputRuns--;
2229 }
2230 }
2231
2232 /*
2233 * When the heap empties, we're done. Write an end-of-run marker on the
2234 * output tape.
2235 */
2236 markrunend(state->destTape);
2237}

References Assert, beginmerge(), fb(), markrunend(), mergereadnext(), RELEASE_SLAB_SLOT, tuplesort_heap_delete_top(), tuplesort_heap_replace_top(), and WRITETUP.

Referenced by mergeruns().

◆ mergereadnext()

static bool mergereadnext ( Tuplesortstate state,
LogicalTape srcTape,
SortTuple stup 
)
static

Definition at line 2273 of file tuplesort.c.

2274{
2275 unsigned int tuplen;
2276
2277 /* read next tuple, if any */
2278 if ((tuplen = getlen(srcTape, true)) == 0)
2279 return false;
2280 READTUP(state, stup, srcTape, tuplen);
2281
2282 return true;
2283}

References fb(), getlen(), and READTUP.

Referenced by beginmerge(), mergeonerun(), and tuplesort_gettuple_common().

◆ mergeruns()

static void mergeruns ( Tuplesortstate state)
static

Definition at line 2002 of file tuplesort.c.

2003{
2004 int tapenum;
2005
2006 Assert(state->status == TSS_BUILDRUNS);
2007 Assert(state->memtupcount == 0);
2008
2009 if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2010 {
2011 /*
2012 * If there are multiple runs to be merged, when we go to read back
2013 * tuples from disk, abbreviated keys will not have been stored, and
2014 * we don't care to regenerate them. Disable abbreviation from this
2015 * point on.
2016 */
2017 state->base.sortKeys->abbrev_converter = NULL;
2018 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2019
2020 /* Not strictly necessary, but be tidy */
2021 state->base.sortKeys->abbrev_abort = NULL;
2022 state->base.sortKeys->abbrev_full_comparator = NULL;
2023 }
2024
2025 /*
2026 * Reset tuple memory. We've freed all the tuples that we previously
2027 * allocated. We will use the slab allocator from now on.
2028 */
2029 MemoryContextResetOnly(state->base.tuplecontext);
2030
2031 /*
2032 * We no longer need a large memtuples array. (We will allocate a smaller
2033 * one for the heap later.)
2034 */
2035 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2036 pfree(state->memtuples);
2037 state->memtuples = NULL;
2038
2039 /*
2040 * Initialize the slab allocator. We need one slab slot per input tape,
2041 * for the tuples in the heap, plus one to hold the tuple last returned
2042 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2043 * however, we don't need to do allocate anything.)
2044 *
2045 * In a multi-pass merge, we could shrink this allocation for the last
2046 * merge pass, if it has fewer tapes than previous passes, but we don't
2047 * bother.
2048 *
2049 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2050 * to track memory usage of individual tuples.
2051 */
2052 if (state->base.tuples)
2053 init_slab_allocator(state, state->nOutputTapes + 1);
2054 else
2055 init_slab_allocator(state, 0);
2056
2057 /*
2058 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2059 * from each input tape.
2060 *
2061 * We could shrink this, too, between passes in a multi-pass merge, but we
2062 * don't bother. (The initial input tapes are still in outputTapes. The
2063 * number of input tapes will not increase between passes.)
2064 */
2065 state->memtupsize = state->nOutputTapes;
2066 state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
2067 state->nOutputTapes * sizeof(SortTuple));
2068 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2069
2070 /*
2071 * Use all the remaining memory we have available for tape buffers among
2072 * all the input tapes. At the beginning of each merge pass, we will
2073 * divide this memory between the input and output tapes in the pass.
2074 */
2075 state->tape_buffer_mem = state->availMem;
2076 USEMEM(state, state->tape_buffer_mem);
2077 if (trace_sort)
2078 elog(LOG, "worker %d using %zu KB of memory for tape buffers",
2079 state->worker, state->tape_buffer_mem / 1024);
2080
2081 for (;;)
2082 {
2083 /*
2084 * On the first iteration, or if we have read all the runs from the
2085 * input tapes in a multi-pass merge, it's time to start a new pass.
2086 * Rewind all the output tapes, and make them inputs for the next
2087 * pass.
2088 */
2089 if (state->nInputRuns == 0)
2090 {
2091 int64 input_buffer_size;
2092
2093 /* Close the old, emptied, input tapes */
2094 if (state->nInputTapes > 0)
2095 {
2096 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2097 LogicalTapeClose(state->inputTapes[tapenum]);
2098 pfree(state->inputTapes);
2099 }
2100
2101 /* Previous pass's outputs become next pass's inputs. */
2102 state->inputTapes = state->outputTapes;
2103 state->nInputTapes = state->nOutputTapes;
2104 state->nInputRuns = state->nOutputRuns;
2105
2106 /*
2107 * Reset output tape variables. The actual LogicalTapes will be
2108 * created as needed, here we only allocate the array to hold
2109 * them.
2110 */
2111 state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2112 state->nOutputTapes = 0;
2113 state->nOutputRuns = 0;
2114
2115 /*
2116 * Redistribute the memory allocated for tape buffers, among the
2117 * new input and output tapes.
2118 */
2119 input_buffer_size = merge_read_buffer_size(state->availMem,
2120 state->nInputTapes,
2121 state->nInputRuns,
2122 state->maxTapes);
2123
2124 if (trace_sort)
2125 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2126 state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2127 pg_rusage_show(&state->ru_start));
2128
2129 /* Prepare the new input tapes for merge pass. */
2130 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2131 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2132
2133 /*
2134 * If there's just one run left on each input tape, then only one
2135 * merge pass remains. If we don't have to produce a materialized
2136 * sorted tape, we can stop at this point and do the final merge
2137 * on-the-fly.
2138 */
2139 if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2140 && state->nInputRuns <= state->nInputTapes
2141 && !WORKER(state))
2142 {
2143 /* Tell logtape.c we won't be writing anymore */
2144 LogicalTapeSetForgetFreeSpace(state->tapeset);
2145 /* Initialize for the final merge pass */
2146 beginmerge(state);
2147 state->status = TSS_FINALMERGE;
2148 return;
2149 }
2150 }
2151
2152 /* Select an output tape */
2153 selectnewtape(state);
2154
2155 /* Merge one run from each input tape. */
2156 mergeonerun(state);
2157
2158 /*
2159 * If the input tapes are empty, and we output only one output run,
2160 * we're done. The current output tape contains the final result.
2161 */
2162 if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2163 break;
2164 }
2165
2166 /*
2167 * Done. The result is on a single run on a single tape.
2168 */
2169 state->result_tape = state->outputTapes[0];
2170 if (!WORKER(state))
2171 LogicalTapeFreeze(state->result_tape, NULL);
2172 else
2173 worker_freeze_result_tape(state);
2174 state->status = TSS_SORTEDONTAPE;
2175
2176 /* Close all the now-empty input tapes, to release their read buffers. */
2177 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2178 LogicalTapeClose(state->inputTapes[tapenum]);
2179}

References Assert, beginmerge(), elog, fb(), FREEMEM, GetMemoryChunkSpace(), init_slab_allocator(), INT64_FORMAT, LOG, LogicalTapeClose(), LogicalTapeFreeze(), LogicalTapeRewindForRead(), LogicalTapeSetForgetFreeSpace(), MemoryContextAlloc(), MemoryContextResetOnly(), merge_read_buffer_size(), mergeonerun(), palloc0(), pfree(), pg_rusage_show(), selectnewtape(), trace_sort, TSS_BUILDRUNS, TSS_FINALMERGE, TSS_SORTEDONTAPE, TUPLESORT_RANDOMACCESS, USEMEM, WORKER, and worker_freeze_result_tape().

Referenced by inittapes(), and tuplesort_performsort().

◆ qsort_tuple_int32_compare()

static pg_attribute_always_inline int qsort_tuple_int32_compare ( SortTuple a,
SortTuple b,
Tuplesortstate state 
)
static

Definition at line 537 of file tuplesort.c.

538{
539 int compare;
540
541 compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
542 b->datum1, b->isnull1,
543 &state->base.sortKeys[0]);
544
545 if (compare != 0)
546 return compare;
547
548 /*
549 * No need to waste effort calling the tiebreak function when there are no
550 * other keys to sort on.
551 */
552 if (state->base.onlyKey != NULL)
553 return 0;
554
555 return state->base.comparetup_tiebreak(a, b, state);
556}

References a, ApplyInt32SortComparator(), b, compare(), and fb().

◆ qsort_tuple_signed_compare()

static pg_attribute_always_inline int qsort_tuple_signed_compare ( SortTuple a,
SortTuple b,
Tuplesortstate state 
)
static

Definition at line 514 of file tuplesort.c.

515{
516 int compare;
517
518 compare = ApplySignedSortComparator(a->datum1, a->isnull1,
519 b->datum1, b->isnull1,
520 &state->base.sortKeys[0]);
521
522 if (compare != 0)
523 return compare;
524
525 /*
526 * No need to waste effort calling the tiebreak function when there are no
527 * other keys to sort on.
528 */
529 if (state->base.onlyKey != NULL)
530 return 0;
531
532 return state->base.comparetup_tiebreak(a, b, state);
533}

References a, ApplySignedSortComparator(), b, compare(), and fb().

◆ qsort_tuple_unsigned_compare()

static pg_attribute_always_inline int qsort_tuple_unsigned_compare ( SortTuple a,
SortTuple b,
Tuplesortstate state 
)
static

Definition at line 492 of file tuplesort.c.

493{
494 int compare;
495
496 compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
497 b->datum1, b->isnull1,
498 &state->base.sortKeys[0]);
499 if (compare != 0)
500 return compare;
501
502 /*
503 * No need to waste effort calling the tiebreak function when there are no
504 * other keys to sort on.
505 */
506 if (state->base.onlyKey != NULL)
507 return 0;
508
509 return state->base.comparetup_tiebreak(a, b, state);
510}

References a, ApplyUnsignedSortComparator(), b, compare(), and fb().

◆ reversedirection()

static void reversedirection ( Tuplesortstate state)
static

Definition at line 2821 of file tuplesort.c.

2822{
2823 SortSupport sortKey = state->base.sortKeys;
2824 int nkey;
2825
2826 for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2827 {
2828 sortKey->ssup_reverse = !sortKey->ssup_reverse;
2829 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2830 }
2831}

References fb().

Referenced by make_bounded_heap(), and sort_bounded_heap().

◆ selectnewtape()

static void selectnewtape ( Tuplesortstate state)
static

Definition at line 1933 of file tuplesort.c.

1934{
1935 /*
1936 * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1937 * both zero. On each call, we create a new output tape to hold the next
1938 * run, until maxTapes is reached. After that, we assign new runs to the
1939 * existing tapes in a round robin fashion.
1940 */
1941 if (state->nOutputTapes < state->maxTapes)
1942 {
1943 /* Create a new tape to hold the next run */
1944 Assert(state->outputTapes[state->nOutputRuns] == NULL);
1945 Assert(state->nOutputRuns == state->nOutputTapes);
1946 state->destTape = LogicalTapeCreate(state->tapeset);
1947 state->outputTapes[state->nOutputTapes] = state->destTape;
1948 state->nOutputTapes++;
1949 state->nOutputRuns++;
1950 }
1951 else
1952 {
1953 /*
1954 * We have reached the max number of tapes. Append to an existing
1955 * tape.
1956 */
1957 state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1958 state->nOutputRuns++;
1959 }
1960}

References Assert, fb(), and LogicalTapeCreate().

Referenced by dumptuples(), inittapes(), and mergeruns().
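
For illustration only (not part of tuplesort.c): a standalone sketch of the run-to-tape assignment that results from the policy above. The first maxTapes runs each get a fresh tape; after that, runs are appended to existing tapes round robin.

#include <stdio.h>

/* Toy model: which output tape receives run number 'run' (0-based). */
static int
tape_for_run(int run, int maxTapes)
{
    return (run < maxTapes) ? run : run % maxTapes;
}

int
main(void)
{
    /* With maxTapes = 6, runs 0..7 land on tapes 0 1 2 3 4 5 0 1 */
    for (int run = 0; run < 8; run++)
        printf("run %d -> tape %d\n", run, tape_for_run(run, 6));
    return 0;
}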

◆ sort_bounded_heap()

static void sort_bounded_heap ( Tuplesortstate state)
static

Definition at line 2621 of file tuplesort.c.

2622{
2623 int tupcount = state->memtupcount;
2624
2625 Assert(state->status == TSS_BOUNDED);
2626 Assert(state->bounded);
2627 Assert(tupcount == state->bound);
2628 Assert(SERIAL(state));
2629
2630 /*
2631 * We can unheapify in place because each delete-top call will remove the
2632 * largest entry, which we can promptly store in the newly freed slot at
2633 * the end. Once we're down to a single-entry heap, we're done.
2634 */
2635 while (state->memtupcount > 1)
2636 {
2637 SortTuple stup = state->memtuples[0];
2638
2639 /* this sifts-up the next-largest entry and decreases memtupcount */
2640 tuplesort_heap_delete_top(state);
2641 state->memtuples[state->memtupcount] = stup;
2642 }
2643 state->memtupcount = tupcount;
2644
2645 /*
2646 * Reverse sort direction back to the original state. This is not
2647 * actually necessary but seems like a good idea for tidiness.
2648 */
2649 reversedirection(state);
2650
2651 state->status = TSS_SORTEDINMEM;
2652 state->boundUsed = true;
2653}

References Assert, fb(), reversedirection(), SERIAL, TSS_BOUNDED, TSS_SORTEDINMEM, and tuplesort_heap_delete_top().

Referenced by tuplesort_performsort().

◆ ssup_datum_int32_cmp()

int ssup_datum_int32_cmp ( Datum  x,
Datum  y,
SortSupport  ssup 
)

Definition at line 3147 of file tuplesort.c.

3148{
3149 int32 xx = DatumGetInt32(x);
3150 int32 yy = DatumGetInt32(y);
3151
3152 if (xx < yy)
3153 return -1;
3154 else if (xx > yy)
3155 return 1;
3156 else
3157 return 0;
3158}

References DatumGetInt32(), fb(), x, and y.

Referenced by btint4sortsupport(), date_sortsupport(), and tuplesort_sort_memtuples().
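
Sketch only (assumes backend headers such as fmgr.h and utils/sortsupport.h): how an opclass sortsupport function can install this comparator so that tuplesort_sort_memtuples() is able to take the specialized int32 quicksort path. my_int32_sortsupport is a hypothetical name; the listed callers such as btint4sortsupport() follow this pattern.

Datum
my_int32_sortsupport(PG_FUNCTION_ARGS)
{
    SortSupport ssup = (SortSupport) PG_GETARG_POINTER(0);

    /* datum1 of each SortTuple is assumed to hold an int32 value */
    ssup->comparator = ssup_datum_int32_cmp;
    PG_RETURN_VOID();
}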

◆ ssup_datum_signed_cmp()

int ssup_datum_signed_cmp ( Datum  x,
Datum  y,
SortSupport  ssup 
)

Definition at line 3133 of file tuplesort.c.

3134{
3135 int64 xx = DatumGetInt64(x);
3136 int64 yy = DatumGetInt64(y);
3137
3138 if (xx < yy)
3139 return -1;
3140 else if (xx > yy)
3141 return 1;
3142 else
3143 return 0;
3144}

References DatumGetInt64(), fb(), x, and y.

Referenced by btint8sortsupport(), timestamp_sortsupport(), and tuplesort_sort_memtuples().

◆ ssup_datum_unsigned_cmp()

int ssup_datum_unsigned_cmp ( Datum  x,
Datum  y,
SortSupport  ssup 
)

Definition at line 3122 of file tuplesort.c.

3123{
3124 if (x < y)
3125 return -1;
3126 else if (x > y)
3127 return 1;
3128 else
3129 return 0;
3130}

References x, and y.

Referenced by bytea_sortsupport(), gist_point_sortsupport(), macaddr_sortsupport(), network_sortsupport(), tuplesort_sort_memtuples(), uuid_sortsupport(), and varstr_sortsupport().

◆ tuplesort_attach_shared()

void tuplesort_attach_shared ( Sharedsort shared,
dsm_segment seg 
)

Definition at line 2944 of file tuplesort.c.

2945{
2946 /* Attach to SharedFileSet */
2947 SharedFileSetAttach(&shared->fileset, seg);
2948}

References Sharedsort::fileset, and SharedFileSetAttach().

Referenced by _brin_parallel_build_main(), _bt_parallel_build_main(), and _gin_parallel_build_main().

◆ tuplesort_begin_batch()

static void tuplesort_begin_batch ( Tuplesortstate state)
static

Definition at line 741 of file tuplesort.c.

742{
743 MemoryContext oldcontext;
744
745 oldcontext = MemoryContextSwitchTo(state->base.maincontext);
746
747 /*
748 * Caller tuple (e.g. IndexTuple) memory context.
749 *
750 * A dedicated child context used exclusively for caller passed tuples
751 * eases memory management. Resetting at key points reduces
752 * fragmentation. Note that the memtuples array of SortTuples is allocated
753 * in the parent context, not this context, because there is no need to
754 * free memtuples early. For bounded sorts, tuples may be pfreed in any
755 * order, so we use a regular aset.c context so that it can make use of
756 * free'd memory. When the sort is not bounded, we make use of a bump.c
757 * context as this keeps allocations more compact with less wastage.
758 * Allocations are also slightly more CPU efficient.
759 */
760 if (TupleSortUseBumpTupleCxt(state->base.sortopt))
761 state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
762 "Caller tuples",
763 ALLOCSET_DEFAULT_SIZES);
764 else
765 state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
766 "Caller tuples",
767 ALLOCSET_DEFAULT_SIZES);
768
769
770 state->status = TSS_INITIAL;
771 state->bounded = false;
772 state->boundUsed = false;
773
774 state->availMem = state->allowedMem;
775
776 state->tapeset = NULL;
777
778 state->memtupcount = 0;
779
780 state->growmemtuples = true;
781 state->slabAllocatorUsed = false;
782 if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
783 {
784 pfree(state->memtuples);
785 state->memtuples = NULL;
786 state->memtupsize = INITIAL_MEMTUPSIZE;
787 }
788 if (state->memtuples == NULL)
789 {
790 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
791 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
792 }
793
794 /* workMem must be large enough for the minimal memtuples array */
795 if (LACKMEM(state))
796 elog(ERROR, "insufficient memory allowed for sort");
797
798 state->currentRun = 0;
799
800 /*
801 * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
802 * inittapes(), if needed.
803 */
804
805 state->result_tape = NULL; /* flag that result tape has not been formed */
806
807 MemoryContextSwitchTo(oldcontext);
808}

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, BumpContextCreate(), elog, ERROR, fb(), GetMemoryChunkSpace(), INITIAL_MEMTUPSIZE, LACKMEM, MemoryContextSwitchTo(), palloc(), pfree(), TSS_INITIAL, TupleSortUseBumpTupleCxt, and USEMEM.

Referenced by tuplesort_begin_common(), and tuplesort_reset().

◆ tuplesort_begin_common()

Tuplesortstate * tuplesort_begin_common ( int  workMem,
SortCoordinate  coordinate,
int  sortopt 
)

Definition at line 635 of file tuplesort.c.

636{
637 Tuplesortstate *state;
638 MemoryContext maincontext;
639 MemoryContext sortcontext;
640 MemoryContext oldcontext;
641
642 /* See leader_takeover_tapes() remarks on random access support */
643 if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
644 elog(ERROR, "random access disallowed under parallel sort");
645
646 /*
647 * Memory context surviving tuplesort_reset. This memory context holds
648 * data which is useful to keep while sorting multiple similar batches.
649 */
651 "TupleSort main",
653
654 /*
655 * Create a working memory context for one sort operation. The content of
656 * this context is deleted by tuplesort_reset.
657 */
658 sortcontext = AllocSetContextCreate(maincontext,
659 "TupleSort sort",
660 ALLOCSET_DEFAULT_SIZES);
661
662 /*
663 * Additionally a working memory context for tuples is setup in
664 * tuplesort_begin_batch.
665 */
666
667 /*
668 * Make the Tuplesortstate within the per-sortstate context. This way, we
669 * don't need a separate pfree() operation for it at shutdown.
670 */
671 oldcontext = MemoryContextSwitchTo(maincontext);
672
673 state = palloc0_object(Tuplesortstate);
674
675 if (trace_sort)
676 pg_rusage_init(&state->ru_start);
677
678 state->base.sortopt = sortopt;
679 state->base.tuples = true;
680 state->abbrevNext = 10;
681
682 /*
683 * workMem is forced to be at least 64KB, the current minimum valid value
684 * for the work_mem GUC. This is a defense against parallel sort callers
685 * that divide out memory among many workers in a way that leaves each
686 * with very little memory.
687 */
688 state->allowedMem = Max(workMem, 64) * (int64) 1024;
689 state->base.sortcontext = sortcontext;
690 state->base.maincontext = maincontext;
691
692 state->memtupsize = INITIAL_MEMTUPSIZE;
693 state->memtuples = NULL;
694
695 /*
696 * After all of the other non-parallel-related state, we setup all of the
697 * state needed for each batch.
698 */
699 tuplesort_begin_batch(state);
700
701 /*
702 * Initialize parallel-related state based on coordination information
703 * from caller
704 */
705 if (!coordinate)
706 {
707 /* Serial sort */
708 state->shared = NULL;
709 state->worker = -1;
710 state->nParticipants = -1;
711 }
712 else if (coordinate->isWorker)
713 {
714 /* Parallel worker produces exactly one final run from all input */
715 state->shared = coordinate->sharedsort;
716 state->worker = worker_get_identifier(state);
717 state->nParticipants = -1;
718 }
719 else
720 {
721 /* Parallel leader state only used for final merge */
722 state->shared = coordinate->sharedsort;
723 state->worker = -1;
724 state->nParticipants = coordinate->nParticipants;
725 Assert(state->nParticipants >= 1);
726 }
727
728 MemoryContextSwitchTo(oldcontext);
729
730 return state;
731}

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CurrentMemoryContext, elog, ERROR, fb(), INITIAL_MEMTUPSIZE, Max, MemoryContextSwitchTo(), palloc0_object, pg_rusage_init(), trace_sort, tuplesort_begin_batch(), TUPLESORT_RANDOMACCESS, and worker_get_identifier().

Referenced by tuplesort_begin_cluster(), tuplesort_begin_datum(), tuplesort_begin_heap(), tuplesort_begin_index_brin(), tuplesort_begin_index_btree(), tuplesort_begin_index_gin(), tuplesort_begin_index_gist(), and tuplesort_begin_index_hash().
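
Sketch only: the typical serial-sort call sequence that reaches tuplesort_begin_common() through one of the wrappers listed above. The function names exist in the public tuplesort API, but the exact parameter lists shown here are assumptions based on tuplesort.h; Int4LessOperator and work_mem are taken from the standard backend headers.

Tuplesortstate *sortstate;

sortstate = tuplesort_begin_datum(INT4OID,           /* datum type */
                                  Int4LessOperator,  /* "<" operator */
                                  InvalidOid,        /* collation */
                                  false,             /* nulls first? */
                                  work_mem,
                                  NULL,              /* no parallel coordination */
                                  TUPLESORT_NONE);

/* feed values, sort, then read back in order */
tuplesort_putdatum(sortstate, Int32GetDatum(42), false);
tuplesort_performsort(sortstate);
/* ... tuplesort_getdatum() in a loop ... */
tuplesort_end(sortstate);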

◆ tuplesort_end()

◆ tuplesort_estimate_shared()

Size tuplesort_estimate_shared ( int  nWorkers)

Definition at line 2900 of file tuplesort.c.

2901{
2902 Size tapesSize;
2903
2904 Assert(nWorkers > 0);
2905
2906 /* Make sure that BufFile shared state is MAXALIGN'd */
2907 tapesSize = mul_size(sizeof(TapeShare), (Size) nWorkers);
2908 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2909
2910 return tapesSize;
2911}

References add_size(), Assert, fb(), MAXALIGN, and mul_size().

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), and _gin_begin_parallel().

◆ tuplesort_free()

static void tuplesort_free ( Tuplesortstate state)
static

Definition at line 882 of file tuplesort.c.

883{
884 /* context swap probably not needed, but let's be safe */
885 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
886 int64 spaceUsed;
887
888 if (state->tapeset)
889 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
890 else
891 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
892
893 /*
894 * Delete temporary "tape" files, if any.
895 *
896 * We don't bother to destroy the individual tapes here. They will go away
897 * with the sortcontext. (In TSS_FINALMERGE state, we have closed
898 * finished tapes already.)
899 */
900 if (state->tapeset)
901 LogicalTapeSetClose(state->tapeset);
902
903 if (trace_sort)
904 {
905 if (state->tapeset)
906 elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
907 SERIAL(state) ? "external sort" : "parallel external sort",
908 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
909 else
910 elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
911 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
912 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
913 }
914
915 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
916
918 MemoryContextSwitchTo(oldcontext);
919
920 /*
921 * Free the per-sort memory context, thereby releasing all working memory.
922 */
923 MemoryContextReset(state->base.sortcontext);
924}

References elog, fb(), FREESTATE, LOG, LogicalTapeSetBlocks(), LogicalTapeSetClose(), MemoryContextReset(), MemoryContextSwitchTo(), pg_rusage_show(), SERIAL, and trace_sort.

Referenced by tuplesort_end(), and tuplesort_reset().

◆ tuplesort_get_stats()

void tuplesort_get_stats ( Tuplesortstate state,
TuplesortInstrumentation stats 
)

Definition at line 2484 of file tuplesort.c.

2486{
2487 /*
2488 * Note: it might seem we should provide both memory and disk usage for a
2489 * disk-based sort. However, the current code doesn't track memory space
2490 * accurately once we have begun to return tuples to the caller (since we
2491 * don't account for pfree's the caller is expected to do), so we cannot
2492 * rely on availMem in a disk sort. This does not seem worth the overhead
2493 * to fix. Is it worth creating an API for the memory context code to
2494 * tell us how much is actually used in sortcontext?
2495 */
2496 tuplesort_updatemax(state);
2497
2498 if (state->isMaxSpaceDisk)
2499 stats->spaceType = SORT_SPACE_TYPE_DISK;
2500 else
2501 stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2502 stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2503
2504 switch (state->maxSpaceStatus)
2505 {
2506 case TSS_SORTEDINMEM:
2507 if (state->boundUsed)
2508 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2509 else
2510 stats->sortMethod = SORT_TYPE_QUICKSORT;
2511 break;
2512 case TSS_SORTEDONTAPE:
2513 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2514 break;
2515 case TSS_FINALMERGE:
2516 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2517 break;
2518 default:
2519 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2520 break;
2521 }
2522}

References SORT_SPACE_TYPE_DISK, SORT_SPACE_TYPE_MEMORY, SORT_TYPE_EXTERNAL_MERGE, SORT_TYPE_EXTERNAL_SORT, SORT_TYPE_QUICKSORT, SORT_TYPE_STILL_IN_PROGRESS, SORT_TYPE_TOP_N_HEAPSORT, TuplesortInstrumentation::sortMethod, TuplesortInstrumentation::spaceType, TuplesortInstrumentation::spaceUsed, TSS_FINALMERGE, TSS_SORTEDINMEM, TSS_SORTEDONTAPE, and tuplesort_updatemax().

Referenced by ExecSort(), instrumentSortedGroup(), and show_sort_info().
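
Sketch only (backend context assumed): reporting how a completed sort was performed, in the style of the show_sort_info() caller listed above. tuplesort_space_type_name() is assumed to come from the same module as tuplesort_method_name().

TuplesortInstrumentation stats;

tuplesort_performsort(sortstate);
tuplesort_get_stats(sortstate, &stats);
elog(LOG, "sort method: %s  space used: %lld kB (%s)",
     tuplesort_method_name(stats.sortMethod),
     (long long) stats.spaceUsed,
     tuplesort_space_type_name(stats.spaceType));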

◆ tuplesort_gettuple_common()

bool tuplesort_gettuple_common ( Tuplesortstate state,
bool  forward,
SortTuple stup 
)

Definition at line 1455 of file tuplesort.c.

1457{
1458 unsigned int tuplen;
1459 size_t nmoved;
1460
1461 Assert(!WORKER(state));
1462
1463 switch (state->status)
1464 {
1465 case TSS_SORTEDINMEM:
1466 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1467 Assert(!state->slabAllocatorUsed);
1468 if (forward)
1469 {
1470 if (state->current < state->memtupcount)
1471 {
1472 *stup = state->memtuples[state->current++];
1473 return true;
1474 }
1475 state->eof_reached = true;
1476
1477 /*
1478 * Complain if caller tries to retrieve more tuples than
1479 * originally asked for in a bounded sort. This is because
1480 * returning EOF here might be the wrong thing.
1481 */
1482 if (state->bounded && state->current >= state->bound)
1483 elog(ERROR, "retrieved too many tuples in a bounded sort");
1484
1485 return false;
1486 }
1487 else
1488 {
1489 if (state->current <= 0)
1490 return false;
1491
1492 /*
1493 * if all tuples are fetched already then we return last
1494 * tuple, else - tuple before last returned.
1495 */
1496 if (state->eof_reached)
1497 state->eof_reached = false;
1498 else
1499 {
1500 state->current--; /* last returned tuple */
1501 if (state->current <= 0)
1502 return false;
1503 }
1504 *stup = state->memtuples[state->current - 1];
1505 return true;
1506 }
1507 break;
1508
1509 case TSS_SORTEDONTAPE:
1510 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1511 Assert(state->slabAllocatorUsed);
1512
1513 /*
1514 * The slot that held the tuple that we returned in previous
1515 * gettuple call can now be reused.
1516 */
1517 if (state->lastReturnedTuple)
1518 {
1519 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1520 state->lastReturnedTuple = NULL;
1521 }
1522
1523 if (forward)
1524 {
1525 if (state->eof_reached)
1526 return false;
1527
1528 if ((tuplen = getlen(state->result_tape, true)) != 0)
1529 {
1530 READTUP(state, stup, state->result_tape, tuplen);
1531
1532 /*
1533 * Remember the tuple we return, so that we can recycle
1534 * its memory on next call. (This can be NULL, in the
1535 * !state->tuples case).
1536 */
1537 state->lastReturnedTuple = stup->tuple;
1538
1539 return true;
1540 }
1541 else
1542 {
1543 state->eof_reached = true;
1544 return false;
1545 }
1546 }
1547
1548 /*
1549 * Backward.
1550 *
1551 * if all tuples are fetched already then we return last tuple,
1552 * else - tuple before last returned.
1553 */
1554 if (state->eof_reached)
1555 {
1556 /*
1557 * Seek position is pointing just past the zero tuplen at the
1558 * end of file; back up to fetch last tuple's ending length
1559 * word. If seek fails we must have a completely empty file.
1560 */
1561 nmoved = LogicalTapeBackspace(state->result_tape,
1562 2 * sizeof(unsigned int));
1563 if (nmoved == 0)
1564 return false;
1565 else if (nmoved != 2 * sizeof(unsigned int))
1566 elog(ERROR, "unexpected tape position");
1567 state->eof_reached = false;
1568 }
1569 else
1570 {
1571 /*
1572 * Back up and fetch previously-returned tuple's ending length
1573 * word. If seek fails, assume we are at start of file.
1574 */
1575 nmoved = LogicalTapeBackspace(state->result_tape,
1576 sizeof(unsigned int));
1577 if (nmoved == 0)
1578 return false;
1579 else if (nmoved != sizeof(unsigned int))
1580 elog(ERROR, "unexpected tape position");
1581 tuplen = getlen(state->result_tape, false);
1582
1583 /*
1584 * Back up to get ending length word of tuple before it.
1585 */
1586 nmoved = LogicalTapeBackspace(state->result_tape,
1587 tuplen + 2 * sizeof(unsigned int));
1588 if (nmoved == tuplen + sizeof(unsigned int))
1589 {
1590 /*
1591 * We backed up over the previous tuple, but there was no
1592 * ending length word before it. That means that the prev
1593 * tuple is the first tuple in the file. It is now the
1594 * next to read in forward direction (not obviously right,
1595 * but that is what in-memory case does).
1596 */
1597 return false;
1598 }
1599 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1600 elog(ERROR, "bogus tuple length in backward scan");
1601 }
1602
1603 tuplen = getlen(state->result_tape, false);
1604
1605 /*
1606 * Now we have the length of the prior tuple, back up and read it.
1607 * Note: READTUP expects we are positioned after the initial
1608 * length word of the tuple, so back up to that point.
1609 */
1610 nmoved = LogicalTapeBackspace(state->result_tape,
1611 tuplen);
1612 if (nmoved != tuplen)
1613 elog(ERROR, "bogus tuple length in backward scan");
1614 READTUP(state, stup, state->result_tape, tuplen);
1615
1616 /*
1617 * Remember the tuple we return, so that we can recycle its memory
1618 * on next call. (This can be NULL, in the Datum case).
1619 */
1620 state->lastReturnedTuple = stup->tuple;
1621
1622 return true;
1623
1624 case TSS_FINALMERGE:
1625 Assert(forward);
1626 /* We are managing memory ourselves, with the slab allocator. */
1627 Assert(state->slabAllocatorUsed);
1628
1629 /*
1630 * The slab slot holding the tuple that we returned in previous
1631 * gettuple call can now be reused.
1632 */
1633 if (state->lastReturnedTuple)
1634 {
1635 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1636 state->lastReturnedTuple = NULL;
1637 }
1638
1639 /*
1640 * This code should match the inner loop of mergeonerun().
1641 */
1642 if (state->memtupcount > 0)
1643 {
1644 int srcTapeIndex = state->memtuples[0].srctape;
1645 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1646 SortTuple newtup;
1647
1648 *stup = state->memtuples[0];
1649
1650 /*
1651 * Remember the tuple we return, so that we can recycle its
1652 * memory on next call. (This can be NULL, in the Datum case).
1653 */
1654 state->lastReturnedTuple = stup->tuple;
1655
1656 /*
1657 * Pull next tuple from tape, and replace the returned tuple
1658 * at top of the heap with it.
1659 */
1660 if (!mergereadnext(state, srcTape, &newtup))
1661 {
1662 /*
1663 * If no more data, we've reached end of run on this tape.
1664 * Remove the top node from the heap.
1665 */
1666 tuplesort_heap_delete_top(state);
1667 state->nInputRuns--;
1668
1669 /*
1670 * Close the tape. It'd go away at the end of the sort
1671 * anyway, but better to release the memory early.
1672 */
1673 LogicalTapeClose(srcTape);
1674 return true;
1675 }
1676 newtup.srctape = srcTapeIndex;
1677 tuplesort_heap_replace_top(state, &newtup);
1678 return true;
1679 }
1680 return false;
1681
1682 default:
1683 elog(ERROR, "invalid tuplesort state");
1684 return false; /* keep compiler quiet */
1685 }
1686}

References Assert, elog, ERROR, fb(), getlen(), LogicalTapeBackspace(), LogicalTapeClose(), mergereadnext(), READTUP, RELEASE_SLAB_SLOT, TSS_FINALMERGE, TSS_SORTEDINMEM, TSS_SORTEDONTAPE, tuplesort_heap_delete_top(), tuplesort_heap_replace_top(), TUPLESORT_RANDOMACCESS, and WORKER.

Referenced by tuplesort_getbrintuple(), tuplesort_getdatum(), tuplesort_getgintuple(), tuplesort_getheaptuple(), tuplesort_getindextuple(), tuplesort_gettupleslot(), and tuplesort_skiptuples().

◆ tuplesort_heap_delete_top()

static void tuplesort_heap_delete_top ( Tuplesortstate state)
static

Definition at line 2757 of file tuplesort.c.

2758{
2759 SortTuple *memtuples = state->memtuples;
2760 SortTuple *tuple;
2761
2762 if (--state->memtupcount <= 0)
2763 return;
2764
2765 /*
2766 * Remove the last tuple in the heap, and re-insert it, by replacing the
2767 * current top node with it.
2768 */
2769 tuple = &memtuples[state->memtupcount];
2770 tuplesort_heap_replace_top(state, tuple);
2771}

References tuplesort_heap_replace_top().

Referenced by mergeonerun(), sort_bounded_heap(), and tuplesort_gettuple_common().

◆ tuplesort_heap_insert()

static void tuplesort_heap_insert ( Tuplesortstate state,
SortTuple tuple 
)
static

Definition at line 2722 of file tuplesort.c.

2723{
2724 SortTuple *memtuples;
2725 int j;
2726
2727 memtuples = state->memtuples;
2728 Assert(state->memtupcount < state->memtupsize);
2729
2730 CHECK_FOR_INTERRUPTS();
2731
2732 /*
2733 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2734 * using 1-based array indexes, not 0-based.
2735 */
2736 j = state->memtupcount++;
2737 while (j > 0)
2738 {
2739 int i = (j - 1) >> 1;
2740
2741 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
2742 break;
2743 memtuples[j] = memtuples[i];
2744 j = i;
2745 }
2746 memtuples[j] = *tuple;
2747}

References Assert, CHECK_FOR_INTERRUPTS, COMPARETUP, i, and j.

Referenced by beginmerge(), and make_bounded_heap().
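
For illustration only (not part of tuplesort.c): the same Knuth 5.2.3 exercise 16 sift-up on plain ints instead of SortTuples, with the smallest value ending up at the root, as in the function above.

#include <stdio.h>

/* Standalone illustration of the sift-up loop: parent of j is (j - 1) / 2. */
static void
heap_insert(int *heap, int *count, int value)
{
    int j = (*count)++;

    while (j > 0)
    {
        int i = (j - 1) >> 1;          /* parent index */

        if (value >= heap[i])
            break;
        heap[j] = heap[i];             /* pull the parent down */
        j = i;
    }
    heap[j] = value;
}

int
main(void)
{
    int heap[8], count = 0;
    int input[] = {5, 3, 8, 1};

    for (int k = 0; k < 4; k++)
        heap_insert(heap, &count, input[k]);
    printf("root (minimum) = %d\n", heap[0]);   /* prints 1 */
    return 0;
}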

◆ tuplesort_heap_replace_top()

static void tuplesort_heap_replace_top ( Tuplesortstate state,
SortTuple tuple 
)
static

Definition at line 2781 of file tuplesort.c.

2782{
2783 SortTuple *memtuples = state->memtuples;
2784 unsigned int i,
2785 n;
2786
2787 Assert(state->memtupcount >= 1);
2788
2789 CHECK_FOR_INTERRUPTS();
2790
2791 /*
2792 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2793 * This prevents overflow in the "2 * i + 1" calculation, since at the top
2794 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2795 */
2796 n = state->memtupcount;
2797 i = 0; /* i is where the "hole" is */
2798 for (;;)
2799 {
2800 unsigned int j = 2 * i + 1;
2801
2802 if (j >= n)
2803 break;
2804 if (j + 1 < n &&
2805 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
2806 j++;
2807 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
2808 break;
2809 memtuples[i] = memtuples[j];
2810 i = j;
2811 }
2812 memtuples[i] = *tuple;
2813}

References Assert, CHECK_FOR_INTERRUPTS, COMPARETUP, i, and j.

Referenced by make_bounded_heap(), mergeonerun(), tuplesort_gettuple_common(), tuplesort_heap_delete_top(), and tuplesort_puttuple_common().

◆ tuplesort_initialize_shared()

void tuplesort_initialize_shared ( Sharedsort shared,
int  nWorkers,
dsm_segment seg 
)

Definition at line 2921 of file tuplesort.c.

2922{
2923 int i;
2924
2925 Assert(nWorkers > 0);
2926
2927 SpinLockInit(&shared->mutex);
2928 shared->currentWorker = 0;
2929 shared->workersFinished = 0;
2930 SharedFileSetInit(&shared->fileset, seg);
2931 shared->nTapes = nWorkers;
2932 for (i = 0; i < nWorkers; i++)
2933 {
2934 shared->tapes[i].firstblocknumber = 0L;
2935 }
2936}

References Assert, Sharedsort::currentWorker, fb(), Sharedsort::fileset, TapeShare::firstblocknumber, i, Sharedsort::mutex, Sharedsort::nTapes, SharedFileSetInit(), SpinLockInit, Sharedsort::tapes, and Sharedsort::workersFinished.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), and _gin_begin_parallel().
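
Sketch only: leader-side setup of the shared sort state in dynamic shared memory, modeled on the _bt_begin_parallel()-style callers listed above. PARALLEL_KEY_TUPLESORT is a hypothetical toc key and pcxt stands for a ParallelContext; shm_toc_allocate(), shm_toc_insert(), and shm_toc_lookup() are the standard DSM table-of-contents routines.

Sharedsort *sharedsort;
Size        estsort = tuplesort_estimate_shared(nworkers);

sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);

/* ... each worker later does ...                                     */
/* sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);   */
/* tuplesort_attach_shared(sharedsort, seg);                          */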

◆ tuplesort_markpos()

void tuplesort_markpos ( Tuplesortstate * state)

Definition at line 2420 of file tuplesort.c.

2421{
2422 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2423
2424 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2425
2426 switch (state->status)
2427 {
2428 case TSS_SORTEDINMEM:
2429 state->markpos_offset = state->current;
2430 state->markpos_eof = state->eof_reached;
2431 break;
2432 case TSS_SORTEDONTAPE:
2433 LogicalTapeTell(state->result_tape,
2434 &state->markpos_block,
2435 &state->markpos_offset);
2436 state->markpos_eof = state->eof_reached;
2437 break;
2438 default:
2439 elog(ERROR, "invalid tuplesort state");
2440 break;
2441 }
2442
2443 MemoryContextSwitchTo(oldcontext);
2444}

References Assert, elog, ERROR, LogicalTapeTell(), MemoryContextSwitchTo(), TSS_SORTEDINMEM, TSS_SORTEDONTAPE, and TUPLESORT_RANDOMACCESS.

Referenced by ExecSortMarkPos().

◆ tuplesort_merge_order()

int tuplesort_merge_order ( int64  allowedMem)

Definition at line 1763 of file tuplesort.c.

1764{
1765 int mOrder;
1766
1767 /*----------
1768 * In the merge phase, we need buffer space for each input and output tape.
1769 * Each pass in the balanced merge algorithm reads from M input tapes, and
1770 * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1771 * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1772 * input tape.
1773 *
1774 * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1775 * N * TAPE_BUFFER_OVERHEAD
1776 *
1777 * Except for the last and next-to-last merge passes, where there can be
1778 * fewer tapes left to process, M = N. We choose M so that we have the
1779 * desired amount of memory available for the input buffers
1780 * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1781 * available for the tape buffers (allowedMem).
1782 *
1783 * Note: you might be thinking we need to account for the memtuples[]
1784 * array in this calculation, but we effectively treat that as part of the
1785 * MERGE_BUFFER_SIZE workspace.
1786 *----------
1787 */
1788 mOrder = allowedMem /
1789 (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1790
1791 /*
1792 * Even in minimum memory, use at least a MINORDER merge. On the other
1793 * hand, even when we have lots of memory, do not use more than a MAXORDER
1794 * merge. Tapes are pretty cheap, but they're not entirely free. Each
1795 * additional tape reduces the amount of memory available to build runs,
1796 * which in turn can cause the same sort to need more runs, which makes
1797 * merging slower even if it can still be done in a single pass. Also,
1798 * high order merges are quite slow due to CPU cache effects; it can be
1799 * faster to pay the I/O cost of a multi-pass merge than to perform a
1800 * single merge pass across many hundreds of tapes.
1801 */
1802 mOrder = Max(mOrder, MINORDER);
1803 mOrder = Min(mOrder, MAXORDER);
1804
1805 return mOrder;
1806}

References fb(), Max, MAXORDER, MERGE_BUFFER_SIZE, Min, MINORDER, and TAPE_BUFFER_OVERHEAD.

Referenced by cost_tuplesort(), and inittapes().
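
As a worked example (assuming the default BLCKSZ of 8192, so TAPE_BUFFER_OVERHEAD = 8192 and MERGE_BUFFER_SIZE = 262144), a standalone sketch of the same arithmetic for a 4MB allowance:

    #include <stdio.h>

    /* Standalone sketch of the merge-order computation, assuming BLCKSZ = 8192. */
    #define TAPE_BUFFER_OVERHEAD  8192          /* BLCKSZ */
    #define MERGE_BUFFER_SIZE     (8192 * 32)   /* BLCKSZ * 32 */
    #define MINORDER              6
    #define MAXORDER              500

    int
    main(void)
    {
        long long   allowedMem = 4LL * 1024 * 1024; /* e.g. 4MB of work_mem */
        int         mOrder;

        mOrder = (int) (allowedMem /
                        (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE));

        /* clamp to [MINORDER, MAXORDER] */
        if (mOrder < MINORDER)
            mOrder = MINORDER;
        if (mOrder > MAXORDER)
            mOrder = MAXORDER;

        /* 4194304 / 278528 = 15, already within [6, 500] */
        printf("merge order = %d\n", mOrder);
        return 0;
    }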

◆ tuplesort_method_name()

const char * tuplesort_method_name ( TuplesortMethod  m)

Definition at line 2528 of file tuplesort.c.

2529{
2530 switch (m)
2531 {
2532 case SORT_TYPE_STILL_IN_PROGRESS:
2533 return "still in progress";
2534 case SORT_TYPE_TOP_N_HEAPSORT:
2535 return "top-N heapsort";
2536 case SORT_TYPE_QUICKSORT:
2537 return "quicksort";
2538 case SORT_TYPE_EXTERNAL_SORT:
2539 return "external sort";
2540 case SORT_TYPE_EXTERNAL_MERGE:
2541 return "external merge";
2542 }
2543
2544 return "unknown";
2545}

References SORT_TYPE_EXTERNAL_MERGE, SORT_TYPE_EXTERNAL_SORT, SORT_TYPE_QUICKSORT, SORT_TYPE_STILL_IN_PROGRESS, and SORT_TYPE_TOP_N_HEAPSORT.

Referenced by show_incremental_sort_group_info(), and show_sort_info().

◆ tuplesort_performsort()

void tuplesort_performsort ( Tuplesortstate * state)

Definition at line 1348 of file tuplesort.c.

1349{
1350 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1351
1352 if (trace_sort)
1353 elog(LOG, "performsort of worker %d starting: %s",
1354 state->worker, pg_rusage_show(&state->ru_start));
1355
1356 switch (state->status)
1357 {
1358 case TSS_INITIAL:
1359
1360 /*
1361 * We were able to accumulate all the tuples within the allowed
1362 * amount of memory, or leader to take over worker tapes
1363 */
1364 if (SERIAL(state))
1365 {
1366 /* Just qsort 'em and we're done */
1367 tuplesort_sort_memtuples(state);
1368 state->status = TSS_SORTEDINMEM;
1369 }
1370 else if (WORKER(state))
1371 {
1372 /*
1373 * Parallel workers must still dump out tuples to tape. No
1374 * merge is required to produce single output run, though.
1375 */
1376 inittapes(state, false);
1377 dumptuples(state, true);
1378 worker_nomergeruns(state);
1379 state->status = TSS_SORTEDONTAPE;
1380 }
1381 else
1382 {
1383 /*
1384 * Leader will take over worker tapes and merge worker runs.
1385 * Note that mergeruns sets the correct state->status.
1386 */
1387 leader_takeover_tapes(state);
1388 mergeruns(state);
1389 }
1390 state->current = 0;
1391 state->eof_reached = false;
1392 state->markpos_block = 0L;
1393 state->markpos_offset = 0;
1394 state->markpos_eof = false;
1395 break;
1396
1397 case TSS_BOUNDED:
1398
1399 /*
1400 * We were able to accumulate all the tuples required for output
1401 * in memory, using a heap to eliminate excess tuples. Now we
1402 * have to transform the heap to a properly-sorted array. Note
1403 * that sort_bounded_heap sets the correct state->status.
1404 */
1405 sort_bounded_heap(state);
1406 state->current = 0;
1407 state->eof_reached = false;
1408 state->markpos_offset = 0;
1409 state->markpos_eof = false;
1410 break;
1411
1412 case TSS_BUILDRUNS:
1413
1414 /*
1415 * Finish tape-based sort. First, flush all tuples remaining in
1416 * memory out to tape; then merge until we have a single remaining
1417 * run (or, if !randomAccess and !WORKER(), one run per tape).
1418 * Note that mergeruns sets the correct state->status.
1419 */
1420 dumptuples(state, true);
1421 mergeruns(state);
1422 state->eof_reached = false;
1423 state->markpos_block = 0L;
1424 state->markpos_offset = 0;
1425 state->markpos_eof = false;
1426 break;
1427
1428 default:
1429 elog(ERROR, "invalid tuplesort state");
1430 break;
1431 }
1432
1433 if (trace_sort)
1434 {
1435 if (state->status == TSS_FINALMERGE)
1436 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1437 state->worker, state->nInputTapes,
1438 pg_rusage_show(&state->ru_start));
1439 else
1440 elog(LOG, "performsort of worker %d done: %s",
1441 state->worker, pg_rusage_show(&state->ru_start));
1442 }
1443
1444 MemoryContextSwitchTo(oldcontext);
1445}

References dumptuples(), elog, ERROR, fb(), inittapes(), leader_takeover_tapes(), LOG, MemoryContextSwitchTo(), mergeruns(), pg_rusage_show(), SERIAL, sort_bounded_heap(), trace_sort, TSS_BOUNDED, TSS_BUILDRUNS, TSS_FINALMERGE, TSS_INITIAL, TSS_SORTEDINMEM, TSS_SORTEDONTAPE, tuplesort_sort_memtuples(), WORKER, and worker_nomergeruns().

Referenced by _brin_parallel_merge(), _brin_parallel_scan_and_build(), _bt_leafbuild(), _bt_parallel_scan_and_sort(), _gin_parallel_merge(), _gin_parallel_scan_and_build(), _gin_process_worker_data(), _h_indexbuild(), array_sort_internal(), ExecIncrementalSort(), ExecSort(), gistbuild(), heapam_relation_copy_for_cluster(), hypothetical_dense_rank_final(), hypothetical_rank_common(), initialize_phase(), mode_final(), percentile_cont_final_common(), percentile_cont_multi_final_common(), percentile_disc_final(), percentile_disc_multi_final(), process_ordered_aggregate_multi(), process_ordered_aggregate_single(), switchToPresortedPrefixMode(), and validate_index().
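
A hedged sketch of the usual caller lifecycle around tuplesort_performsort(): feed tuples, sort, then read back in order. The calls below exist in tuplesort.h, but the exact parameter lists follow git master (tuplesort_getdatum() gained its "copy" argument in a recent release), so treat them as an assumption to check against your tree:

    #include "postgres.h"
    #include "catalog/pg_operator_d.h"   /* Int4LessOperator */
    #include "catalog/pg_type_d.h"       /* INT4OID */
    #include "miscadmin.h"               /* work_mem */
    #include "utils/tuplesort.h"

    /* Hedged sketch: sort an array of int4 Datums with the datum API. */
    static void
    sort_some_int4(Datum *values, int nvalues)
    {
        Tuplesortstate *ts;
        Datum       val;
        bool        isnull;
        int         i;

        ts = tuplesort_begin_datum(INT4OID, Int4LessOperator, InvalidOid,
                                   false, work_mem, NULL, TUPLESORT_NONE);

        for (i = 0; i < nvalues; i++)
            tuplesort_putdatum(ts, values[i], false);

        tuplesort_performsort(ts);

        while (tuplesort_getdatum(ts, true, false, &val, &isnull, NULL))
            elog(DEBUG1, "sorted: %d", DatumGetInt32(val));

        tuplesort_end(ts);
    }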

◆ tuplesort_puttuple_common()

void tuplesort_puttuple_common ( Tuplesortstate * state,
SortTuple * tuple,
bool  useAbbrev,
Size  tuplen 
)

Definition at line 1154 of file tuplesort.c.

1156{
1157 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1158
1159 Assert(!LEADER(state));
1160
1161 /* account for the memory used for this tuple */
1162 USEMEM(state, tuplen);
1163 state->tupleMem += tuplen;
1164
1165 if (!useAbbrev)
1166 {
1167 /*
1168 * Leave ordinary Datum representation, or NULL value. If there is a
1169 * converter it won't expect NULL values, and cost model is not
1170 * required to account for NULL, so in that case we avoid calling
1171 * converter and just set datum1 to zeroed representation (to be
1172 * consistent, and to support cheap inequality tests for NULL
1173 * abbreviated keys).
1174 */
1175 }
1176 else if (!consider_abort_common(state))
1177 {
1178 /* Store abbreviated key representation */
1179 tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1180 state->base.sortKeys);
1181 }
1182 else
1183 {
1184 /*
1185 * Set state to be consistent with never trying abbreviation.
1186 *
1187 * Alter datum1 representation in already-copied tuples, so as to
1188 * ensure a consistent representation (current tuple was just
1189 * handled). It does not matter if some dumped tuples are already
1190 * sorted on tape, since serialized tuples lack abbreviated keys
1191 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1192 */
1193 REMOVEABBREV(state, state->memtuples, state->memtupcount);
1194 }
1195
1196 switch (state->status)
1197 {
1198 case TSS_INITIAL:
1199
1200 /*
1201 * Save the tuple into the unsorted array. First, grow the array
1202 * as needed. Note that we try to grow the array when there is
1203 * still one free slot remaining --- if we fail, there'll still be
1204 * room to store the incoming tuple, and then we'll switch to
1205 * tape-based operation.
1206 */
1207 if (state->memtupcount >= state->memtupsize - 1)
1208 {
1209 (void) grow_memtuples(state);
1210 Assert(state->memtupcount < state->memtupsize);
1211 }
1212 state->memtuples[state->memtupcount++] = *tuple;
1213
1214 /*
1215 * Check if it's time to switch over to a bounded heapsort. We do
1216 * so if the input tuple count exceeds twice the desired tuple
1217 * count (this is a heuristic for where heapsort becomes cheaper
1218 * than a quicksort), or if we've just filled workMem and have
1219 * enough tuples to meet the bound.
1220 *
1221 * Note that once we enter TSS_BOUNDED state we will always try to
1222 * complete the sort that way. In the worst case, if later input
1223 * tuples are larger than earlier ones, this might cause us to
1224 * exceed workMem significantly.
1225 */
1226 if (state->bounded &&
1227 (state->memtupcount > state->bound * 2 ||
1228 (state->memtupcount > state->bound && LACKMEM(state))))
1229 {
1230 if (trace_sort)
1231 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1232 state->memtupcount,
1233 pg_rusage_show(&state->ru_start));
1234 make_bounded_heap(state);
1235 MemoryContextSwitchTo(oldcontext);
1236 return;
1237 }
1238
1239 /*
1240 * Done if we still fit in available memory and have array slots.
1241 */
1242 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1243 {
1244 MemoryContextSwitchTo(oldcontext);
1245 return;
1246 }
1247
1248 /*
1249 * Nope; time to switch to tape-based operation.
1250 */
1251 inittapes(state, true);
1252
1253 /*
1254 * Dump all tuples.
1255 */
1256 dumptuples(state, false);
1257 break;
1258
1259 case TSS_BOUNDED:
1260
1261 /*
1262 * We don't want to grow the array here, so check whether the new
1263 * tuple can be discarded before putting it in. This should be a
1264 * good speed optimization, too, since when there are many more
1265 * input tuples than the bound, most input tuples can be discarded
1266 * with just this one comparison. Note that because we currently
1267 * have the sort direction reversed, we must check for <= not >=.
1268 */
1269 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1270 {
1271 /* new tuple <= top of the heap, so we can discard it */
1272 free_sort_tuple(state, tuple);
1273 CHECK_FOR_INTERRUPTS();
1274 }
1275 else
1276 {
1277 /* discard top of heap, replacing it with the new tuple */
1278 free_sort_tuple(state, &state->memtuples[0]);
1279 tuplesort_heap_replace_top(state, tuple);
1280 }
1281 break;
1282
1283 case TSS_BUILDRUNS:
1284
1285 /*
1286 * Save the tuple into the unsorted array (there must be space)
1287 */
1288 state->memtuples[state->memtupcount++] = *tuple;
1289
1290 /*
1291 * If we are over the memory limit, dump all tuples.
1292 */
1293 dumptuples(state, false);
1294 break;
1295
1296 default:
1297 elog(ERROR, "invalid tuplesort state");
1298 break;
1299 }
1300 MemoryContextSwitchTo(oldcontext);
1301}

References Assert, CHECK_FOR_INTERRUPTS, COMPARETUP, consider_abort_common(), SortTuple::datum1, dumptuples(), elog, ERROR, fb(), free_sort_tuple(), grow_memtuples(), inittapes(), LACKMEM, LEADER, LOG, make_bounded_heap(), MemoryContextSwitchTo(), pg_rusage_show(), REMOVEABBREV, trace_sort, TSS_BOUNDED, TSS_BUILDRUNS, TSS_INITIAL, tuplesort_heap_replace_top(), and USEMEM.

Referenced by tuplesort_putbrintuple(), tuplesort_putdatum(), tuplesort_putgintuple(), tuplesort_putheaptuple(), tuplesort_putindextuplevalues(), and tuplesort_puttupleslot().
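
The TSS_BOUNDED branch above is the classic top-N filter: compare the incoming tuple against the heap root and either discard it or replace the root. A standalone sketch of the same decision for keeping the N smallest ints (the real code keeps the heap in reversed sort order instead of using a max-heap; heap_replace_top_max_int() is a hypothetical sift-down helper analogous to the sketch shown under tuplesort_heap_replace_top()):

    /* Hypothetical sift-down helper for a max-heap, analogous to the earlier
     * min-heap sketch but with reversed comparisons. */
    static void heap_replace_top_max_int(int *heap, unsigned int n, int value);

    /* Illustrative sketch: keep the N smallest values seen so far; heap[0]
     * holds the largest (i.e. worst) of the kept values. */
    static void
    topn_consider(int *heap, unsigned int n, int candidate)
    {
        if (candidate >= heap[0])
            return;             /* no better than the current worst; discard */

        /* evict the current worst and sift the candidate into place */
        heap_replace_top_max_int(heap, n, candidate);
    }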

◆ tuplesort_readtup_alloc()

void * tuplesort_readtup_alloc ( Tuplesortstate * state,
Size  tuplen 
)

Definition at line 2866 of file tuplesort.c.

2867{
2868 SlabSlot *buf;
2869
2870 /*
2871 * We pre-allocate enough slots in the slab arena that we should never run
2872 * out.
2873 */
2874 Assert(state->slabFreeHead);
2875
2876 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
2877 return MemoryContextAlloc(state->base.sortcontext, tuplen);
2878 else
2879 {
2880 buf = state->slabFreeHead;
2881 /* Reuse this slot */
2882 state->slabFreeHead = buf->nextfree;
2883
2884 return buf;
2885 }
2886}

References Assert, buf, MemoryContextAlloc(), and SLAB_SLOT_SIZE.

Referenced by readtup_cluster(), readtup_datum(), readtup_heap(), readtup_index(), readtup_index_brin(), and readtup_index_gin().
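
The slab arena behind this function is just a singly linked free list of fixed-size slots. A standalone sketch of the same allocate-or-fall-back policy, with hypothetical names (MySlot stands in for SlabSlot, malloc for the sort memory context):

    #include <stdlib.h>

    #define MY_SLOT_SIZE 1024   /* stand-in for SLAB_SLOT_SIZE */

    /* Hypothetical slot type, analogous to the SlabSlot union. */
    typedef union MySlot
    {
        union MySlot *nextfree;
        char          buffer[MY_SLOT_SIZE];
    } MySlot;

    /* Pop a slot from the free list if the request fits, otherwise fall back
     * to the general-purpose allocator. */
    static void *
    slab_alloc(MySlot **freehead, size_t len)
    {
        MySlot *slot;

        if (len > MY_SLOT_SIZE || *freehead == NULL)
            return malloc(len);

        slot = *freehead;
        *freehead = slot->nextfree;
        return slot;
    }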

◆ tuplesort_rescan()

void tuplesort_rescan ( Tuplesortstate * state)

Definition at line 2387 of file tuplesort.c.

2388{
2389 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2390
2391 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2392
2393 switch (state->status)
2394 {
2395 case TSS_SORTEDINMEM:
2396 state->current = 0;
2397 state->eof_reached = false;
2398 state->markpos_offset = 0;
2399 state->markpos_eof = false;
2400 break;
2401 case TSS_SORTEDONTAPE:
2402 LogicalTapeRewindForRead(state->result_tape, 0);
2403 state->eof_reached = false;
2404 state->markpos_block = 0L;
2405 state->markpos_offset = 0;
2406 state->markpos_eof = false;
2407 break;
2408 default:
2409 elog(ERROR, "invalid tuplesort state");
2410 break;
2411 }
2412
2413 MemoryContextSwitchTo(oldcontext);
2414}

References Assert, elog, ERROR, fb(), LogicalTapeRewindForRead(), MemoryContextSwitchTo(), TSS_SORTEDINMEM, TSS_SORTEDONTAPE, and TUPLESORT_RANDOMACCESS.

Referenced by ExecReScanSort(), mode_final(), percentile_cont_final_common(), percentile_cont_multi_final_common(), percentile_disc_final(), and percentile_disc_multi_final().

◆ tuplesort_reset()

void tuplesort_reset ( Tuplesortstate * state)

Definition at line 1004 of file tuplesort.c.

1005{
1006 tuplesort_updatemax(state);
1007 tuplesort_free(state);
1008
1009 /*
1010 * After we've freed up per-batch memory, re-setup all of the state common
1011 * to both the first batch and any subsequent batch.
1012 */
1013 tuplesort_begin_batch(state);
1014
1015 state->lastReturnedTuple = NULL;
1016 state->slabMemoryBegin = NULL;
1017 state->slabMemoryEnd = NULL;
1018 state->slabFreeHead = NULL;
1019}

References fb(), tuplesort_begin_batch(), tuplesort_free(), and tuplesort_updatemax().

Referenced by ExecIncrementalSort(), ExecReScanIncrementalSort(), and switchToPresortedPrefixMode().

◆ tuplesort_restorepos()

void tuplesort_restorepos ( Tuplesortstate * state)

Definition at line 2451 of file tuplesort.c.

2452{
2453 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2454
2455 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2456
2457 switch (state->status)
2458 {
2459 case TSS_SORTEDINMEM:
2460 state->current = state->markpos_offset;
2461 state->eof_reached = state->markpos_eof;
2462 break;
2463 case TSS_SORTEDONTAPE:
2464 LogicalTapeSeek(state->result_tape,
2465 state->markpos_block,
2466 state->markpos_offset);
2467 state->eof_reached = state->markpos_eof;
2468 break;
2469 default:
2470 elog(ERROR, "invalid tuplesort state");
2471 break;
2472 }
2473
2474 MemoryContextSwitchTo(oldcontext);
2475}

References Assert, elog, ERROR, LogicalTapeSeek(), MemoryContextSwitchTo(), TSS_SORTEDINMEM, TSS_SORTEDONTAPE, and TUPLESORT_RANDOMACCESS.

Referenced by ExecSortRestrPos().
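
tuplesort_markpos() and tuplesort_restorepos() form a pair: both require a sort created with TUPLESORT_RANDOMACCESS, and restoring simply jumps back to the last marked position. A hedged usage sketch, assuming ts is such a Tuplesortstate that has already gone through tuplesort_performsort():

    /* Hedged sketch: remember the current read position, scan ahead, rewind. */
    tuplesort_markpos(ts);

    /* ... read forward with tuplesort_gettupleslot() or tuplesort_getdatum() ... */

    tuplesort_restorepos(ts);   /* the next fetch resumes at the marked tuple */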

◆ tuplesort_set_bound()

void tuplesort_set_bound ( Tuplesortstate * state,
int64  bound 
)

Definition at line 823 of file tuplesort.c.

824{
825 /* Assert we're called before loading any tuples */
826 Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
827 /* Assert we allow bounded sorts */
828 Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
829 /* Can't set the bound twice, either */
830 Assert(!state->bounded);
831 /* Also, this shouldn't be called in a parallel worker */
832 Assert(!WORKER(state));
833
834 /* Parallel leader allows but ignores hint */
835 if (LEADER(state))
836 return;
837
838#ifdef DEBUG_BOUNDED_SORT
839 /* Honor GUC setting that disables the feature (for easy testing) */
840 if (!optimize_bounded_sort)
841 return;
842#endif
843
844 /* We want to be able to compute bound * 2, so limit the setting */
845 if (bound > (int64) (INT_MAX / 2))
846 return;
847
848 state->bounded = true;
849 state->bound = (int) bound;
850
851 /*
852 * Bounded sorts are not an effective target for abbreviated key
853 * optimization. Disable by setting state to be consistent with no
854 * abbreviation support.
855 */
856 state->base.sortKeys->abbrev_converter = NULL;
857 if (state->base.sortKeys->abbrev_full_comparator)
858 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
859
860 /* Not strictly necessary, but be tidy */
861 state->base.sortKeys->abbrev_abort = NULL;
862 state->base.sortKeys->abbrev_full_comparator = NULL;
863}

References Assert, fb(), LEADER, TSS_INITIAL, TUPLESORT_ALLOWBOUNDED, and WORKER.

Referenced by ExecIncrementalSort(), ExecSort(), and switchToPresortedPrefixMode().

◆ tuplesort_skiptuples()

bool tuplesort_skiptuples ( Tuplesortstate * state,
int64  ntuples,
bool  forward 
)

Definition at line 1695 of file tuplesort.c.

1696{
1697 MemoryContext oldcontext;
1698
1699 /*
1700 * We don't actually support backwards skip yet, because no callers need
1701 * it. The API is designed to allow for that later, though.
1702 */
1703 Assert(forward);
1704 Assert(ntuples >= 0);
1705 Assert(!WORKER(state));
1706
1707 switch (state->status)
1708 {
1709 case TSS_SORTEDINMEM:
1710 if (state->memtupcount - state->current >= ntuples)
1711 {
1712 state->current += ntuples;
1713 return true;
1714 }
1715 state->current = state->memtupcount;
1716 state->eof_reached = true;
1717
1718 /*
1719 * Complain if caller tries to retrieve more tuples than
1720 * originally asked for in a bounded sort. This is because
1721 * returning EOF here might be the wrong thing.
1722 */
1723 if (state->bounded && state->current >= state->bound)
1724 elog(ERROR, "retrieved too many tuples in a bounded sort");
1725
1726 return false;
1727
1728 case TSS_SORTEDONTAPE:
1729 case TSS_FINALMERGE:
1730
1731 /*
1732 * We could probably optimize these cases better, but for now it's
1733 * not worth the trouble.
1734 */
1735 oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1736 while (ntuples-- > 0)
1737 {
1738 SortTuple stup;
1739
1740 if (!tuplesort_gettuple_common(state, forward, &stup))
1741 {
1742 MemoryContextSwitchTo(oldcontext);
1743 return false;
1744 }
1745 CHECK_FOR_INTERRUPTS();
1746 }
1747 MemoryContextSwitchTo(oldcontext);
1748 return true;
1749
1750 default:
1751 elog(ERROR, "invalid tuplesort state");
1752 return false; /* keep compiler quiet */
1753 }
1754}

References Assert, CHECK_FOR_INTERRUPTS, elog, ERROR, fb(), MemoryContextSwitchTo(), TSS_FINALMERGE, TSS_SORTEDINMEM, TSS_SORTEDONTAPE, tuplesort_gettuple_common(), and WORKER.

Referenced by percentile_cont_final_common(), percentile_cont_multi_final_common(), percentile_disc_final(), and percentile_disc_multi_final().

◆ tuplesort_sort_memtuples()

static void tuplesort_sort_memtuples ( Tuplesortstate * state)
static

Definition at line 2661 of file tuplesort.c.

2662{
2663 Assert(!LEADER(state));
2664
2665 if (state->memtupcount > 1)
2666 {
2667 /*
2668 * Do we have the leading column's value or abbreviation in datum1,
2669 * and is there a specialization for its comparator?
2670 */
2671 if (state->base.haveDatum1 && state->base.sortKeys)
2672 {
2673 if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2674 {
2675 qsort_tuple_unsigned(state->memtuples,
2676 state->memtupcount,
2677 state);
2678 return;
2679 }
2680 else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2681 {
2682 qsort_tuple_signed(state->memtuples,
2683 state->memtupcount,
2684 state);
2685 return;
2686 }
2687 else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2688 {
2689 qsort_tuple_int32(state->memtuples,
2690 state->memtupcount,
2691 state);
2692 return;
2693 }
2694 }
2695
2696 /* Can we use the single-key sort function? */
2697 if (state->base.onlyKey != NULL)
2698 {
2699 qsort_ssup(state->memtuples, state->memtupcount,
2700 state->base.onlyKey);
2701 }
2702 else
2703 {
2704 qsort_tuple(state->memtuples,
2705 state->memtupcount,
2706 state->base.comparetup,
2707 state);
2708 }
2709 }
2710}

References Assert, fb(), LEADER, ssup_datum_int32_cmp(), ssup_datum_signed_cmp(), and ssup_datum_unsigned_cmp().

Referenced by dumptuples(), and tuplesort_performsort().
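
The dispatch above selects a sort specialization by comparing the leading key's comparator against known function pointers. A standalone sketch of that technique with hypothetical names (int_cmp, sort_dispatch):

    #include <stdlib.h>

    typedef int (*cmp_fn) (const void *, const void *);

    /* Hypothetical comparator that has a matching specialized sort path. */
    static int
    int_cmp(const void *a, const void *b)
    {
        int x = *(const int *) a;
        int y = *(const int *) b;

        return (x > y) - (x < y);
    }

    static void
    sort_dispatch(void *data, size_t n, size_t elemsize, cmp_fn cmp)
    {
        /* Compare the function pointer itself, as the code above does with
         * ssup_datum_unsigned_cmp and friends. */
        if (cmp == int_cmp)
        {
            /* a specialized routine would go here; qsort stands in for it */
            qsort(data, n, sizeof(int), int_cmp);
            return;
        }

        /* generic fallback, analogous to qsort_tuple() */
        qsort(data, n, elemsize, cmp);
    }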

◆ tuplesort_space_type_name()

const char * tuplesort_space_type_name ( TuplesortSpaceType  t)

Definition at line 2551 of file tuplesort.c.

2552{
2553 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2554 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2555}

References Assert, SORT_SPACE_TYPE_DISK, and SORT_SPACE_TYPE_MEMORY.

Referenced by show_incremental_sort_group_info(), and show_sort_info().

◆ tuplesort_updatemax()

static void tuplesort_updatemax ( Tuplesortstate * state)
static

Definition at line 953 of file tuplesort.c.

954{
955 int64 spaceUsed;
956 bool isSpaceDisk;
957
958 /*
959 * Note: it might seem we should provide both memory and disk usage for a
960 * disk-based sort. However, the current code doesn't track memory space
961 * accurately once we have begun to return tuples to the caller (since we
962 * don't account for pfree's the caller is expected to do), so we cannot
963 * rely on availMem in a disk sort. This does not seem worth the overhead
964 * to fix. Is it worth creating an API for the memory context code to
965 * tell us how much is actually used in sortcontext?
966 */
967 if (state->tapeset)
968 {
969 isSpaceDisk = true;
970 spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
971 }
972 else
973 {
974 isSpaceDisk = false;
975 spaceUsed = state->allowedMem - state->availMem;
976 }
977
978 /*
979 * Sort evicts data to the disk when it wasn't able to fit that data into
980 * main memory. This is why we assume space used on the disk to be more
981 * important for tracking resource usage than space used in memory. Note
982 * that the amount of space occupied by some tupleset on the disk might be
983 * less than amount of space occupied by the same tupleset in memory due
984 * to more compact representation.
985 */
986 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
987 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
988 {
989 state->maxSpace = spaceUsed;
990 state->isMaxSpaceDisk = isSpaceDisk;
991 state->maxSpaceStatus = state->status;
992 }
993}

References fb(), and LogicalTapeSetBlocks().

Referenced by tuplesort_get_stats(), and tuplesort_reset().
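
The update rule above is: a disk figure always supersedes a memory-only figure, and otherwise the larger figure of the same kind wins. A standalone sketch of just that rule:

    #include <stdbool.h>
    #include <stdint.h>

    typedef struct MaxSpace
    {
        int64_t maxSpace;
        bool    isMaxSpaceDisk;
    } MaxSpace;

    /* Standalone sketch of the "disk beats memory" maximum-space rule. */
    static void
    update_max_space(MaxSpace *max, int64_t spaceUsed, bool isSpaceDisk)
    {
        if ((isSpaceDisk && !max->isMaxSpaceDisk) ||
            (isSpaceDisk == max->isMaxSpaceDisk && spaceUsed > max->maxSpace))
        {
            max->maxSpace = spaceUsed;
            max->isMaxSpaceDisk = isSpaceDisk;
        }
    }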

◆ tuplesort_used_bound()

bool tuplesort_used_bound ( Tuplesortstate * state)

Definition at line 871 of file tuplesort.c.

872{
873 return state->boundUsed;
874}

Referenced by ExecIncrementalSort().

◆ worker_freeze_result_tape()

static void worker_freeze_result_tape ( Tuplesortstate * state)
static

Definition at line 2992 of file tuplesort.c.

2993{
2994 Sharedsort *shared = state->shared;
2995 TapeShare output;
2996
2997 Assert(WORKER(state));
2998 Assert(state->result_tape != NULL);
2999 Assert(state->memtupcount == 0);
3000
3001 /*
3002 * Free most remaining memory, in case caller is sensitive to our holding
3003 * on to it. memtuples may not be a tiny merge heap at this point.
3004 */
3005 pfree(state->memtuples);
3006 /* Be tidy */
3007 state->memtuples = NULL;
3008 state->memtupsize = 0;
3009
3010 /*
3011 * Parallel worker requires result tape metadata, which is to be stored in
3012 * shared memory for leader
3013 */
3014 LogicalTapeFreeze(state->result_tape, &output);
3015
3016 /* Store properties of output tape, and update finished worker count */
3017 SpinLockAcquire(&shared->mutex);
3018 shared->tapes[state->worker] = output;
3019 shared->workersFinished++;
3020 SpinLockRelease(&shared->mutex);
3021}

References Assert, fb(), LogicalTapeFreeze(), Sharedsort::mutex, output, pfree(), SpinLockAcquire, SpinLockRelease, Sharedsort::tapes, WORKER, and Sharedsort::workersFinished.

Referenced by mergeruns(), and worker_nomergeruns().

◆ worker_get_identifier()

static int worker_get_identifier ( Tuplesortstate * state)
static

Definition at line 2964 of file tuplesort.c.

2965{
2966 Sharedsort *shared = state->shared;
2967 int worker;
2968
2969 Assert(WORKER(state));
2970
2971 SpinLockAcquire(&shared->mutex);
2972 worker = shared->currentWorker++;
2973 SpinLockRelease(&shared->mutex);
2974
2975 return worker;
2976}

References Assert, Sharedsort::currentWorker, Sharedsort::mutex, SpinLockAcquire, SpinLockRelease, and WORKER.

Referenced by tuplesort_begin_common().

◆ worker_nomergeruns()

static void worker_nomergeruns ( Tuplesortstate * state)
static

Definition at line 3030 of file tuplesort.c.

3031{
3032 Assert(WORKER(state));
3033 Assert(state->result_tape == NULL);
3034 Assert(state->nOutputRuns == 1);
3035
3036 state->result_tape = state->destTape;
3037 worker_freeze_result_tape(state);
3038}

References Assert, fb(), WORKER, and worker_freeze_result_tape().

Referenced by tuplesort_performsort().

Variable Documentation

◆ trace_sort