PostgreSQL Source Code git master
tuplesort.c
1/*-------------------------------------------------------------------------
2 *
3 * tuplesort.c
4 * Generalized tuple sorting routines.
5 *
6 * This module provides a generalized facility for tuple sorting, which can be
7 * applied to different kinds of sortable objects. Implementation of
8 * the particular sorting variants is given in tuplesortvariants.c.
9 * This module works efficiently for both small and large amounts
10 * of data. Small amounts are sorted in-memory. Large amounts are
11 * sorted using temporary files and a standard external sort
12 * algorithm.
13 *
14 * See Knuth, volume 3, for more than you want to know about external
15 * sorting algorithms. The algorithm we use is a balanced k-way merge.
16 * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17 * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18 * merge is better. Knuth is assuming that tape drives are expensive
19 * beasts, and in particular that there will always be many more runs than
20 * tape drives. The polyphase merge algorithm was good at keeping all the
21 * tape drives busy, but in our implementation a "tape drive" doesn't cost
22 * much more than a few Kb of memory buffers, so we can afford to have
23 * lots of them. In particular, if we can have as many tape drives as
24 * sorted runs, we can eliminate any repeated I/O at all.
25 *
26 * Historically, we divided the input into sorted runs using replacement
27 * selection, in the form of a priority tree implemented as a heap
28 * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29 * or radix sort for run generation.
30 *
31 * The approximate amount of memory allowed for any one sort operation
32 * is specified in kilobytes by the caller (most pass work_mem). Initially,
33 * we absorb tuples and simply store them in an unsorted array as long as
34 * we haven't exceeded workMem. If we reach the end of the input without
35 * exceeding workMem, we sort the array in memory and subsequently return
36 * tuples just by scanning the tuple array sequentially. If we do exceed
37 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38 * When tuples are dumped in batch after in-memory sorting, we begin a new run
39 * with a new output tape. If we reach the max number of tapes, we write
40 * subsequent runs on the existing tapes in a round-robin fashion. We will
41 * need multiple merge passes to finish the merge in that case. After the
42 * end of the input is reached, we dump out remaining tuples in memory into
43 * a final run, then merge the runs.
44 *
45 * When merging runs, we use a heap containing just the frontmost tuple from
46 * each source run; we repeatedly output the smallest tuple and replace it
47 * with the next tuple from its source tape (if any). When the heap empties,
48 * the merge is complete. The basic merge algorithm thus needs very little
49 * memory --- only M tuples for an M-way merge, and M is constrained to a
50 * small number. However, we can still make good use of our full workMem
51 * allocation by pre-reading additional blocks from each source tape. Without
52 * prereading, our access pattern to the temporary file would be very erratic;
53 * on average we'd read one block from each of M source tapes during the same
54 * time that we're writing M blocks to the output tape, so there is no
55 * sequentiality of access at all, defeating the read-ahead methods used by
56 * most Unix kernels. Worse, the output tape gets written into a very random
57 * sequence of blocks of the temp file, ensuring that things will be even
58 * worse when it comes time to read that tape. A straightforward merge pass
59 * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60 * by prereading from each source tape sequentially, loading about workMem/M
61 * bytes from each tape in turn, and making the sequential blocks immediately
62 * available for reuse. This approach helps to localize both read and write
63 * accesses. The pre-reading is handled by logtape.c; we just tell it how
64 * much memory to use for the buffers.
65 *
66 * In the current code we determine the number of input tapes M on the basis
67 * of workMem: we want workMem/M to be large enough that we read a fair
68 * amount of data each time we read from a tape, so as to maintain the
69 * locality of access described above. Nonetheless, with large workMem we
70 * can have many tapes. The logical "tapes" are implemented by logtape.c,
71 * which avoids space wastage by recycling disk space as soon as each block
72 * is read from its "tape".
73 *
74 * When the caller requests random access to the sort result, we form
75 * the final sorted run on a logical tape which is then "frozen", so
76 * that we can access it randomly. When the caller does not need random
77 * access, we return from tuplesort_performsort() as soon as we are down
78 * to one run per logical tape. The final merge is then performed
79 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80 * saves one cycle of writing all the data out to disk and reading it in.
81 *
82 * This module supports parallel sorting. Parallel sorts involve coordination
83 * among one or more worker processes, and a leader process, each with its own
84 * tuplesort state. The leader process (or, more accurately, the
85 * Tuplesortstate associated with a leader process) creates a full tapeset
86 * consisting of worker tapes with one run to merge; a run for every
87 * worker process. This is then merged. Worker processes are guaranteed to
88 * produce exactly one output run from their partial input.
89 *
90 *
91 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
92 * Portions Copyright (c) 1994, Regents of the University of California
93 *
94 * IDENTIFICATION
95 * src/backend/utils/sort/tuplesort.c
96 *
97 *-------------------------------------------------------------------------
98 */
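/*
 * Illustrative sketch (not part of tuplesort.c): the heap-based M-way merge
 * described in the header comment, reduced to plain int arrays.  The names
 * SketchRun, SketchHeapItem, sketch_sift_down and sketch_merge_runs are
 * hypothetical, and the "tapes" here are just in-memory arrays; the real
 * code reads tuples through logtape.c and compares them with COMPARETUP.
 */

```c
#include <assert.h>
#include <stddef.h>

/* One sorted input run; "pos" indexes its frontmost unread element. */
typedef struct SketchRun
{
    const int  *vals;
    size_t      len;
    size_t      pos;
} SketchRun;

/* Heap entry: the frontmost value of a run, plus the run it came from. */
typedef struct SketchHeapItem
{
    int         val;
    size_t      run;
} SketchHeapItem;

/* Restore min-heap order in heap[0..n-1], starting from index i. */
static void
sketch_sift_down(SketchHeapItem *heap, size_t n, size_t i)
{
    for (;;)
    {
        size_t      child = 2 * i + 1;
        SketchHeapItem tmp;

        if (child >= n)
            break;
        if (child + 1 < n && heap[child + 1].val < heap[child].val)
            child++;
        if (heap[i].val <= heap[child].val)
            break;
        tmp = heap[i];
        heap[i] = heap[child];
        heap[child] = tmp;
        i = child;
    }
}

/*
 * Merge nruns sorted runs into out[].  "heap" is caller-provided scratch
 * space for nruns items, so the merge itself needs only one element per run.
 * Returns the number of values written.
 */
static size_t
sketch_merge_runs(SketchRun *runs, size_t nruns, SketchHeapItem *heap, int *out)
{
    size_t      heapsize = 0;
    size_t      nout = 0;
    size_t      i;

    /* Load the frontmost element of each non-empty run. */
    for (i = 0; i < nruns; i++)
    {
        if (runs[i].pos < runs[i].len)
        {
            heap[heapsize].val = runs[i].vals[runs[i].pos++];
            heap[heapsize].run = i;
            heapsize++;
        }
    }
    for (i = heapsize / 2; i-- > 0;)
        sketch_sift_down(heap, heapsize, i);

    /* Repeatedly output the smallest element and refill from its run. */
    while (heapsize > 0)
    {
        SketchRun  *src = &runs[heap[0].run];

        out[nout++] = heap[0].val;
        if (src->pos < src->len)
            heap[0].val = src->vals[src->pos++];
        else
            heap[0] = heap[--heapsize];     /* run exhausted: shrink heap */
        sketch_sift_down(heap, heapsize, 0);
    }
    return nout;
}
```

/*
 * A caller would fill an array of SketchRun, pass a scratch heap with one
 * entry per run, and size out[] to the sum of the run lengths.
 */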
99
100#include "postgres.h"
101
102#include <limits.h>
103
104#include "commands/tablespace.h"
105#include "miscadmin.h"
106#include "pg_trace.h"
107#include "port/pg_bitutils.h"
108#include "storage/shmem.h"
109#include "utils/guc.h"
110#include "utils/memutils.h"
111#include "utils/pg_rusage.h"
112#include "utils/tuplesort.h"
113
114/*
115 * Initial size of memtuples array. This must be more than
116 * ALLOCSET_SEPARATE_THRESHOLD; see comments in grow_memtuples(). Clamp at
117 * 1024 elements to avoid excessive reallocs.
118 */
119#define INITIAL_MEMTUPSIZE Max(1024, \
120 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
121
122/* GUC variables */
123bool trace_sort = false;
124
125#ifdef DEBUG_BOUNDED_SORT
126bool optimize_bounded_sort = true;
127#endif
128
129
130/*
131 * During merge, we use a pre-allocated set of fixed-size slots to hold
132 * tuples, to avoid palloc/pfree overhead.
133 *
134 * Merge doesn't require a lot of memory, so we can afford to waste some,
135 * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
136 * palloc() overhead is not significant anymore.
137 *
138 * 'nextfree' is valid when this chunk is in the free list. When in use, the
139 * slot holds a tuple.
140 */
141#define SLAB_SLOT_SIZE 1024
142
143typedef union SlabSlot
144{
145 union SlabSlot *nextfree;
146 char buffer[SLAB_SLOT_SIZE];
147} SlabSlot;
148
149/*
150 * Possible states of a Tuplesort object. These denote the states that
151 * persist between calls of Tuplesort routines.
152 */
153typedef enum
154{
155 TSS_INITIAL, /* Loading tuples; still within memory limit */
156 TSS_BOUNDED, /* Loading tuples into bounded-size heap */
157 TSS_BUILDRUNS, /* Loading tuples; writing to tape */
158 TSS_SORTEDINMEM, /* Sort completed entirely in memory */
159 TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
160 TSS_FINALMERGE, /* Performing final merge on-the-fly */
161} TupSortStatus;
162
163/*
164 * Parameters for calculation of number of tapes to use --- see inittapes()
165 * and tuplesort_merge_order().
166 *
167 * In this calculation we assume that each tape will cost us about 1 block's
168 * worth of buffer space. This ignores the overhead of all the other data
169 * structures needed for each tape, but it's probably close enough.
170 *
171 * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
172 * input tape, for pre-reading (see discussion at top of file). This is *in
173 * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
174 */
175#define MINORDER 6 /* minimum merge order */
176#define MAXORDER 500 /* maximum merge order */
177#define TAPE_BUFFER_OVERHEAD BLCKSZ
178#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
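
/*
 * Illustrative sketch (not part of tuplesort.c): one way the parameters
 * above could turn a memory budget into a merge order.  It simply charges
 * each input tape TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE and clamps the
 * result to [MINORDER, MAXORDER].  This is an assumption for illustration;
 * the real tuplesort_merge_order() may budget differently (for example, by
 * also reserving space for the output tape).  SKETCH_BLCKSZ assumes the
 * default 8 kB block size, and every SKETCH_/sketch_ name is hypothetical.
 */

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_BLCKSZ               8192    /* assumed default block size */
#define SKETCH_MINORDER             6
#define SKETCH_MAXORDER             500
#define SKETCH_TAPE_BUFFER_OVERHEAD SKETCH_BLCKSZ
#define SKETCH_MERGE_BUFFER_SIZE    (SKETCH_BLCKSZ * 32)

/* Number of input tapes to merge at once for a budget of allowedMem bytes. */
static int
sketch_merge_order(int64_t allowedMem)
{
    int64_t     per_tape = SKETCH_TAPE_BUFFER_OVERHEAD + SKETCH_MERGE_BUFFER_SIZE;
    int64_t     order = allowedMem / per_tape;

    if (order < SKETCH_MINORDER)
        order = SKETCH_MINORDER;
    if (order > SKETCH_MAXORDER)
        order = SKETCH_MAXORDER;
    return (int) order;
}
```

/*
 * Under these assumptions a 4 MB budget gives a 15-way merge, while very
 * small and very large budgets hit the MINORDER and MAXORDER clamps.
 */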
179
180
181/*
182 * Private state of a Tuplesort operation.
183 */
184struct Tuplesortstate
185{
186 TuplesortPublic base;
187 TupSortStatus status; /* enumerated value as shown above */
188 bool bounded; /* did caller specify a maximum number of
189 * tuples to return? */
190 bool boundUsed; /* true if we made use of a bounded heap */
191 int bound; /* if bounded, the maximum number of tuples */
192 int64 tupleMem; /* memory consumed by individual tuples.
193 * storing this separately from what we track
194 * in availMem allows us to subtract the
195 * memory consumed by all tuples when dumping
196 * tuples to tape */
197 int64 availMem; /* remaining memory available, in bytes */
198 int64 allowedMem; /* total memory allowed, in bytes */
199 int maxTapes; /* max number of input tapes to merge in each
200 * pass */
201 int64 maxSpace; /* maximum amount of space occupied by any one
202 * sort batch, either in-memory or on-disk */
203 bool isMaxSpaceDisk; /* true when maxSpace tracks on-disk space,
204 * false means in-memory */
205 TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
206 LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
207
208 /*
209 * This array holds the tuples now in sort memory. If we are in state
210 * INITIAL, the tuples are in no particular order; if we are in state
211 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
212 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
213 * H. In state SORTEDONTAPE, the array is not used.
214 */
215 SortTuple *memtuples; /* array of SortTuple structs */
216 int memtupcount; /* number of tuples currently present */
217 int memtupsize; /* allocated length of memtuples array */
218 bool growmemtuples; /* memtuples' growth still underway? */
219
220 /*
221 * Memory for tuples is sometimes allocated using a simple slab allocator,
222 * rather than with palloc(). Currently, we switch to slab allocation
223 * when we start merging. Merging only needs to keep a small, fixed
224 * number of tuples in memory at any time, so we can avoid the
225 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
226 * to hold the tuples.
227 *
228 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
229 * slots. The allocation is sized to have one slot per tape, plus one
230 * additional slot. We need that many slots to hold all the tuples kept
231 * in the heap during merge, plus the one we have last returned from the
232 * sort, with tuplesort_gettuple.
233 *
234 * Initially, all the slots are kept in a linked list of free slots. When
235 * a tuple is read from a tape, it is put to the next available slot, if
236 * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
237 * instead.
238 *
239 * When we're done processing a tuple, we return the slot back to the free
240 * list, or pfree() it if it was palloc'd. We know that a tuple was
241 * allocated from the slab if its pointer value is between
242 * slabMemoryBegin and slabMemoryEnd.
243 *
244 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
245 * tracking memory usage is not used.
246 */
248
249 char *slabMemoryBegin; /* beginning of slab memory arena */
250 char *slabMemoryEnd; /* end of slab memory arena */
251 SlabSlot *slabFreeHead; /* head of free list */
252
253 /* Memory used for input and output tape buffers. */
255
256 /*
257 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
258 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
259 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
260 * recycle the memory on next gettuple call.
261 */
263
264 /*
265 * While building initial runs, this is the current output run number.
266 * Afterwards, it is the number of initial runs we made.
267 */
269
270 /*
271 * Logical tapes, for merging.
272 *
273 * The initial runs are written in the output tapes. In each merge pass,
274 * the output tapes of the previous pass become the input tapes, and new
275 * output tapes are created as needed. When nInputTapes equals
276 * nInputRuns, there is only one merge pass left.
277 */
281
285
286 LogicalTape *destTape; /* current output tape */
287
288 /*
289 * These variables are used after completion of sorting to keep track of
290 * the next tuple to return. (In the tape case, the tape's current read
291 * position is also critical state.)
292 */
293 LogicalTape *result_tape; /* actual tape of finished output */
294 int current; /* array index (only used if SORTEDINMEM) */
295 bool eof_reached; /* reached EOF (needed for cursors) */
296
297 /* markpos_xxx holds marked position for mark and restore */
298 int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
299 int markpos_offset; /* saved "current", or offset in tape block */
300 bool markpos_eof; /* saved "eof_reached" */
301
302 /*
303 * These variables are used during parallel sorting.
304 *
305 * worker is our worker identifier. Follows the general convention that a
306 * value of -1 relates to a leader tuplesort, and values >= 0 relate to
307 * worker tuplesorts. (-1 can also be a serial tuplesort.)
308 *
309 * shared is mutable shared memory state, which is used to coordinate
310 * parallel sorts.
311 *
312 * nParticipants is the number of worker Tuplesortstates known by the
313 * leader to have actually been launched, which implies that they must
314 * finish a run that the leader needs to merge. Typically includes a
315 * worker state held by the leader process itself. Set in the leader
316 * Tuplesortstate only.
317 */
321
322 /*
323 * Additional state for managing "abbreviated key" sortsupport routines
324 * (which currently may be used by all cases except the hash index case).
325 * Tracks the intervals at which the optimization's effectiveness is
326 * tested.
327 */
328 int64 abbrevNext; /* Tuple # at which to next check
329 * applicability */
330
331 /*
332 * Resource snapshot for time of sort start.
333 */
335};
336
337/*
338 * Private mutable state of tuplesort-parallel-operation. This is allocated
339 * in shared memory.
340 */
342{
343 /* mutex protects all fields prior to tapes */
345
346 /*
347 * currentWorker generates ordinal identifier numbers for parallel sort
348 * workers. These start from 0, and are always gapless.
349 *
350 * Workers increment workersFinished to indicate having finished. If this
351 * is equal to state.nParticipants within the leader, leader is ready to
352 * merge worker runs.
353 */
356
357 /* Temporary file space */
359
360 /* Size of tapes flexible array */
362
363 /*
364 * Tapes array used by workers to report back information needed by the
365 * leader to concatenate all worker tapes into one for merging
366 */
368};
369
370/*
371 * Is the given tuple allocated from the slab memory arena?
372 */
373#define IS_SLAB_SLOT(state, tuple) \
374 ((char *) (tuple) >= (state)->slabMemoryBegin && \
375 (char *) (tuple) < (state)->slabMemoryEnd)
376
377/*
378 * Return the given tuple to the slab memory free list, or free it
379 * if it was palloc'd.
380 */
381#define RELEASE_SLAB_SLOT(state, tuple) \
382 do { \
383 SlabSlot *buf = (SlabSlot *) (tuple); \
384 \
385 if (IS_SLAB_SLOT((state), buf)) \
386 { \
387 buf->nextfree = (state)->slabFreeHead; \
388 (state)->slabFreeHead = buf; \
389 } else \
390 pfree(buf); \
391 } while(0)
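
/*
 * Illustrative sketch (not part of tuplesort.c): a standalone version of the
 * slab scheme that IS_SLAB_SLOT and RELEASE_SLAB_SLOT rely on.  It uses
 * malloc/free in place of palloc/pfree, and the names SketchSlot, SketchSlab
 * and sketch_slab_* are hypothetical.  As in the macros above, membership is
 * decided by comparing the pointer against the arena bounds.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define SKETCH_SLOT_SIZE 1024

/* A slot is either a link in the free list or storage for one tuple. */
typedef union SketchSlot
{
    union SketchSlot *nextfree;
    char        buffer[SKETCH_SLOT_SIZE];
} SketchSlot;

typedef struct SketchSlab
{
    char       *begin;          /* start of the arena */
    char       *end;            /* one past the end of the arena */
    SketchSlot *freehead;       /* head of the free list */
} SketchSlab;

/* Allocate one arena of nslots slots and thread them onto the free list. */
static int
sketch_slab_init(SketchSlab *slab, size_t nslots)
{
    SketchSlot *slots = malloc(nslots * sizeof(SketchSlot));
    size_t      i;

    if (slots == NULL)
        return 0;
    slab->begin = (char *) slots;
    slab->end = slab->begin + nslots * sizeof(SketchSlot);
    slab->freehead = NULL;
    for (i = nslots; i-- > 0;)
    {
        slots[i].nextfree = slab->freehead;
        slab->freehead = &slots[i];
    }
    return 1;
}

/* Take a slot if the request fits and one is free; otherwise use malloc. */
static void *
sketch_slab_alloc(SketchSlab *slab, size_t size)
{
    SketchSlot *slot = slab->freehead;

    if (size > SKETCH_SLOT_SIZE || slot == NULL)
        return malloc(size);
    slab->freehead = slot->nextfree;
    return slot;
}

/* Return a slot to the free list, or free() memory that came from malloc. */
static void
sketch_slab_release(SketchSlab *slab, void *ptr)
{
    if ((char *) ptr >= slab->begin && (char *) ptr < slab->end)
    {
        SketchSlot *slot = ptr;

        slot->nextfree = slab->freehead;
        slab->freehead = slot;
    }
    else
        free(ptr);
}
```

/*
 * Because the free list is LIFO, a slot released just before the next
 * allocation is the one handed out again, which keeps the working set small.
 */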
392
393#define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
394#define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
395#define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
396#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
397#define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
398#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
399#define USEMEM(state,amt) ((state)->availMem -= (amt))
400#define FREEMEM(state,amt) ((state)->availMem += (amt))
401#define SERIAL(state) ((state)->shared == NULL)
402#define WORKER(state) ((state)->shared && (state)->worker != -1)
403#define LEADER(state) ((state)->shared && (state)->worker == -1)
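
/*
 * Illustrative sketch (not part of tuplesort.c): how the USEMEM, FREEMEM and
 * LACKMEM accounting behaves in isolation.  availMem starts at the allowed
 * budget and is allowed to go negative; LACKMEM only reports the overrun
 * after the tuple that caused it has been charged.  The slabAllocatorUsed
 * test is omitted here, and all Sketch/sketch_ names are hypothetical.
 */

```c
#include <assert.h>
#include <stdint.h>

typedef struct SketchSortMem
{
    int64_t     allowedMem;     /* total budget, in bytes */
    int64_t     availMem;       /* remaining budget; may go negative */
} SketchSortMem;

#define SKETCH_USEMEM(m, amt)   ((m)->availMem -= (amt))
#define SKETCH_FREEMEM(m, amt)  ((m)->availMem += (amt))
#define SKETCH_LACKMEM(m)       ((m)->availMem < 0)

/* Convert a budget in kilobytes to bytes, with a 64 kB floor. */
static void
sketch_mem_init(SketchSortMem *m, int workMemKB)
{
    int         kb = (workMemKB > 64) ? workMemKB : 64;

    m->allowedMem = (int64_t) kb * 1024;
    m->availMem = m->allowedMem;
}

/*
 * Charge equal-sized tuples against the budget until LACKMEM becomes true.
 * Returns the number of tuples charged, including the one that overran.
 */
static int
sketch_tuples_until_spill(SketchSortMem *m, int64_t tupleSize)
{
    int         n = 0;

    if (tupleSize <= 0)
        return 0;               /* would never run out of memory */
    while (!SKETCH_LACKMEM(m))
    {
        SKETCH_USEMEM(m, tupleSize);
        n++;
    }
    return n;
}
```

/*
 * Releasing the same amounts with SKETCH_FREEMEM restores availMem to
 * allowedMem, which is the invariant the real accounting depends on.
 */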
404
405/*
406 * NOTES about on-tape representation of tuples:
407 *
408 * We require the first "unsigned int" of a stored tuple to be the total size
409 * on-tape of the tuple, including itself (so it is never zero; an all-zero
410 * unsigned int is used to delimit runs). The remainder of the stored tuple
411 * may or may not match the in-memory representation of the tuple ---
412 * any conversion needed is the job of the writetup and readtup routines.
413 *
414 * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
415 * representation of the tuple must be followed by another "unsigned int" that
416 * is a copy of the length --- so the total tape space used is actually
417 * sizeof(unsigned int) more than the stored length value. This allows
418 * read-backwards. When the random access flag was not specified, the
419 * write/read routines may omit the extra length word.
420 *
421 * writetup is expected to write both length words as well as the tuple
422 * data. When readtup is called, the tape is positioned just after the
423 * front length word; readtup must read the tuple data and advance past
424 * the back length word (if present).
425 *
426 * The write/read routines can make use of the tuple description data
427 * stored in the Tuplesortstate record, if needed. They are also expected
428 * to adjust state->availMem by the amount of memory space (not tape space!)
429 * released or consumed. There is no error return from either writetup
430 * or readtup; they should ereport() on failure.
431 *
432 *
433 * NOTES about memory consumption calculations:
434 *
435 * We count space allocated for tuples against the workMem limit, plus
436 * the space used by the variable-size memtuples array. Fixed-size space
437 * is not counted; it's small enough to not be interesting.
438 *
439 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
440 * rather than the originally-requested size. This is important since
441 * palloc can add substantial overhead. It's not a complete answer since
442 * we won't count any wasted space in palloc allocation blocks, but it's
443 * a lot better than what we were doing before 7.3. As of 9.6, a
444 * separate memory context is used for caller passed tuples. Resetting
445 * it at certain key increments significantly ameliorates fragmentation.
446 * readtup routines use the slab allocator (they cannot use
447 * the reset context because it gets deleted at the point that merging
448 * begins).
449 */
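
/*
 * Illustrative sketch (not part of tuplesort.c): the on-tape layout from the
 * notes above, written to a small in-memory buffer instead of a logical
 * tape.  Each record is a leading length word (counting itself), the
 * payload, and, only when random access is wanted, a trailing copy of the
 * length word; a zero length word ends the run.  SketchTape and the
 * sketch_* functions are hypothetical, and the caller of sketch_readtup must
 * supply an output buffer large enough for the payload.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct SketchTape
{
    unsigned char buf[256];
    size_t      writepos;
    size_t      readpos;
} SketchTape;

/* Append n bytes; returns 0 if the fixed-size buffer would overflow. */
static int
sketch_put(SketchTape *tape, const void *src, size_t n)
{
    if (n > sizeof(tape->buf) - tape->writepos)
        return 0;
    memcpy(tape->buf + tape->writepos, src, n);
    tape->writepos += n;
    return 1;
}

/* Write front length word, payload, and (optionally) back length word. */
static int
sketch_writetup(SketchTape *tape, const void *data, unsigned int datalen,
                int randomAccess)
{
    unsigned int tuplen = datalen + (unsigned int) sizeof(unsigned int);

    if (!sketch_put(tape, &tuplen, sizeof(tuplen)) ||
        !sketch_put(tape, data, datalen))
        return 0;
    if (randomAccess && !sketch_put(tape, &tuplen, sizeof(tuplen)))
        return 0;
    return 1;
}

/* An all-zero length word delimits the run. */
static int
sketch_markrunend(SketchTape *tape)
{
    unsigned int zero = 0;

    return sketch_put(tape, &zero, sizeof(zero));
}

/* Read the next payload; returns 0 at end of run or end of data. */
static int
sketch_readtup(SketchTape *tape, void *out, unsigned int *datalen,
               int randomAccess)
{
    unsigned int tuplen;

    if (sizeof(tuplen) > tape->writepos - tape->readpos)
        return 0;
    memcpy(&tuplen, tape->buf + tape->readpos, sizeof(tuplen));
    tape->readpos += sizeof(tuplen);
    if (tuplen == 0)
        return 0;
    *datalen = tuplen - (unsigned int) sizeof(unsigned int);
    memcpy(out, tape->buf + tape->readpos, *datalen);
    tape->readpos += *datalen;
    if (randomAccess)
        tape->readpos += sizeof(unsigned int);  /* skip back length word */
    return 1;
}
```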
450
451
454static void inittapes(Tuplesortstate *state, bool mergeruns);
455static void inittapestate(Tuplesortstate *state, int maxTapes);
458static void mergeruns(Tuplesortstate *state);
459static void mergeonerun(Tuplesortstate *state);
460static void beginmerge(Tuplesortstate *state);
462static void dumptuples(Tuplesortstate *state, bool alltuples);
470static unsigned int getlen(LogicalTape *tape, bool eofOK);
471static void markrunend(LogicalTape *tape);
479
480
481/*
482 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
483 * any variant of SortTuples, using the appropriate comparetup function.
484 * qsort_ssup() is specialized for the case where the comparetup function
485 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
486 * and Datum sorts.
487 */
488
489#define ST_SORT qsort_tuple
490#define ST_ELEMENT_TYPE SortTuple
491#define ST_COMPARE_RUNTIME_POINTER
492#define ST_COMPARE_ARG_TYPE Tuplesortstate
493#define ST_CHECK_FOR_INTERRUPTS
494#define ST_SCOPE static
495#define ST_DECLARE
496#define ST_DEFINE
497#include "lib/sort_template.h"
498
499#define ST_SORT qsort_ssup
500#define ST_ELEMENT_TYPE SortTuple
501#define ST_COMPARE(a, b, ssup) \
502 ApplySortComparator((a)->datum1, (a)->isnull1, \
503 (b)->datum1, (b)->isnull1, (ssup))
504#define ST_COMPARE_ARG_TYPE SortSupportData
505#define ST_CHECK_FOR_INTERRUPTS
506#define ST_SCOPE static
507#define ST_DEFINE
508#include "lib/sort_template.h"
509
510/* state for radix sort */
511typedef struct RadixSortInfo
512{
513 union
514 {
515 size_t count;
516 size_t offset;
517 };
520
521/*
522 * Threshold below which qsort_tuple() is generally faster than a radix sort.
523 */
524#define QSORT_THRESHOLD 40
525
526
527/*
528 * tuplesort_begin_xxx
529 *
530 * Initialize for a tuple sort operation.
531 *
532 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
533 * zero or more times, then call tuplesort_performsort when all the tuples
534 * have been supplied. After performsort, retrieve the tuples in sorted
535 * order by calling tuplesort_getXXX until it returns false/NULL. (If random
536 * access was requested, rescan, markpos, and restorepos can also be called.)
537 * Call tuplesort_end to terminate the operation and release memory/disk space.
538 *
539 * Each variant of tuplesort_begin has a workMem parameter specifying the
540 * maximum number of kilobytes of RAM to use before spilling data to disk.
541 * (The normal value of this parameter is work_mem, but some callers use
542 * other values.) Each variant also has a sortopt which is a bitmask of
543 * sort options. See TUPLESORT_* definitions in tuplesort.h
544 */
545
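546Tuplesortstate *
547tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
548{
549 Tuplesortstate *state;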
550 MemoryContext maincontext;
551 MemoryContext sortcontext;
552 MemoryContext oldcontext;
553
554 /* See leader_takeover_tapes() remarks on random access support */
555 if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
556 elog(ERROR, "random access disallowed under parallel sort");
557
558 /*
559 * Memory context surviving tuplesort_reset. This memory context holds
560 * data which is useful to keep while sorting multiple similar batches.
561 */
563 "TupleSort main",
565
566 /*
567 * Create a working memory context for one sort operation. The content of
568 * this context is deleted by tuplesort_reset.
569 */
570 sortcontext = AllocSetContextCreate(maincontext,
571 "TupleSort sort",
573
574 /*
575 * Additionally, a working memory context for tuples is set up in
576 * tuplesort_begin_batch.
577 */
578
579 /*
580 * Make the Tuplesortstate within the per-sortstate context. This way, we
581 * don't need a separate pfree() operation for it at shutdown.
582 */
583 oldcontext = MemoryContextSwitchTo(maincontext);
584
586
587 if (trace_sort)
588 pg_rusage_init(&state->ru_start);
589
590 state->base.sortopt = sortopt;
591 state->base.tuples = true;
592 state->abbrevNext = 10;
593
594 /*
595 * workMem is forced to be at least 64KB, the current minimum valid value
596 * for the work_mem GUC. This is a defense against parallel sort callers
597 * that divide out memory among many workers in a way that leaves each
598 * with very little memory.
599 */
600 state->allowedMem = Max(workMem, 64) * (int64) 1024;
601 state->base.sortcontext = sortcontext;
602 state->base.maincontext = maincontext;
603
604 state->memtupsize = INITIAL_MEMTUPSIZE;
605 state->memtuples = NULL;
606
607 /*
608 * After all of the other non-parallel-related state, we set up all of the
609 * state needed for each batch.
610 */
611 tuplesort_begin_batch(state);
612
613 /*
614 * Initialize parallel-related state based on coordination information
615 * from caller
616 */
617 if (!coordinate)
618 {
619 /* Serial sort */
620 state->shared = NULL;
621 state->worker = -1;
622 state->nParticipants = -1;
623 }
624 else if (coordinate->isWorker)
625 {
626 /* Parallel worker produces exactly one final run from all input */
627 state->shared = coordinate->sharedsort;
629 state->nParticipants = -1;
630 }
631 else
632 {
633 /* Parallel leader state only used for final merge */
634 state->shared = coordinate->sharedsort;
635 state->worker = -1;
636 state->nParticipants = coordinate->nParticipants;
637 Assert(state->nParticipants >= 1);
638 }
639
640 MemoryContextSwitchTo(oldcontext);
641
642 return state;
643}
644
645/*
646 * tuplesort_begin_batch
647 *
648 * Set up, or reset, all state needed for processing a new set of tuples with this
649 * sort state. Called both from tuplesort_begin_common (the first time sorting
650 * with this sort state) and tuplesort_reset (for subsequent usages).
651 */
652static void
653tuplesort_begin_batch(Tuplesortstate *state)
654{
655 MemoryContext oldcontext;
656
657 oldcontext = MemoryContextSwitchTo(state->base.maincontext);
658
659 /*
660 * Caller tuple (e.g. IndexTuple) memory context.
661 *
662 * A dedicated child context used exclusively for caller passed tuples
663 * eases memory management. Resetting at key points reduces
664 * fragmentation. Note that the memtuples array of SortTuples is allocated
665 * in the parent context, not this context, because there is no need to
666 * free memtuples early. For bounded sorts, tuples may be pfreed in any
667 * order, so we use a regular aset.c context so that it can make use of
668 * free'd memory. When the sort is not bounded, we make use of a bump.c
669 * context as this keeps allocations more compact with less wastage.
670 * Allocations are also slightly more CPU efficient.
671 */
672 if (TupleSortUseBumpTupleCxt(state->base.sortopt))
673 state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
674 "Caller tuples",
676 else
677 state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
678 "Caller tuples",
680
681
682 state->status = TSS_INITIAL;
683 state->bounded = false;
684 state->boundUsed = false;
685
686 state->availMem = state->allowedMem;
687
688 state->tapeset = NULL;
689
690 state->memtupcount = 0;
691
692 state->growmemtuples = true;
693 state->slabAllocatorUsed = false;
694 if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
695 {
696 pfree(state->memtuples);
697 state->memtuples = NULL;
698 state->memtupsize = INITIAL_MEMTUPSIZE;
699 }
700 if (state->memtuples == NULL)
701 {
702 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
704 }
705
706 /* workMem must be large enough for the minimal memtuples array */
707 if (LACKMEM(state))
708 elog(ERROR, "insufficient memory allowed for sort");
709
710 state->currentRun = 0;
711
712 /*
713 * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
714 * inittapes(), if needed.
715 */
716
717 state->result_tape = NULL; /* flag that result tape has not been formed */
718
719 MemoryContextSwitchTo(oldcontext);
720}
721
722/*
723 * tuplesort_set_bound
724 *
725 * Advise tuplesort that at most the first N result tuples are required.
726 *
727 * Must be called before inserting any tuples. (Actually, we could allow it
728 * as long as the sort hasn't spilled to disk, but there seems no need for
729 * delayed calls at the moment.)
730 *
731 * This is a hint only. The tuplesort may still return more tuples than
732 * requested. Parallel leader tuplesorts will always ignore the hint.
733 */
734void
735tuplesort_set_bound(Tuplesortstate *state, int64 bound)
736{
737 /* Assert we're called before loading any tuples */
738 Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
739 /* Assert we allow bounded sorts */
740 Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
741 /* Can't set the bound twice, either */
742 Assert(!state->bounded);
743 /* Also, this shouldn't be called in a parallel worker */
744 Assert(!WORKER(state));
745
746 /* Parallel leader allows but ignores hint */
747 if (LEADER(state))
748 return;
749
750#ifdef DEBUG_BOUNDED_SORT
751 /* Honor GUC setting that disables the feature (for easy testing) */
752 if (!optimize_bounded_sort)
753 return;
754#endif
755
756 /* We want to be able to compute bound * 2, so limit the setting */
757 if (bound > (int64) (INT_MAX / 2))
758 return;
759
760 state->bounded = true;
761 state->bound = (int) bound;
762
763 /*
764 * Bounded sorts are not an effective target for abbreviated key
765 * optimization. Disable by setting state to be consistent with no
766 * abbreviation support.
767 */
768 state->base.sortKeys->abbrev_converter = NULL;
769 if (state->base.sortKeys->abbrev_full_comparator)
770 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
771
772 /* Not strictly necessary, but be tidy */
773 state->base.sortKeys->abbrev_abort = NULL;
774 state->base.sortKeys->abbrev_full_comparator = NULL;
775}
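
/*
 * Illustrative sketch (not part of tuplesort.c): the idea behind a bounded
 * sort, shown on plain ints.  To return only the N smallest values we keep a
 * max-heap of at most N entries; an incoming value replaces the root only if
 * it is smaller than the largest value kept so far.  The sketch_* names are
 * hypothetical, and this does not reproduce when or how tuplesort decides to
 * switch into TSS_BOUNDED.
 */

```c
#include <assert.h>
#include <stddef.h>

/* Restore max-heap order in heap[0..n-1], starting from index i. */
static void
sketch_maxheap_sift_down(int *heap, size_t n, size_t i)
{
    for (;;)
    {
        size_t      child = 2 * i + 1;
        int         tmp;

        if (child >= n)
            break;
        if (child + 1 < n && heap[child + 1] > heap[child])
            child++;
        if (heap[i] >= heap[child])
            break;
        tmp = heap[i];
        heap[i] = heap[child];
        heap[child] = tmp;
        i = child;
    }
}

/* Offer one input value to a heap that holds at most "bound" entries. */
static void
sketch_bounded_put(int *heap, size_t *count, size_t bound, int val)
{
    if (*count < bound)
    {
        /* Still filling: append and sift up. */
        size_t      i = (*count)++;

        while (i > 0)
        {
            size_t      parent = (i - 1) / 2;

            if (heap[parent] >= val)
                break;
            heap[i] = heap[parent];
            i = parent;
        }
        heap[i] = val;
    }
    else if (bound > 0 && val < heap[0])
    {
        /* Full: discard the current largest in favor of the new value. */
        heap[0] = val;
        sketch_maxheap_sift_down(heap, *count, 0);
    }
}

/* Turn the max-heap into ascending order in place. */
static void
sketch_bounded_finish(int *heap, size_t count)
{
    while (count > 1)
    {
        int         largest = heap[0];

        heap[0] = heap[--count];
        sketch_maxheap_sift_down(heap, count, 0);
        heap[count] = largest;
    }
}
```

/*
 * Memory use is proportional to the bound rather than to the input size,
 * which is why a bound is only worth honoring while the sort is in memory.
 */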
776
777/*
778 * tuplesort_used_bound
779 *
780 * Allow callers to find out if the sort state was able to use a bound.
781 */
782bool
783tuplesort_used_bound(Tuplesortstate *state)
784{
785 return state->boundUsed;
786}
787
788/*
789 * tuplesort_free
790 *
791 * Internal routine for freeing resources of tuplesort.
792 */
793static void
794tuplesort_free(Tuplesortstate *state)
795{
796 /* context swap probably not needed, but let's be safe */
797 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
798 int64 spaceUsed;
799
800 if (state->tapeset)
801 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
802 else
803 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
804
805 /*
806 * Delete temporary "tape" files, if any.
807 *
808 * We don't bother to destroy the individual tapes here. They will go away
809 * with the sortcontext. (In TSS_FINALMERGE state, we have closed
810 * finished tapes already.)
811 */
812 if (state->tapeset)
813 LogicalTapeSetClose(state->tapeset);
814
815 if (trace_sort)
816 {
817 if (state->tapeset)
818 elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
819 SERIAL(state) ? "external sort" : "parallel external sort",
820 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
821 else
822 elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
823 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
824 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
825 }
826
827 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
828
830 MemoryContextSwitchTo(oldcontext);
831
832 /*
833 * Reset the per-sort memory context, thereby releasing all working memory.
834 */
835 MemoryContextReset(state->base.sortcontext);
836}
837
838/*
839 * tuplesort_end
840 *
841 * Release resources and clean up.
842 *
843 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
844 * pointing to garbage. Be careful not to attempt to use or free such
845 * pointers afterwards!
846 */
847void
848tuplesort_end(Tuplesortstate *state)
849{
850 tuplesort_free(state);
851
852 /*
853 * Free the main memory context, including the Tuplesortstate struct
854 * itself.
855 */
856 MemoryContextDelete(state->base.maincontext);
857}
858
859/*
860 * tuplesort_updatemax
861 *
862 * Update maximum resource usage statistics.
863 */
864static void
865tuplesort_updatemax(Tuplesortstate *state)
866{
867 int64 spaceUsed;
868 bool isSpaceDisk;
869
870 /*
871 * Note: it might seem we should provide both memory and disk usage for a
872 * disk-based sort. However, the current code doesn't track memory space
873 * accurately once we have begun to return tuples to the caller (since we
874 * don't account for pfree's the caller is expected to do), so we cannot
875 * rely on availMem in a disk sort. This does not seem worth the overhead
876 * to fix. Is it worth creating an API for the memory context code to
877 * tell us how much is actually used in sortcontext?
878 */
879 if (state->tapeset)
880 {
881 isSpaceDisk = true;
882 spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
883 }
884 else
885 {
886 isSpaceDisk = false;
887 spaceUsed = state->allowedMem - state->availMem;
888 }
889
890 /*
891 * A sort spills data to disk only when that data does not fit in main
892 * memory. This is why we treat space used on disk as more important for
893 * tracking resource usage than space used in memory. Note that the space
894 * occupied by a set of tuples on disk might be less than the space the
895 * same tuples occupy in memory, because the on-disk representation is
896 * more compact.
897 */
898 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
899 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
900 {
901 state->maxSpace = spaceUsed;
902 state->isMaxSpaceDisk = isSpaceDisk;
903 state->maxSpaceStatus = state->status;
904 }
905}
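/*
 * Illustrative sketch, not part of tuplesort.c: the update rule above can be
 * read as "a disk measurement always beats a memory measurement; otherwise
 * the larger value of the same kind wins". The SpaceStats struct and the
 * space_stats_update() name are hypothetical stand-ins for the maxSpace and
 * isMaxSpaceDisk fields of the sort state.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the statistics fields updated above. */
typedef struct SpaceStats
{
    int64_t maxSpace;
    bool    isMaxSpaceDisk;
} SpaceStats;

/* Record a new sample; returns true if it replaced the stored maximum. */
static bool
space_stats_update(SpaceStats *stats, int64_t spaceUsed, bool isSpaceDisk)
{
    if ((isSpaceDisk && !stats->isMaxSpaceDisk) ||
        (isSpaceDisk == stats->isMaxSpaceDisk && spaceUsed > stats->maxSpace))
    {
        stats->maxSpace = spaceUsed;
        stats->isMaxSpaceDisk = isSpaceDisk;
        return true;
    }
    return false;
}
```

/*
 * Under this rule a small disk sample replaces a larger memory sample, and
 * no later memory sample can replace a stored disk sample.
 */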
906
907/*
908 * tuplesort_reset
909 *
910 * Reset the tuplesort. This discards all the data in the tuplesort but
911 * keeps the meta-information. After tuplesort_reset, the tuplesort is ready
912 * to start a new sort. This avoids recreating tuplesort states (and saves
913 * resources) when sorting multiple small batches.
914 */
915void
917{
920
921 /*
922 * After we've freed up per-batch memory, re-setup all of the state common
923 * to both the first batch and any subsequent batch.
924 */
926
927 state->lastReturnedTuple = NULL;
928 state->slabMemoryBegin = NULL;
929 state->slabMemoryEnd = NULL;
930 state->slabFreeHead = NULL;
931}
932
933/*
934 * Grow the memtuples[] array, if possible within our memory constraint. We
935 * must not exceed INT_MAX tuples in memory or the caller-provided memory
936 * limit. Return true if we were able to enlarge the array, false if not.
937 *
938 * Normally, at each increment we double the size of the array. When doing
939 * that would exceed a limit, we attempt one last, smaller increase (and then
940 * clear the growmemtuples flag so we don't try any more). That allows us to
941 * use memory as fully as permitted; sticking to the pure doubling rule could
942 * result in almost half going unused. Because availMem moves around with
943 * tuple addition/removal, we need some rule to prevent making repeated small
944 * increases in memtupsize, which would just be useless thrashing. The
945 * growmemtuples flag accomplishes that and also prevents useless
946 * recalculations in this function.
947 */
948static bool
950{
951 int newmemtupsize;
952 int memtupsize = state->memtupsize;
953 int64 memNowUsed = state->allowedMem - state->availMem;
954
955 /* Forget it if we've already maxed out memtuples, per comment above */
956 if (!state->growmemtuples)
957 return false;
958
959 /* Select new value of memtupsize */
960 if (memNowUsed <= state->availMem)
961 {
962 /*
963 * We've used no more than half of allowedMem; double our usage,
964 * clamping at INT_MAX tuples.
965 */
966 if (memtupsize < INT_MAX / 2)
967 newmemtupsize = memtupsize * 2;
968 else
969 {
970 newmemtupsize = INT_MAX;
971 state->growmemtuples = false;
972 }
973 }
974 else
975 {
976 /*
977 * This will be the last increment of memtupsize. Abandon doubling
978 * strategy and instead increase as much as we safely can.
979 *
980 * To stay within allowedMem, we can't increase memtupsize by more
981 * than availMem / sizeof(SortTuple) elements. In practice, we want
982 * to increase it by considerably less, because we need to leave some
983 * space for the tuples to which the new array slots will refer. We
984 * assume the new tuples will be about the same size as the tuples
985 * we've already seen, and thus we can extrapolate from the space
986 * consumption so far to estimate an appropriate new size for the
987 * memtuples array. The optimal value might be higher or lower than
988 * this estimate, but it's hard to know that in advance. We again
989 * clamp at INT_MAX tuples.
990 *
991 * This calculation is safe against enlarging the array so much that
992 * LACKMEM becomes true, because the memory currently used includes
993 * the present array; thus, there would be enough allowedMem for the
994 * new array elements even if no other memory were currently used.
995 *
996 * We do the arithmetic in float8, because otherwise the product of
997 * memtupsize and allowedMem could overflow. Any inaccuracy in the
998 * result should be insignificant; but even if we computed a
999 * completely insane result, the checks below will prevent anything
1000 * really bad from happening.
1001 */
1002 double grow_ratio;
1003
1004 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1005 if (memtupsize * grow_ratio < INT_MAX)
1006 newmemtupsize = (int) (memtupsize * grow_ratio);
1007 else
1008 newmemtupsize = INT_MAX;
1009
1010 /* We won't make any further enlargement attempts */
1011 state->growmemtuples = false;
1012 }
1013
1014 /* Must enlarge array by at least one element, else report failure */
1015 if (newmemtupsize <= memtupsize)
1016 goto noalloc;
1017
1018 /*
1019 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1020 * to ensure our request won't be rejected. Note that we can easily
1021 * exhaust address space before facing this outcome. (This is presently
1022 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1023 * don't rely on that at this distance.)
1024 */
1025 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1026 {
1027 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1028 state->growmemtuples = false; /* can't grow any more */
1029 }
1030
1031 /*
1032 * We need to be sure that we do not cause LACKMEM to become true, else
1033 * the space management algorithm will go nuts. The code above should
1034 * never generate a dangerous request, but to be safe, check explicitly
1035 * that the array growth fits within availMem. (We could still cause
1036 * LACKMEM if the memory chunk overhead associated with the memtuples
1037 * array were to increase. That shouldn't happen because we chose the
1038 * initial array size large enough to ensure that palloc will be treating
1039 * both old and new arrays as separate chunks. But we'll check LACKMEM
1040 * explicitly below just in case.)
1041 */
1042 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1043 goto noalloc;
1044
1045 /* OK, do it */
1046 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1047 state->memtupsize = newmemtupsize;
1048 state->memtuples = (SortTuple *)
1049 repalloc_huge(state->memtuples,
1050 state->memtupsize * sizeof(SortTuple));
1051 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1052 if (LACKMEM(state))
1053 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1054 return true;
1055
1056noalloc:
1057 /* If for any reason we didn't realloc, shut off future attempts */
1058 state->growmemtuples = false;
1059 return false;
1060}
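/*
 * Illustrative sketch, not part of tuplesort.c: the size selection described
 * above, isolated from the allocation and accounting steps. The function
 * name next_array_size() and its out-parameter are hypothetical; the sketch
 * assumes allowedMem and availMem are byte counts as in the code above.
 */

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Compute the next array size. *last is set when no further growth should
 * be attempted afterwards.
 */
static int
next_array_size(int cursize, int64_t allowedMem, int64_t availMem, bool *last)
{
    int64_t memNowUsed = allowedMem - availMem;
    int     newsize;

    *last = false;
    if (memNowUsed <= availMem)
    {
        /* At most half of the budget is used: double, clamping at INT_MAX. */
        if (cursize < INT_MAX / 2)
            newsize = cursize * 2;
        else
        {
            newsize = INT_MAX;
            *last = true;
        }
    }
    else
    {
        /* Final increment: scale by allowedMem / memNowUsed in floating point. */
        double grow_ratio = (double) allowedMem / (double) memNowUsed;

        if (cursize * grow_ratio < INT_MAX)
            newsize = (int) (cursize * grow_ratio);
        else
            newsize = INT_MAX;
        *last = true;
    }
    return newsize;
}
```

/*
 * For example, with 1024 slots and 80% of a 1,000,000-byte budget in use,
 * grow_ratio is 1.25 and the final size is 1280 slots.
 */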
1061
1062/*
1063 * Shared code for tuple and datum cases.
1064 */
1065void
1067 bool useAbbrev, Size tuplen)
1068{
1069 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1070
1071 Assert(!LEADER(state));
1072
1073 /* account for the memory used for this tuple */
1074 USEMEM(state, tuplen);
1075 state->tupleMem += tuplen;
1076
1077 if (!useAbbrev)
1078 {
1079 /*
1080 * Leave ordinary Datum representation, or NULL value. If there is a
1081 * converter it won't expect NULL values, and cost model is not
1082 * required to account for NULL, so in that case we avoid calling
1083 * converter and just set datum1 to zeroed representation (to be
1084 * consistent, and to support cheap inequality tests for NULL
1085 * abbreviated keys).
1086 */
1087 }
1088 else if (!consider_abort_common(state))
1089 {
1090 /* Store abbreviated key representation */
1091 tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1092 state->base.sortKeys);
1093 }
1094 else
1095 {
1096 /*
1097 * Set state to be consistent with never trying abbreviation.
1098 *
1099 * Alter datum1 representation in already-copied tuples, so as to
1100 * ensure a consistent representation (current tuple was just
1101 * handled). It does not matter if some dumped tuples are already
1102 * sorted on tape, since serialized tuples lack abbreviated keys
1103 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1104 */
1105 REMOVEABBREV(state, state->memtuples, state->memtupcount);
1106 }
1107
1108 switch (state->status)
1109 {
1110 case TSS_INITIAL:
1111
1112 /*
1113 * Save the tuple into the unsorted array. First, grow the array
1114 * as needed. Note that we try to grow the array when there is
1115 * still one free slot remaining --- if we fail, there'll still be
1116 * room to store the incoming tuple, and then we'll switch to
1117 * tape-based operation.
1118 */
1119 if (state->memtupcount >= state->memtupsize - 1)
1120 {
1122 Assert(state->memtupcount < state->memtupsize);
1123 }
1124 state->memtuples[state->memtupcount++] = *tuple;
1125
1126 /*
1127 * Check if it's time to switch over to a bounded heapsort. We do
1128 * so if the input tuple count exceeds twice the desired tuple
1129 * count (this is a heuristic for where heapsort becomes cheaper
1130 * than a quicksort), or if we've just filled workMem and have
1131 * enough tuples to meet the bound.
1132 *
1133 * Note that once we enter TSS_BOUNDED state we will always try to
1134 * complete the sort that way. In the worst case, if later input
1135 * tuples are larger than earlier ones, this might cause us to
1136 * exceed workMem significantly.
1137 */
1138 if (state->bounded &&
1139 (state->memtupcount > state->bound * 2 ||
1140 (state->memtupcount > state->bound && LACKMEM(state))))
1141 {
1142 if (trace_sort)
1143 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1144 state->memtupcount,
1145 pg_rusage_show(&state->ru_start));
1147 MemoryContextSwitchTo(oldcontext);
1148 return;
1149 }
1150
1151 /*
1152 * Done if we still fit in available memory and have array slots.
1153 */
1154 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1155 {
1156 MemoryContextSwitchTo(oldcontext);
1157 return;
1158 }
1159
1160 /*
1161 * Nope; time to switch to tape-based operation.
1162 */
1163 inittapes(state, true);
1164
1165 /*
1166 * Dump all tuples.
1167 */
1168 dumptuples(state, false);
1169 break;
1170
1171 case TSS_BOUNDED:
1172
1173 /*
1174 * We don't want to grow the array here, so check whether the new
1175 * tuple can be discarded before putting it in. This should be a
1176 * good speed optimization, too, since when there are many more
1177 * input tuples than the bound, most input tuples can be discarded
1178 * with just this one comparison. Note that because we currently
1179 * have the sort direction reversed, we must check for <= not >=.
1180 */
1181 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1182 {
1183 /* new tuple <= top of the heap, so we can discard it */
1184 free_sort_tuple(state, tuple);
1186 }
1187 else
1188 {
1189 /* discard top of heap, replacing it with the new tuple */
1190 free_sort_tuple(state, &state->memtuples[0]);
1192 }
1193 break;
1194
1195 case TSS_BUILDRUNS:
1196
1197 /*
1198 * Save the tuple into the unsorted array (there must be space)
1199 */
1200 state->memtuples[state->memtupcount++] = *tuple;
1201
1202 /*
1203 * If we are over the memory limit, dump all tuples.
1204 */
1205 dumptuples(state, false);
1206 break;
1207
1208 default:
1209 elog(ERROR, "invalid tuplesort state");
1210 break;
1211 }
1212 MemoryContextSwitchTo(oldcontext);
1213}
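/*
 * Illustrative sketch, not part of tuplesort.c: the TSS_BOUNDED discard test
 * above, shown on plain integers. To keep the 'bound' smallest values, the
 * heap is ordered with the largest kept value on top, so most inputs are
 * rejected by a single comparison against the top. The BoundedHeap type and
 * bounded_put() are hypothetical, and the heap is built incrementally here
 * as a simplification.
 */

```c
#include <assert.h>

#define SKETCH_MAX_BOUND 16

typedef struct BoundedHeap
{
    int items[SKETCH_MAX_BOUND];    /* max-heap of the values kept so far */
    int count;
    int bound;                      /* between 1 and SKETCH_MAX_BOUND */
} BoundedHeap;

static void
swap_items(int *a, int *b)
{
    int tmp = *a;

    *a = *b;
    *b = tmp;
}

static void
bounded_put(BoundedHeap *bh, int value)
{
    int i;

    assert(bh->bound >= 1 && bh->bound <= SKETCH_MAX_BOUND);
    if (bh->count < bh->bound)
    {
        /* Still filling: append and sift up. */
        i = bh->count++;
        bh->items[i] = value;
        while (i > 0 && bh->items[(i - 1) / 2] < bh->items[i])
        {
            swap_items(&bh->items[(i - 1) / 2], &bh->items[i]);
            i = (i - 1) / 2;
        }
        return;
    }
    if (value >= bh->items[0])
        return;                     /* not among the smallest: discard */

    /* Replace the largest kept value, then sift down. */
    bh->items[0] = value;
    i = 0;
    for (;;)
    {
        int largest = i;
        int l = 2 * i + 1;
        int r = 2 * i + 2;

        if (l < bh->count && bh->items[l] > bh->items[largest])
            largest = l;
        if (r < bh->count && bh->items[r] > bh->items[largest])
            largest = r;
        if (largest == i)
            break;
        swap_items(&bh->items[i], &bh->items[largest]);
        i = largest;
    }
}
```

/*
 * Feeding 5, 1, 9, 3, 7, 2 with bound = 3 leaves 1, 2 and 3 in the heap,
 * with 3 on top.
 */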
1214
1215static bool
1217{
1218 Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1219 Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1220 Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1221
1222 /*
1223 * Check effectiveness of abbreviation optimization. Consider aborting
1224 * when still within memory limit.
1225 */
1226 if (state->status == TSS_INITIAL &&
1227 state->memtupcount >= state->abbrevNext)
1228 {
1229 state->abbrevNext *= 2;
1230
1231 /*
1232 * Check opclass-supplied abbreviation abort routine. It may indicate
1233 * that abbreviation should not proceed.
1234 */
1235 if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1236 state->base.sortKeys))
1237 return false;
1238
1239 /*
1240 * Finally, restore authoritative comparator, and indicate that
1241 * abbreviation is not in play by setting abbrev_converter to NULL
1242 */
1243 state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1244 state->base.sortKeys[0].abbrev_converter = NULL;
1245 /* Not strictly necessary, but be tidy */
1246 state->base.sortKeys[0].abbrev_abort = NULL;
1247 state->base.sortKeys[0].abbrev_full_comparator = NULL;
1248
1249 /* Give up - expect original pass-by-value representation */
1250 return true;
1251 }
1252
1253 return false;
1254}
1255
1256/*
1257 * All tuples have been provided; finish the sort.
1258 */
1259void
1261{
1262 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1263
1264 if (trace_sort)
1265 elog(LOG, "performsort of worker %d starting: %s",
1266 state->worker, pg_rusage_show(&state->ru_start));
1267
1268 switch (state->status)
1269 {
1270 case TSS_INITIAL:
1271
1272 /*
1273 * We were able to accumulate all the tuples within the allowed
1274 * amount of memory, or leader to take over worker tapes
1275 */
1276 if (SERIAL(state))
1277 {
1278 /* Sort in memory and we're done */
1280 state->status = TSS_SORTEDINMEM;
1281 }
1282 else if (WORKER(state))
1283 {
1284 /*
1285 * Parallel workers must still dump out tuples to tape. No
1286 * merge is required to produce a single output run, though.
1287 */
1288 inittapes(state, false);
1289 dumptuples(state, true);
1291 state->status = TSS_SORTEDONTAPE;
1292 }
1293 else
1294 {
1295 /*
1296 * Leader will take over worker tapes and merge worker runs.
1297 * Note that mergeruns sets the correct state->status.
1298 */
1301 }
1302 state->current = 0;
1303 state->eof_reached = false;
1304 state->markpos_block = 0L;
1305 state->markpos_offset = 0;
1306 state->markpos_eof = false;
1307 break;
1308
1309 case TSS_BOUNDED:
1310
1311 /*
1312 * We were able to accumulate all the tuples required for output
1313 * in memory, using a heap to eliminate excess tuples. Now we
1314 * have to transform the heap to a properly-sorted array. Note
1315 * that sort_bounded_heap sets the correct state->status.
1316 */
1318 state->current = 0;
1319 state->eof_reached = false;
1320 state->markpos_offset = 0;
1321 state->markpos_eof = false;
1322 break;
1323
1324 case TSS_BUILDRUNS:
1325
1326 /*
1327 * Finish tape-based sort. First, flush all tuples remaining in
1328 * memory out to tape; then merge until we have a single remaining
1329 * run (or, if !randomAccess and !WORKER(), one run per tape).
1330 * Note that mergeruns sets the correct state->status.
1331 */
1332 dumptuples(state, true);
1334 state->eof_reached = false;
1335 state->markpos_block = 0L;
1336 state->markpos_offset = 0;
1337 state->markpos_eof = false;
1338 break;
1339
1340 default:
1341 elog(ERROR, "invalid tuplesort state");
1342 break;
1343 }
1344
1345 if (trace_sort)
1346 {
1347 if (state->status == TSS_FINALMERGE)
1348 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1349 state->worker, state->nInputTapes,
1350 pg_rusage_show(&state->ru_start));
1351 else
1352 elog(LOG, "performsort of worker %d done: %s",
1353 state->worker, pg_rusage_show(&state->ru_start));
1354 }
1355
1356 MemoryContextSwitchTo(oldcontext);
1357}
1358
1359/*
1360 * Internal routine to fetch the next tuple in either forward or back
1361 * direction into *stup. Returns false if no more tuples.
1362 * Returned tuple belongs to tuplesort memory context, and must not be freed
1363 * by caller. Note that fetched tuple is stored in memory that may be
1364 * recycled by any future fetch.
1365 */
1366bool
1368 SortTuple *stup)
1369{
1370 unsigned int tuplen;
1371 size_t nmoved;
1372
1373 Assert(!WORKER(state));
1374
1375 switch (state->status)
1376 {
1377 case TSS_SORTEDINMEM:
1378 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1379 Assert(!state->slabAllocatorUsed);
1380 if (forward)
1381 {
1382 if (state->current < state->memtupcount)
1383 {
1384 *stup = state->memtuples[state->current++];
1385 return true;
1386 }
1387 state->eof_reached = true;
1388
1389 /*
1390 * Complain if caller tries to retrieve more tuples than
1391 * originally asked for in a bounded sort. This is because
1392 * returning EOF here might be the wrong thing.
1393 */
1394 if (state->bounded && state->current >= state->bound)
1395 elog(ERROR, "retrieved too many tuples in a bounded sort");
1396
1397 return false;
1398 }
1399 else
1400 {
1401 if (state->current <= 0)
1402 return false;
1403
1404 /*
1405 * If all tuples have been fetched already, return the last
1406 * tuple; otherwise return the tuple before the last returned.
1407 */
1408 if (state->eof_reached)
1409 state->eof_reached = false;
1410 else
1411 {
1412 state->current--; /* last returned tuple */
1413 if (state->current <= 0)
1414 return false;
1415 }
1416 *stup = state->memtuples[state->current - 1];
1417 return true;
1418 }
1419 break;
1420
1421 case TSS_SORTEDONTAPE:
1422 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1423 Assert(state->slabAllocatorUsed);
1424
1425 /*
1426 * The slot that held the tuple that we returned in previous
1427 * gettuple call can now be reused.
1428 */
1429 if (state->lastReturnedTuple)
1430 {
1431 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1432 state->lastReturnedTuple = NULL;
1433 }
1434
1435 if (forward)
1436 {
1437 if (state->eof_reached)
1438 return false;
1439
1440 if ((tuplen = getlen(state->result_tape, true)) != 0)
1441 {
1442 READTUP(state, stup, state->result_tape, tuplen);
1443
1444 /*
1445 * Remember the tuple we return, so that we can recycle
1446 * its memory on next call. (This can be NULL, in the
1447 * !state->tuples case).
1448 */
1449 state->lastReturnedTuple = stup->tuple;
1450
1451 return true;
1452 }
1453 else
1454 {
1455 state->eof_reached = true;
1456 return false;
1457 }
1458 }
1459
1460 /*
1461 * Backward.
1462 *
1463 * If all tuples have been fetched already, return the last tuple;
1464 * otherwise return the tuple before the last one returned.
1465 */
1466 if (state->eof_reached)
1467 {
1468 /*
1469 * Seek position is pointing just past the zero tuplen at the
1470 * end of file; back up to fetch last tuple's ending length
1471 * word. If seek fails we must have a completely empty file.
1472 */
1473 nmoved = LogicalTapeBackspace(state->result_tape,
1474 2 * sizeof(unsigned int));
1475 if (nmoved == 0)
1476 return false;
1477 else if (nmoved != 2 * sizeof(unsigned int))
1478 elog(ERROR, "unexpected tape position");
1479 state->eof_reached = false;
1480 }
1481 else
1482 {
1483 /*
1484 * Back up and fetch previously-returned tuple's ending length
1485 * word. If seek fails, assume we are at start of file.
1486 */
1487 nmoved = LogicalTapeBackspace(state->result_tape,
1488 sizeof(unsigned int));
1489 if (nmoved == 0)
1490 return false;
1491 else if (nmoved != sizeof(unsigned int))
1492 elog(ERROR, "unexpected tape position");
1493 tuplen = getlen(state->result_tape, false);
1494
1495 /*
1496 * Back up to get ending length word of tuple before it.
1497 */
1498 nmoved = LogicalTapeBackspace(state->result_tape,
1499 tuplen + 2 * sizeof(unsigned int));
1500 if (nmoved == tuplen + sizeof(unsigned int))
1501 {
1502 /*
1503 * We backed up over the previous tuple, but there was no
1504 * ending length word before it. That means that the prev
1505 * tuple is the first tuple in the file. It is now the
1506 * next to read in forward direction (not obviously right,
1507 * but that is what in-memory case does).
1508 */
1509 return false;
1510 }
1511 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1512 elog(ERROR, "bogus tuple length in backward scan");
1513 }
1514
1515 tuplen = getlen(state->result_tape, false);
1516
1517 /*
1518 * Now we have the length of the prior tuple, back up and read it.
1519 * Note: READTUP expects we are positioned after the initial
1520 * length word of the tuple, so back up to that point.
1521 */
1522 nmoved = LogicalTapeBackspace(state->result_tape,
1523 tuplen);
1524 if (nmoved != tuplen)
1525 elog(ERROR, "bogus tuple length in backward scan");
1526 READTUP(state, stup, state->result_tape, tuplen);
1527
1528 /*
1529 * Remember the tuple we return, so that we can recycle its memory
1530 * on next call. (This can be NULL, in the Datum case).
1531 */
1532 state->lastReturnedTuple = stup->tuple;
1533
1534 return true;
1535
1536 case TSS_FINALMERGE:
1537 Assert(forward);
1538 /* We are managing memory ourselves, with the slab allocator. */
1539 Assert(state->slabAllocatorUsed);
1540
1541 /*
1542 * The slab slot holding the tuple that we returned in previous
1543 * gettuple call can now be reused.
1544 */
1545 if (state->lastReturnedTuple)
1546 {
1547 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1548 state->lastReturnedTuple = NULL;
1549 }
1550
1551 /*
1552 * This code should match the inner loop of mergeonerun().
1553 */
1554 if (state->memtupcount > 0)
1555 {
1556 int srcTapeIndex = state->memtuples[0].srctape;
1557 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1559
1560 *stup = state->memtuples[0];
1561
1562 /*
1563 * Remember the tuple we return, so that we can recycle its
1564 * memory on next call. (This can be NULL, in the Datum case).
1565 */
1566 state->lastReturnedTuple = stup->tuple;
1567
1568 /*
1569 * Pull next tuple from tape, and replace the returned tuple
1570 * at top of the heap with it.
1571 */
1573 {
1574 /*
1575 * If no more data, we've reached end of run on this tape.
1576 * Remove the top node from the heap.
1577 */
1579 state->nInputRuns--;
1580
1581 /*
1582 * Close the tape. It'd go away at the end of the sort
1583 * anyway, but better to release the memory early.
1584 */
1586 return true;
1587 }
1588 newtup.srctape = srcTapeIndex;
1590 return true;
1591 }
1592 return false;
1593
1594 default:
1595 elog(ERROR, "invalid tuplesort state");
1596 return false; /* keep compiler quiet */
1597 }
1598}
1599
1600
1601/*
1602 * Advance over N tuples in either forward or back direction,
1603 * without returning any data. N==0 is a no-op.
1604 * Returns true if successful, false if ran out of tuples.
1605 */
1606bool
1608{
1609 MemoryContext oldcontext;
1610
1611 /*
1612 * We don't actually support backwards skip yet, because no callers need
1613 * it. The API is designed to allow for that later, though.
1614 */
1615 Assert(forward);
1616 Assert(ntuples >= 0);
1617 Assert(!WORKER(state));
1618
1619 switch (state->status)
1620 {
1621 case TSS_SORTEDINMEM:
1622 if (state->memtupcount - state->current >= ntuples)
1623 {
1624 state->current += ntuples;
1625 return true;
1626 }
1627 state->current = state->memtupcount;
1628 state->eof_reached = true;
1629
1630 /*
1631 * Complain if caller tries to retrieve more tuples than
1632 * originally asked for in a bounded sort. This is because
1633 * returning EOF here might be the wrong thing.
1634 */
1635 if (state->bounded && state->current >= state->bound)
1636 elog(ERROR, "retrieved too many tuples in a bounded sort");
1637
1638 return false;
1639
1640 case TSS_SORTEDONTAPE:
1641 case TSS_FINALMERGE:
1642
1643 /*
1644 * We could probably optimize these cases better, but for now it's
1645 * not worth the trouble.
1646 */
1647 oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1648 while (ntuples-- > 0)
1649 {
1651
1653 {
1654 MemoryContextSwitchTo(oldcontext);
1655 return false;
1656 }
1658 }
1659 MemoryContextSwitchTo(oldcontext);
1660 return true;
1661
1662 default:
1663 elog(ERROR, "invalid tuplesort state");
1664 return false; /* keep compiler quiet */
1665 }
1666}
1667
1668/*
1669 * tuplesort_merge_order - report merge order we'll use for given memory
1670 * (note: "merge order" just means the number of input tapes in the merge).
1671 *
1672 * This is exported for use by the planner. allowedMem is in bytes.
1673 */
1674int
1675tuplesort_merge_order(int64 allowedMem)
1676{
1677 int mOrder;
1678
1679 /*----------
1680 * In the merge phase, we need buffer space for each input and output tape.
1681 * Each pass in the balanced merge algorithm reads from M input tapes, and
1682 * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1683 * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1684 * input tape.
1685 *
1686 * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1687 * N * TAPE_BUFFER_OVERHEAD
1688 *
1689 * Except for the last and next-to-last merge passes, where there can be
1690 * fewer tapes left to process, M = N. We choose M so that we have the
1691 * desired amount of memory available for the input buffers
1692 * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1693 * available for the tape buffers (allowedMem).
1694 *
1695 * Note: you might be thinking we need to account for the memtuples[]
1696 * array in this calculation, but we effectively treat that as part of the
1697 * MERGE_BUFFER_SIZE workspace.
1698 *----------
1699 */
1700 mOrder = allowedMem /
1701 (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1702
1703 /*
1704 * Even in minimum memory, use at least a MINORDER merge. On the other
1705 * hand, even when we have lots of memory, do not use more than a MAXORDER
1706 * merge. Tapes are pretty cheap, but they're not entirely free. Each
1707 * additional tape reduces the amount of memory available to build runs,
1708 * which in turn can cause the same sort to need more runs, which makes
1709 * merging slower even if it can still be done in a single pass. Also,
1710 * high order merges are quite slow due to CPU cache effects; it can be
1711 * faster to pay the I/O cost of a multi-pass merge than to perform a
1712 * single merge pass across many hundreds of tapes.
1713 */
1714 mOrder = Max(mOrder, MINORDER);
1715 mOrder = Min(mOrder, MAXORDER);
1716
1717 return mOrder;
1718}
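/*
 * Illustrative sketch, not part of tuplesort.c: solving the totalMem formula
 * above for M with M = N gives M = allowedMem / (2 * TAPE_BUFFER_OVERHEAD +
 * MERGE_BUFFER_SIZE), clamped to [MINORDER, MAXORDER]. The SKETCH_* values
 * below are assumptions chosen for illustration (an 8 kB block, one block of
 * overhead per tape, 32 blocks of merge workspace); the real constants are
 * defined elsewhere.
 */

```c
#include <assert.h>
#include <stdint.h>

/* Assumed values, for illustration only. */
#define SKETCH_BLCKSZ               8192
#define SKETCH_TAPE_BUFFER_OVERHEAD SKETCH_BLCKSZ
#define SKETCH_MERGE_BUFFER_SIZE    (SKETCH_BLCKSZ * 32)
#define SKETCH_MINORDER             6
#define SKETCH_MAXORDER             500

static int
sketch_merge_order(int64_t allowedMem)
{
    /* One input buffer plus workspace, and one output buffer, per tape pair. */
    int64_t mOrder = allowedMem /
        (2 * SKETCH_TAPE_BUFFER_OVERHEAD + SKETCH_MERGE_BUFFER_SIZE);

    if (mOrder < SKETCH_MINORDER)
        mOrder = SKETCH_MINORDER;
    if (mOrder > SKETCH_MAXORDER)
        mOrder = SKETCH_MAXORDER;
    return (int) mOrder;
}
```

/*
 * With these assumed constants the divisor is 278528 bytes, so 4 MB of
 * memory gives a 15-way merge, while very small and very large budgets hit
 * the two clamps.
 */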
1719
1720/*
1721 * Helper function to calculate how much memory to allocate for the read buffer
1722 * of each input tape in a merge pass.
1723 *
1724 * 'avail_mem' is the amount of memory available for the buffers of all the
1725 * tapes, both input and output.
1726 * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1727 * 'maxOutputTapes' is the max. number of output tapes we should produce.
1728 */
1729static int64
1730merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1731 int maxOutputTapes)
1732{
1733 int nOutputRuns;
1734 int nOutputTapes;
1735
1736 /*
1737 * How many output tapes will we produce in this pass?
1738 *
1739 * This is nInputRuns / nInputTapes, rounded up.
1740 */
1741 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1742
1743 nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1744
1745 /*
1746 * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1747 * remaining memory is divided evenly between the input tapes.
1748 *
1749 * This also follows from the formula in tuplesort_merge_order, but here
1750 * we derive the input buffer size from the amount of memory available,
1751 * and M and N.
1752 */
1753 return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1754}
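/*
 * Illustrative sketch, not part of tuplesort.c: a standalone version of the
 * calculation above, so that it can be tried with concrete numbers. The
 * per-tape overhead of 8192 bytes is an assumption made for this example,
 * and the function name is hypothetical.
 */

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_TAPE_BUFFER_OVERHEAD 8192    /* assumed, for illustration */

static int64_t
sketch_read_buffer_size(int64_t avail_mem, int nInputTapes, int nInputRuns,
                        int maxOutputTapes)
{
    /* Runs produced by this pass: nInputRuns / nInputTapes, rounded up. */
    int     nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
    int     nOutputTapes = nOutputRuns < maxOutputTapes ? nOutputRuns : maxOutputTapes;
    int64_t perTape;

    /* Reserve the output buffers, then split the rest between the inputs. */
    perTape = (avail_mem - (int64_t) SKETCH_TAPE_BUFFER_OVERHEAD * nOutputTapes)
        / nInputTapes;
    return perTape > 0 ? perTape : 0;
}
```

/*
 * For 1 MB of buffer memory, 6 input tapes holding 20 runs and at most 6
 * output tapes, the pass writes 4 runs to 4 output tapes, leaving
 * (1048576 - 4 * 8192) / 6 = 169301 bytes per input tape.
 */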
1755
1756/*
1757 * inittapes - initialize for tape sorting.
1758 *
1759 * This is called only if we have found we won't sort in memory.
1760 */
1761static void
1762inittapes(Tuplesortstate *state, bool mergeruns)
1763{
1764 Assert(!LEADER(state));
1765
1766 if (mergeruns)
1767 {
1768 /* Compute number of input tapes to use when merging */
1769 state->maxTapes = tuplesort_merge_order(state->allowedMem);
1770 }
1771 else
1772 {
1773 /* Workers can sometimes produce a single run and output it without a merge */
1775 state->maxTapes = MINORDER;
1776 }
1777
1778 if (trace_sort)
1779 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1780 state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1781
1782 /* Create the tape set */
1783 inittapestate(state, state->maxTapes);
1784 state->tapeset =
1786 state->shared ? &state->shared->fileset : NULL,
1787 state->worker);
1788
1789 state->currentRun = 0;
1790
1791 /*
1792 * Initialize logical tape arrays.
1793 */
1794 state->inputTapes = NULL;
1795 state->nInputTapes = 0;
1796 state->nInputRuns = 0;
1797
1798 state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1799 state->nOutputTapes = 0;
1800 state->nOutputRuns = 0;
1801
1802 state->status = TSS_BUILDRUNS;
1803
1805}
1806
1807/*
1808 * inittapestate - initialize generic tape management state
1809 */
1810static void
1812{
1814
1815 /*
1816 * Decrease availMem to reflect the space needed for tape buffers; but
1817 * don't decrease it to the point that we have no room for tuples. (That
1818 * case is only likely to occur if sorting pass-by-value Datums; in all
1819 * other scenarios the memtuples[] array is unlikely to occupy more than
1820 * half of allowedMem. In the pass-by-value case it's not important to
1821 * account for tuple space, so we don't care if LACKMEM becomes
1822 * inaccurate.)
1823 */
1824 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1825
1826 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1828
1829 /*
1830 * Make sure that the temp file(s) underlying the tape set are created in
1831 * suitable temp tablespaces. For parallel sorts, this should have been
1832 * called already, but it doesn't matter if it is called a second time.
1833 */
1835}
1836
1837/*
1838 * selectnewtape -- select next tape to output to.
1839 *
1840 * This is called after finishing a run when we know another run
1841 * must be started. This is used both when building the initial
1842 * runs, and during merge passes.
1843 */
1844static void
1845selectnewtape(Tuplesortstate *state)
1846{
1847 /*
1848 * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1849 * both zero. On each call, we create a new output tape to hold the next
1850 * run, until maxTapes is reached. After that, we assign new runs to the
1851 * existing tapes in a round robin fashion.
1852 */
1853 if (state->nOutputTapes < state->maxTapes)
1854 {
1855 /* Create a new tape to hold the next run */
1856 Assert(state->outputTapes[state->nOutputRuns] == NULL);
1857 Assert(state->nOutputRuns == state->nOutputTapes);
1858 state->destTape = LogicalTapeCreate(state->tapeset);
1859 state->outputTapes[state->nOutputTapes] = state->destTape;
1860 state->nOutputTapes++;
1861 state->nOutputRuns++;
1862 }
1863 else
1864 {
1865 /*
1866 * We have reached the max number of tapes. Append to an existing
1867 * tape.
1868 */
1869 state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1870 state->nOutputRuns++;
1871 }
1872}
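/*
 * Illustrative sketch, not part of tuplesort.c: the tape selection policy
 * above, reduced to index bookkeeping. New tapes are created until maxTapes
 * is reached; later runs go to existing tapes in round-robin order. The
 * TapeAssigner type and assign_next_run() are hypothetical.
 */

```c
#include <assert.h>

typedef struct TapeAssigner
{
    int maxTapes;
    int nOutputTapes;
    int nOutputRuns;
} TapeAssigner;

/* Returns the index of the output tape that receives the next run. */
static int
assign_next_run(TapeAssigner *a)
{
    int dest;

    if (a->nOutputTapes < a->maxTapes)
    {
        /* Create a new tape to hold the next run. */
        assert(a->nOutputRuns == a->nOutputTapes);
        dest = a->nOutputTapes;
        a->nOutputTapes++;
    }
    else
    {
        /* All tapes exist: append to one of them, round robin. */
        dest = a->nOutputRuns % a->nOutputTapes;
    }
    a->nOutputRuns++;
    return dest;
}
```

/*
 * With maxTapes = 3, successive runs land on tapes 0, 1, 2, 0, 1, 2, 0, and
 * so on.
 */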
1873
1874/*
1875 * Initialize the slab allocation arena, for the given number of slots.
1876 */
1877static void
1879{
1880 if (numSlots > 0)
1881 {
1882 char *p;
1883 int i;
1884
1885 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1886 state->slabMemoryEnd = state->slabMemoryBegin +
1888 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1890
1891 p = state->slabMemoryBegin;
1892 for (i = 0; i < numSlots - 1; i++)
1893 {
1894 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1895 p += SLAB_SLOT_SIZE;
1896 }
1897 ((SlabSlot *) p)->nextfree = NULL;
1898 }
1899 else
1900 {
1901 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
1902 state->slabFreeHead = NULL;
1903 }
1904 state->slabAllocatorUsed = true;
1905}
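/*
 * Illustrative sketch, not part of tuplesort.c: the free-list threading done
 * above, together with hypothetical allocate and release operations on it.
 * The slot size of 1024 bytes and all Sketch* names are assumptions, malloc
 * stands in for palloc, and the sketch assumes every released pointer came
 * from the arena.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define SKETCH_SLOT_SIZE 1024   /* assumed slot size, for illustration */

typedef union SketchSlot
{
    union SketchSlot *nextfree;     /* valid only while the slot is free */
    char              buffer[SKETCH_SLOT_SIZE];
} SketchSlot;

typedef struct SketchArena
{
    char       *begin;
    char       *end;
    SketchSlot *freeHead;
} SketchArena;

static void
arena_init(SketchArena *arena, int numSlots)
{
    if (numSlots > 0)
    {
        char *p;
        int   i;

        arena->begin = malloc((size_t) numSlots * SKETCH_SLOT_SIZE);
        assert(arena->begin != NULL);
        arena->end = arena->begin + (size_t) numSlots * SKETCH_SLOT_SIZE;
        arena->freeHead = (SketchSlot *) arena->begin;

        /* Chain each slot to the one that follows it in memory. */
        p = arena->begin;
        for (i = 0; i < numSlots - 1; i++)
        {
            ((SketchSlot *) p)->nextfree = (SketchSlot *) (p + SKETCH_SLOT_SIZE);
            p += SKETCH_SLOT_SIZE;
        }
        ((SketchSlot *) p)->nextfree = NULL;
    }
    else
    {
        arena->begin = arena->end = NULL;
        arena->freeHead = NULL;
    }
}

/* Pop a slot from the free list, or return NULL if none is left. */
static void *
arena_alloc(SketchArena *arena)
{
    SketchSlot *slot = arena->freeHead;

    if (slot != NULL)
        arena->freeHead = slot->nextfree;
    return slot;
}

/* Push a slot back onto the free list. */
static void
arena_release(SketchArena *arena, void *ptr)
{
    SketchSlot *slot = ptr;

    slot->nextfree = arena->freeHead;
    arena->freeHead = slot;
}
```

/*
 * Both operations take constant time, and a released slot is the first one
 * handed out again.
 */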
1906
/*
 * mergeruns -- merge all the completed initial runs.
 *
 * This implements the Balanced k-Way Merge Algorithm. All input data has
 * already been written to initial runs on tape (see dumptuples).
 */
static void
mergeruns(Tuplesortstate *state)
{
    int         tapenum;

    Assert(state->status == TSS_BUILDRUNS);
    Assert(state->memtupcount == 0);

    if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
    {
        /*
         * If there are multiple runs to be merged, when we go to read back
         * tuples from disk, abbreviated keys will not have been stored, and
         * we don't care to regenerate them. Disable abbreviation from this
         * point on.
         */
        state->base.sortKeys->abbrev_converter = NULL;
        state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;

        /* Not strictly necessary, but be tidy */
        state->base.sortKeys->abbrev_abort = NULL;
        state->base.sortKeys->abbrev_full_comparator = NULL;
    }

    /*
     * Reset tuple memory. We've freed all the tuples that we previously
     * allocated. We will use the slab allocator from now on.
     */
    MemoryContextResetOnly(state->base.tuplecontext);

    /*
     * We no longer need a large memtuples array. (We will allocate a smaller
     * one for the heap later.)
     */
    FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    pfree(state->memtuples);
    state->memtuples = NULL;

    /*
     * Initialize the slab allocator. We need one slab slot per input tape,
     * for the tuples in the heap, plus one to hold the tuple last returned
     * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
     * however, we don't need to allocate anything.)
     *
     * In a multi-pass merge, we could shrink this allocation for the last
     * merge pass, if it has fewer tapes than previous passes, but we don't
     * bother.
     *
     * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
     * to track memory usage of individual tuples.
     */
    if (state->base.tuples)
        init_slab_allocator(state, state->nOutputTapes + 1);
    else
        init_slab_allocator(state, 0);

    /*
     * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
     * from each input tape.
     *
     * We could shrink this, too, between passes in a multi-pass merge, but we
     * don't bother. (The initial input tapes are still in outputTapes. The
     * number of input tapes will not increase between passes.)
     */
    state->memtupsize = state->nOutputTapes;
    state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
                                                        state->nOutputTapes * sizeof(SortTuple));
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));

    /*
     * Use all the remaining memory we have available for tape buffers among
     * all the input tapes. At the beginning of each merge pass, we will
     * divide this memory between the input and output tapes in the pass.
     */
    state->tape_buffer_mem = state->availMem;
    USEMEM(state, state->tape_buffer_mem);
    if (trace_sort)
        elog(LOG, "worker %d using %zu KB of memory for tape buffers",
             state->worker, state->tape_buffer_mem / 1024);

    for (;;)
    {
        /*
         * On the first iteration, or if we have read all the runs from the
         * input tapes in a multi-pass merge, it's time to start a new pass.
         * Rewind all the output tapes, and make them inputs for the next
         * pass.
         */
        if (state->nInputRuns == 0)
        {
            int64       input_buffer_size;

            /* Close the old, emptied, input tapes */
            if (state->nInputTapes > 0)
            {
                for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
                    LogicalTapeClose(state->inputTapes[tapenum]);
                pfree(state->inputTapes);
            }

            /* Previous pass's outputs become next pass's inputs. */
            state->inputTapes = state->outputTapes;
            state->nInputTapes = state->nOutputTapes;
            state->nInputRuns = state->nOutputRuns;

            /*
             * Reset output tape variables. The actual LogicalTapes will be
             * created as needed, here we only allocate the array to hold
             * them.
             */
            state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
            state->nOutputTapes = 0;
            state->nOutputRuns = 0;

            /*
             * Redistribute the memory allocated for tape buffers, among the
             * new input and output tapes.
             */
            input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
                                                       state->nInputTapes,
                                                       state->nInputRuns,
                                                       state->maxTapes);

            if (trace_sort)
                elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
                     state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
                     pg_rusage_show(&state->ru_start));

            /* Prepare the new input tapes for merge pass. */
            for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
                LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);

            /*
             * If there's just one run left on each input tape, then only one
             * merge pass remains. If we don't have to produce a materialized
             * sorted tape, we can stop at this point and do the final merge
             * on-the-fly.
             */
            if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
                && state->nInputRuns <= state->nInputTapes
                && !WORKER(state))
            {
                /* Tell logtape.c we won't be writing anymore */
                LogicalTapeSetForgetFreeSpace(state->tapeset);
                /* Initialize for the final merge pass */
                beginmerge(state);
                state->status = TSS_FINALMERGE;
                return;
            }
        }

        /* Select an output tape */
        selectnewtape(state);

        /* Merge one run from each input tape. */
        mergeonerun(state);

        /*
         * If the input tapes are empty, and we output only one output run,
         * we're done. The current output tape contains the final result.
         */
        if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
            break;
    }

    /*
     * Done. The result is on a single run on a single tape.
     */
    state->result_tape = state->outputTapes[0];
    if (!WORKER(state))
        LogicalTapeFreeze(state->result_tape, NULL);
    else
        worker_freeze_result_tape(state);
    state->status = TSS_SORTEDONTAPE;

    /* Close all the now-empty input tapes, to release their read buffers. */
    for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
        LogicalTapeClose(state->inputTapes[tapenum]);
}
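
/*
 * Illustrative sketch, not part of tuplesort.c: a simplified model of how
 * many passes a balanced k-way merge needs. It assumes every pass merges
 * groups of up to "ntapes" runs into one run each and keeps going until a
 * single materialized run remains. The function name demo_merge_passes is
 * hypothetical, and the model ignores the on-the-fly final merge
 * (TSS_FINALMERGE) and the way tapes are split between input and output.
 */

```c
#include <assert.h>

/* Number of merge passes needed to reduce nruns sorted runs to one. */
static int
demo_merge_passes(long nruns, int ntapes)
{
    int         passes = 0;

    assert(nruns >= 1 && ntapes >= 2);

    while (nruns > 1)
    {
        /* each pass leaves ceil(nruns / ntapes) longer runs */
        nruns = (nruns + ntapes - 1) / ntapes;
        passes++;
    }
    return passes;
}
```

/*
 * Under this model, having at least as many tapes as runs gives a single
 * pass, which is the case the file header describes as eliminating repeated
 * I/O.
 */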
2092
/*
 * Merge one run from each input tape.
 */
static void
mergeonerun(Tuplesortstate *state)
{
    int         srcTapeIndex;
    LogicalTape *srcTape;

    /*
     * Start the merge by loading one tuple from each active source tape into
     * the heap.
     */
    beginmerge(state);

    Assert(state->slabAllocatorUsed);

    /*
     * Execute merge by repeatedly extracting lowest tuple in heap, writing it
     * out, and replacing it with next tuple from same tape (if there is
     * another one).
     */
    while (state->memtupcount > 0)
    {
        SortTuple   stup;

        /* write the tuple to destTape */
        srcTapeIndex = state->memtuples[0].srctape;
        srcTape = state->inputTapes[srcTapeIndex];
        WRITETUP(state, state->destTape, &state->memtuples[0]);

        /* recycle the slot of the tuple we just wrote out, for the next read */
        if (state->memtuples[0].tuple)
            RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);

        /*
         * pull next tuple from the tape, and replace the written-out tuple in
         * the heap with it.
         */
        if (mergereadnext(state, srcTape, &stup))
        {
            stup.srctape = srcTapeIndex;
            tuplesort_heap_replace_top(state, &stup);
        }
        else
        {
            tuplesort_heap_delete_top(state);
            state->nInputRuns--;
        }
    }

    /*
     * When the heap empties, we're done. Write an end-of-run marker on the
     * output tape.
     */
    markrunend(state->destTape);
}
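
/*
 * Illustrative sketch, not part of tuplesort.c: the same merge loop applied
 * to plain sorted int arrays instead of tapes. The heap holds one entry per
 * input; the smallest entry is written out and replaced by the next value
 * from the same input, or removed when that input is exhausted. DemoEntry,
 * DEMO_MAX_INPUTS, demo_sift_down and demo_merge_one_run are hypothetical
 * names for this example only.
 */

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_MAX_INPUTS 8       /* hypothetical limit on merge width */

typedef struct DemoEntry
{
    int         value;
    int         src;            /* index of the input this value came from */
} DemoEntry;

/* Place "item" into the hole at index i of a min-heap of n entries. */
static void
demo_sift_down(DemoEntry *heap, int n, int i, DemoEntry item)
{
    for (;;)
    {
        int         j = 2 * i + 1;

        if (j >= n)
            break;
        if (j + 1 < n && heap[j].value > heap[j + 1].value)
            j++;
        if (item.value <= heap[j].value)
            break;
        heap[i] = heap[j];
        i = j;
    }
    heap[i] = item;
}

/* Merge k sorted arrays into out[]; returns the number of values written. */
static size_t
demo_merge_one_run(const int *const *inputs, const size_t *lens, int k, int *out)
{
    DemoEntry   heap[DEMO_MAX_INPUTS];
    size_t      pos[DEMO_MAX_INPUTS];
    size_t      nout = 0;
    int         n = 0;

    assert(k >= 0 && k <= DEMO_MAX_INPUTS);

    /* load the first value of each non-empty input, then heapify */
    for (int s = 0; s < k; s++)
    {
        pos[s] = 0;
        if (lens[s] > 0)
        {
            heap[n].value = inputs[s][pos[s]++];
            heap[n].src = s;
            n++;
        }
    }
    for (int i = n / 2 - 1; i >= 0; i--)
        demo_sift_down(heap, n, i, heap[i]);

    while (n > 0)
    {
        DemoEntry   top = heap[0];

        out[nout++] = top.value;
        if (pos[top.src] < lens[top.src])
        {
            /* replace the top with the next value from the same input */
            DemoEntry   next = {inputs[top.src][pos[top.src]++], top.src};

            demo_sift_down(heap, n, 0, next);
        }
        else
        {
            /* input exhausted: shrink the heap, re-inserting its last entry */
            n--;
            if (n > 0)
                demo_sift_down(heap, n, 0, heap[n]);
        }
    }
    return nout;
}
```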
2150
/*
 * beginmerge - initialize for a merge pass
 *
 * Fill the merge heap with the first tuple from each input tape.
 */
static void
beginmerge(Tuplesortstate *state)
{
    int         activeTapes;
    int         srcTapeIndex;

    /* Heap should be empty here */
    Assert(state->memtupcount == 0);

    activeTapes = Min(state->nInputTapes, state->nInputRuns);

    for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
    {
        SortTuple   tup;

        if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
        {
            tup.srctape = srcTapeIndex;
            tuplesort_heap_insert(state, &tup);
        }
    }
}
2178
/*
 * mergereadnext - read next tuple from one merge input tape
 *
 * Returns false on EOF.
 */
static bool
mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
{
    unsigned int tuplen;

    /* read next tuple, if any */
    if ((tuplen = getlen(srcTape, true)) == 0)
        return false;
    READTUP(state, stup, srcTape, tuplen);

    return true;
}
2196
/*
 * dumptuples - remove tuples from memtuples and write initial run to tape
 *
 * When alltuples = true, dump everything currently in memory. (This case is
 * only used at end of input data.)
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
    int         memtupwrite;
    int         i;

    /*
     * Nothing to do if we still fit in available memory and have array slots,
     * unless this is the final call during initial run generation.
     */
    if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
        !alltuples)
        return;

    /*
     * Final call might require no sorting, in rare cases where we just so
     * happen to have previously LACKMEM()'d at the point where exactly all
     * remaining tuples are loaded into memory, just before input was
     * exhausted. In general, short final runs are quite possible, but avoid
     * creating a completely empty run. In a worker, though, we must produce
     * at least one tape, even if it's empty.
     */
    if (state->memtupcount == 0 && state->currentRun > 0)
        return;

    Assert(state->status == TSS_BUILDRUNS);

    /*
     * It seems unlikely that this limit will ever be exceeded, but take no
     * chances
     */
    if (state->currentRun == INT_MAX)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot have more than %d runs for an external sort",
                        INT_MAX)));

    if (state->currentRun > 0)
        selectnewtape(state);

    state->currentRun++;

    if (trace_sort)
        elog(LOG, "worker %d starting quicksort of run %d: %s",
             state->worker, state->currentRun,
             pg_rusage_show(&state->ru_start));

    /*
     * Sort all tuples accumulated within the allowed amount of memory for
     * this run.
     */
    tuplesort_sort_memtuples(state);

    if (trace_sort)
        elog(LOG, "worker %d finished quicksort of run %d: %s",
             state->worker, state->currentRun,
             pg_rusage_show(&state->ru_start));

    memtupwrite = state->memtupcount;
    for (i = 0; i < memtupwrite; i++)
    {
        SortTuple  *stup = &state->memtuples[i];

        WRITETUP(state, state->destTape, stup);
    }

    state->memtupcount = 0;

    /*
     * Reset tuple memory. We've freed all of the tuples that we previously
     * allocated. It's important to avoid fragmentation when there is a stark
     * change in the sizes of incoming tuples. In bounded sorts,
     * fragmentation due to AllocSetFree's bucketing by size class might be
     * particularly bad if this step wasn't taken.
     */
    MemoryContextReset(state->base.tuplecontext);

    /*
     * Now update the memory accounting to subtract the memory used by the
     * tuples.
     */
    FREEMEM(state, state->tupleMem);
    state->tupleMem = 0;

    markrunend(state->destTape);

    if (trace_sort)
        elog(LOG, "worker %d finished writing run %d to tape %d: %s",
             state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
             pg_rusage_show(&state->ru_start));
}
2294
/*
 * tuplesort_rescan - rewind and replay the scan
 */
void
tuplesort_rescan(Tuplesortstate *state)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);

    Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);

    switch (state->status)
    {
        case TSS_SORTEDINMEM:
            state->current = 0;
            state->eof_reached = false;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            break;
        case TSS_SORTEDONTAPE:
            LogicalTapeRewindForRead(state->result_tape, 0);
            state->eof_reached = false;
            state->markpos_block = 0L;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            break;
        default:
            elog(ERROR, "invalid tuplesort state");
            break;
    }

    MemoryContextSwitchTo(oldcontext);
}
2327
/*
 * tuplesort_markpos - saves current position in the merged sort file
 */
void
tuplesort_markpos(Tuplesortstate *state)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);

    Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);

    switch (state->status)
    {
        case TSS_SORTEDINMEM:
            state->markpos_offset = state->current;
            state->markpos_eof = state->eof_reached;
            break;
        case TSS_SORTEDONTAPE:
            LogicalTapeTell(state->result_tape,
                            &state->markpos_block,
                            &state->markpos_offset);
            state->markpos_eof = state->eof_reached;
            break;
        default:
            elog(ERROR, "invalid tuplesort state");
            break;
    }

    MemoryContextSwitchTo(oldcontext);
}
2357
/*
 * tuplesort_restorepos - restores current position in merged sort file to
 * last saved position
 */
void
tuplesort_restorepos(Tuplesortstate *state)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);

    Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);

    switch (state->status)
    {
        case TSS_SORTEDINMEM:
            state->current = state->markpos_offset;
            state->eof_reached = state->markpos_eof;
            break;
        case TSS_SORTEDONTAPE:
            LogicalTapeSeek(state->result_tape,
                            state->markpos_block,
                            state->markpos_offset);
            state->eof_reached = state->markpos_eof;
            break;
        default:
            elog(ERROR, "invalid tuplesort state");
            break;
    }

    MemoryContextSwitchTo(oldcontext);
}
2388
/*
 * tuplesort_get_stats - extract summary statistics
 *
 * This can be called after tuplesort_performsort() finishes to obtain
 * printable summary information about how the sort was performed.
 */
void
tuplesort_get_stats(Tuplesortstate *state,
                    TuplesortInstrumentation *stats)
{
    /*
     * Note: it might seem we should provide both memory and disk usage for a
     * disk-based sort. However, the current code doesn't track memory space
     * accurately once we have begun to return tuples to the caller (since we
     * don't account for pfree's the caller is expected to do), so we cannot
     * rely on availMem in a disk sort. This does not seem worth the overhead
     * to fix. Is it worth creating an API for the memory context code to
     * tell us how much is actually used in sortcontext?
     */
    tuplesort_updatemax(state);

    if (state->isMaxSpaceDisk)
        stats->spaceType = SORT_SPACE_TYPE_DISK;
    else
        stats->spaceType = SORT_SPACE_TYPE_MEMORY;
    stats->spaceUsed = (state->maxSpace + 1023) / 1024;

    switch (state->maxSpaceStatus)
    {
        case TSS_SORTEDINMEM:
            if (state->boundUsed)
                stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
            else
                stats->sortMethod = SORT_TYPE_QUICKSORT;
            break;
        case TSS_SORTEDONTAPE:
            stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
            break;
        case TSS_FINALMERGE:
            stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
            break;
        default:
            stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
            break;
    }
}
2435
/*
 * Convert TuplesortMethod to a string.
 */
const char *
tuplesort_method_name(TuplesortMethod m)
{
    switch (m)
    {
        case SORT_TYPE_STILL_IN_PROGRESS:
            return "still in progress";
        case SORT_TYPE_TOP_N_HEAPSORT:
            return "top-N heapsort";
        case SORT_TYPE_QUICKSORT:
            return "quicksort";
        case SORT_TYPE_EXTERNAL_SORT:
            return "external sort";
        case SORT_TYPE_EXTERNAL_MERGE:
            return "external merge";
    }

    return "unknown";
}
2458
/*
 * Convert TuplesortSpaceType to a string.
 */
const char *
tuplesort_space_type_name(TuplesortSpaceType t)
{
    Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
    return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
}
2468
2469
2470/*
2471 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2472 */
2473
/*
 * Convert the existing unordered array of SortTuples to a bounded heap,
 * discarding all but the smallest "state->bound" tuples.
 *
 * When working with a bounded heap, we want to keep the largest entry
 * at the root (array entry zero), instead of the smallest as in the normal
 * sort case. This allows us to discard the largest entry cheaply.
 * Therefore, we temporarily reverse the sort direction.
 */
static void
make_bounded_heap(Tuplesortstate *state)
{
    int         tupcount = state->memtupcount;
    int         i;

    Assert(state->status == TSS_INITIAL);
    Assert(state->bounded);
    Assert(tupcount >= state->bound);
    Assert(SERIAL(state));

    /* Reverse sort direction so largest entry will be at root */
    reversedirection(state);

    state->memtupcount = 0;     /* make the heap empty */
    for (i = 0; i < tupcount; i++)
    {
        if (state->memtupcount < state->bound)
        {
            /* Insert next tuple into heap */
            /* Must copy source tuple to avoid possible overwrite */
            SortTuple   stup = state->memtuples[i];

            tuplesort_heap_insert(state, &stup);
        }
        else
        {
            /*
             * The heap is full. Replace the largest entry with the new
             * tuple, or just discard it, if it's larger than anything already
             * in the heap.
             */
            if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
            {
                free_sort_tuple(state, &state->memtuples[i]);
                CHECK_FOR_INTERRUPTS();
            }
            else
                tuplesort_heap_replace_top(state, &state->memtuples[i]);
        }
    }

    Assert(state->memtupcount == state->bound);
    state->status = TSS_BOUNDED;
}
2528
/*
 * Convert the bounded heap to a properly-sorted array
 */
static void
sort_bounded_heap(Tuplesortstate *state)
{
    int         tupcount = state->memtupcount;

    Assert(state->status == TSS_BOUNDED);
    Assert(state->bounded);
    Assert(tupcount == state->bound);
    Assert(SERIAL(state));

    /*
     * We can unheapify in place because each delete-top call will remove the
     * largest entry, which we can promptly store in the newly freed slot at
     * the end. Once we're down to a single-entry heap, we're done.
     */
    while (state->memtupcount > 1)
    {
        SortTuple   stup = state->memtuples[0];

        /* this sifts-up the next-largest entry and decreases memtupcount */
        tuplesort_heap_delete_top(state);
        state->memtuples[state->memtupcount] = stup;
    }
    state->memtupcount = tupcount;

    /*
     * Reverse sort direction back to the original state. This is not
     * actually necessary but seems like a good idea for tidiness.
     */
    reversedirection(state);

    state->status = TSS_SORTEDINMEM;
    state->boundUsed = true;
}
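
/*
 * Illustrative sketch, not part of tuplesort.c: the bounded (top-N) heap
 * idea on a plain int array. A max-heap of at most "bound" values is built
 * in place at the front of the array, larger candidates are discarded, and
 * the heap is then unheapified in place into ascending order. The demo_*
 * names are hypothetical; the real code keeps one comparator and reverses
 * the sort direction instead of writing a separate max-heap.
 */

```c
#include <assert.h>

/* Append "item" to a max-heap of *n ints and sift it up. */
static void
demo_maxheap_insert(int *heap, int *n, int item)
{
    int         j = (*n)++;

    while (j > 0)
    {
        int         i = (j - 1) >> 1;

        if (item <= heap[i])
            break;
        heap[j] = heap[i];
        j = i;
    }
    heap[j] = item;
}

/* Replace the root of a max-heap of n ints with "item" and sift it down. */
static void
demo_maxheap_replace_top(int *heap, int n, int item)
{
    int         i = 0;

    for (;;)
    {
        int         j = 2 * i + 1;

        if (j >= n)
            break;
        if (j + 1 < n && heap[j] < heap[j + 1])
            j++;
        if (item >= heap[j])
            break;
        heap[i] = heap[j];
        i = j;
    }
    heap[i] = item;
}

/* Leave the "bound" smallest of vals[0..count) sorted in vals[0..bound). */
static void
demo_top_n(int *vals, int count, int bound)
{
    int         n = 0;

    assert(bound >= 1 && count >= bound);

    for (int i = 0; i < count; i++)
    {
        int         v = vals[i];    /* copy: the heap may overwrite slot i */

        if (n < bound)
            demo_maxheap_insert(vals, &n, v);
        else if (v < vals[0])
            demo_maxheap_replace_top(vals, n, v);
        /* otherwise v cannot be among the smallest "bound" values */
    }

    /* unheapify: move the largest remaining value to the freed last slot */
    while (n > 1)
    {
        int         top = vals[0];

        n--;
        demo_maxheap_replace_top(vals, n, vals[n]);
        vals[n] = top;
    }
}
```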
2566
2567
2568/* radix sort routines */
2569
2570/*
2571 * Retrieve byte from datum, indexed by 'level': 0 for MSB, 7 for LSB
2572 */
2573static inline uint8
2574current_byte(Datum key, int level)
2575{
2576 int shift = (sizeof(Datum) - 1 - level) * BITS_PER_BYTE;
2577
2578 return (key >> shift) & 0xFF;
2579}
2580
2581/*
2582 * Normalize datum such that unsigned comparison is order-preserving,
2583 * taking ASC/DESC into account as well.
2584 */
2585static inline Datum
2587{
2589
2590 if (ssup->comparator == ssup_datum_signed_cmp)
2591 {
2592 norm_datum1 = orig + ((uint64) PG_INT64_MAX) + 1;
2593 }
2594 else if (ssup->comparator == ssup_datum_int32_cmp)
2595 {
2596 /*
2597 * First truncate to uint32. Technically, we don't need to do this,
2598 * but it forces the upper half of the datum to be zero regardless of
2599 * sign.
2600 */
2602
2604 }
2605 else
2606 {
2608 norm_datum1 = orig;
2609 }
2610
2611 if (ssup->ssup_reverse)
2613
2614 return norm_datum1;
2615}
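
/*
 * Illustrative sketch, not part of tuplesort.c: why adding
 * ((uint64) PG_INT64_MAX) + 1 to a signed 64-bit key makes unsigned
 * comparison order-preserving. The addition flips the sign bit, so
 * INT64_MIN maps to 0 and INT64_MAX maps to UINT64_MAX. The function name
 * demo_normalize_int64 is hypothetical, and handling descending order with
 * a bitwise complement is an assumption of this sketch.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Map a signed key to an unsigned key with the same (or reversed) order. */
static uint64_t
demo_normalize_int64(int64_t v, bool reverse)
{
    /* unsigned wraparound addition; equivalent to flipping the sign bit */
    uint64_t    u = (uint64_t) v + ((uint64_t) INT64_MAX) + 1;

    /* assumption: complementing reverses the unsigned order for DESC */
    return reverse ? ~u : u;
}
```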
2616
2617/*
2618 * radix_sort_recursive
2619 *
2620 * Radix sort by (pass-by-value) datum1, diverting to qsort_tuple()
2621 * for tiebreaks.
2622 *
2623 * This is a modification of
2624 * ska_byte_sort() from https://github.com/skarupke/ska_sort
2625 * The original copyright notice follows:
2626 *
2627 * Copyright Malte Skarupke 2016.
2628 * Distributed under the Boost Software License, Version 1.0.
2629 *
2630 * Boost Software License - Version 1.0 - August 17th, 2003
2631 *
2632 * Permission is hereby granted, free of charge, to any person or organization
2633 * obtaining a copy of the software and accompanying documentation covered by
2634 * this license (the "Software") to use, reproduce, display, distribute,
2635 * execute, and transmit the Software, and to prepare derivative works of the
2636 * Software, and to permit third-parties to whom the Software is furnished to
2637 * do so, all subject to the following:
2638 *
2639 * The copyright notices in the Software and this entire statement, including
2640 * the above license grant, this restriction and the following disclaimer,
2641 * must be included in all copies of the Software, in whole or in part, and
2642 * all derivative works of the Software, unless such copies or derivative
2643 * works are solely in the form of machine-executable object code generated by
2644 * a source language processor.
2645 *
2646 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
2647 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
2648 * FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
2649 * SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
2650 * FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
2651 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2652 * DEALINGS IN THE SOFTWARE.
2653 */
2654static void
2656{
2657 RadixSortInfo partitions[256] = {0};
2659 size_t total = 0;
2660 int num_partitions = 0;
2661 int num_remaining;
2662 SortSupport ssup = &state->base.sortKeys[0];
2665 size_t start_offset = 0;
2666 SortTuple *partition_begin = begin;
2667 int next_level;
2668
2669 /* count number of occurrences of each byte */
2670 ref_datum = normalize_datum(begin[0].datum1, ssup);
2671 for (SortTuple *st = begin; st < begin + n_elems; st++)
2672 {
2675
2676 this_datum = normalize_datum(st->datum1, ssup);
2677 /* accumulate bits different from the reference datum */
2679
2680 /* extract the byte for this level from the normalized datum */
2682
2683 /* save it for the permutation step */
2684 st->curbyte = this_partition;
2685
2686 partitions[this_partition].count++;
2687
2689 }
2690
2691 /* compute partition offsets */
2692 for (int i = 0; i < 256; i++)
2693 {
2694 size_t count = partitions[i].count;
2695
2696 if (count != 0)
2697 {
2698 partitions[i].offset = total;
2699 total += count;
2700 remaining_partitions[num_partitions] = i;
2701 num_partitions++;
2702 }
2703 partitions[i].next_offset = total;
2704 }
2705
2706 /*
2707 * Swap tuples to correct partition.
2708 *
2709 * In traditional American flag sort, a swap sends the current element to
2710 * the correct partition, but the array pointer only advances if the
2711 * partner of the swap happens to be an element that belongs in the
2712 * current partition. That only requires one pass through the array, but
2713 * the disadvantage is we don't know if the pointer can advance until the
2714 * swap completes. Here lies the most interesting innovation from the
2715 * upstream ska_byte_sort: After initiating the swap, we immediately
2716 * proceed to the next element. This makes better use of CPU pipelining,
2717 * but also means that we will often need multiple iterations of this
2718 * loop. ska_byte_sort() maintains a separate list of which partitions
2719 * haven't finished, which is updated every loop iteration. Here we simply
2720 * check each partition during every iteration.
2721 *
2722 * If we started with a single partition, there is nothing to do. If a
2723 * previous loop iteration results in only one partition that hasn't been
2724 * counted as sorted, we know it's actually sorted and can exit the loop.
2725 */
2726 num_remaining = num_partitions;
2727 while (num_remaining > 1)
2728 {
2729 /* start the count over */
2730 num_remaining = num_partitions;
2731
2732 for (int i = 0; i < num_partitions; i++)
2733 {
2735
2736 for (SortTuple *st = begin + partitions[idx].offset;
2737 st < begin + partitions[idx].next_offset;
2738 st++)
2739 {
2740 size_t offset = partitions[st->curbyte].offset++;
2741 SortTuple tmp;
2742
2743 /* swap current tuple with destination position */
2744 Assert(offset < n_elems);
2745 tmp = *st;
2746 *st = begin[offset];
2747 begin[offset] = tmp;
2748
2750 };
2751
2752 /* Is this partition sorted? */
2753 if (partitions[idx].offset == partitions[idx].next_offset)
2754 num_remaining--;
2755 }
2756 }
2757
2758 /* recurse */
2759
2760 if (num_partitions == 1)
2761 {
2762 /*
2763 * There is only one distinct byte at the current level. It can happen
2764 * that some subsequent bytes are also the same for all input values,
2765 * such as the upper bytes of small integers. To skip unproductive
2766 * passes for that case, we compute the level where the input has more
2767 * than one distinct byte, so that the next recursion can start there.
2768 */
2769 if (common_upper_bits == 0)
2770 next_level = sizeof(Datum);
2771 else
2772 {
2773 int diffpos;
2774
2775 /*
2776 * The upper bits of common_upper_bits are zero where all datums
2777 * have the same bits.
2778 */
2780 next_level = sizeof(Datum) - 1 - (diffpos / BITS_PER_BYTE);
2781 }
2782 }
2783 else
2784 next_level = level + 1;
2785
2787 rp < remaining_partitions + num_partitions;
2788 rp++)
2789 {
2790 size_t end_offset = partitions[*rp].next_offset;
2793
2794 if (num_elements > 1)
2795 {
2796 if (next_level < sizeof(Datum))
2797 {
2799 {
2802 state->base.comparetup,
2803 state);
2804 }
2805 else
2806 {
2809 next_level,
2810 state);
2811 }
2812 }
2813 else if (state->base.onlyKey == NULL)
2814 {
2815 /*
2816 * We've finished radix sort on all bytes of the pass-by-value
2817 * datum (possibly abbreviated), now sort using the tiebreak
2818 * comparator.
2819 */
2822 state->base.comparetup_tiebreak,
2823 state);
2824 }
2825 }
2826
2829 }
2830}
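
/*
 * Illustrative sketch, not part of tuplesort.c: a plain most-significant-
 * byte-first radix sort on uint64 keys, using the traditional American flag
 * permutation that the comment above contrasts with ska_byte_sort(). It
 * shows the count / offset / permute / recurse structure only; it has no
 * tiebreak comparator, no quicksort fallback for small partitions, and no
 * skipping of common upper bytes. demo_byte and demo_radix_sort are
 * hypothetical names.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Byte of "key" at "level": 0 is the most significant, 7 the least. */
static uint8_t
demo_byte(uint64_t key, int level)
{
    int         shift = (int) (sizeof(uint64_t) - 1 - level) * 8;

    return (uint8_t) ((key >> shift) & 0xFF);
}

static void
demo_radix_sort(uint64_t *a, size_t n, int level)
{
    size_t      count[256] = {0};
    size_t      offset[256];
    size_t      end[256];
    size_t      total = 0;
    size_t      start = 0;

    if (n < 2 || level >= (int) sizeof(uint64_t))
        return;

    /* count occurrences of each byte value at this level */
    for (size_t i = 0; i < n; i++)
        count[demo_byte(a[i], level)]++;

    /* compute where each partition begins and ends */
    for (int b = 0; b < 256; b++)
    {
        offset[b] = total;
        total += count[b];
        end[b] = total;
    }

    /* swap each element into its partition; advance only on a match */
    for (int b = 0; b < 256; b++)
    {
        while (offset[b] < end[b])
        {
            uint64_t    v = a[offset[b]];
            uint8_t     dest = demo_byte(v, level);

            if (dest == b)
                offset[b]++;
            else
            {
                a[offset[b]] = a[offset[dest]];
                a[offset[dest]] = v;
                offset[dest]++;
            }
        }
    }

    /* recurse into each partition on the next byte */
    for (int b = 0; b < 256; b++)
    {
        demo_radix_sort(a + start, count[b], level + 1);
        start += count[b];
    }
}
```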
2831
2832/*
2833 * Entry point for radix_sort_recursive
2834 *
2835 * Partition tuples by isnull1, then sort both partitions, using
2836 * radix sort on the NOT NULL partition if it's large enough.
2837 */
2838static void
2840{
2841 bool nulls_first = state->base.sortKeys[0].ssup_nulls_first;
2844 size_t d1 = 0,
2845 d2,
2846 null_count,
2848
2849 /*
2850 * Find the first NOT NULL if NULLS FIRST, or first NULL if NULLS LAST.
2851 * This also serves as a quick check for the common case where all tuples
2852 * are NOT NULL in the first sort key.
2853 */
2854 while (d1 < n && data[d1].isnull1 == nulls_first)
2855 {
2856 d1++;
2858 }
2859
2860 /*
2861 * If we have more than one tuple left after the quick check, partition
2862 * the remainder using branchless cyclic permutation, based on
2863 * https://orlp.net/blog/branchless-lomuto-partitioning/
2864 */
2865 Assert(n > 0);
2866 if (d1 < n - 1)
2867 {
2868 size_t i = d1,
2869 j = d1;
2870 SortTuple tmp = data[d1]; /* create gap at front */
2871
2872 while (j < n - 1)
2873 {
2874 /* gap is at j, move i's element to gap */
2875 data[j] = data[i];
2876 /* advance j to the first unknown element */
2877 j += 1;
2878 /* move the first unknown element back to i */
2879 data[i] = data[j];
2880 /* advance i if this element belongs in the left partition */
2881 i += (data[i].isnull1 == nulls_first);
2882
2884 }
2885
2886 /* place gap between left and right partitions */
2887 data[j] = data[i];
2888 /* restore the saved element */
2889 data[i] = tmp;
2890 /* assign it to the correct partition */
2891 i += (data[i].isnull1 == nulls_first);
2892
2893 /* d1 is now the number of elements in the left partition */
2894 d1 = i;
2895 }
2896
2897 d2 = n - d1;
2898
2899 /* set pointers and counts for each partition */
2900 if (nulls_first)
2901 {
2902 null_start = data;
2903 null_count = d1;
2904 not_null_start = data + d1;
2905 not_null_count = d2;
2906 }
2907 else
2908 {
2910 not_null_count = d1;
2911 null_start = data + d1;
2912 null_count = d2;
2913 }
2914
2915 for (SortTuple *st = null_start;
2916 st < null_start + null_count;
2917 st++)
2918 Assert(st->isnull1 == true);
2919 for (SortTuple *st = not_null_start;
2921 st++)
2922 Assert(st->isnull1 == false);
2923
2924 /*
2925 * Sort the NULL partition using tiebreak comparator, if necessary.
2926 */
2927 if (state->base.onlyKey == NULL && null_count > 1)
2928 {
2930 null_count,
2931 state->base.comparetup_tiebreak,
2932 state);
2933 }
2934
2935 /*
2936 * Sort the NOT NULL partition, using radix sort if large enough,
2937 * otherwise fall back to quicksort.
2938 */
2940 {
2943 state->base.comparetup,
2944 state);
2945 }
2946 else
2947 {
2948 bool presorted = true;
2949
2950 for (SortTuple *st = not_null_start + 1;
2952 st++)
2953 {
2954 if (COMPARETUP(state, st - 1, st) > 0)
2955 {
2956 presorted = false;
2957 break;
2958 }
2959
2961 }
2962
2963 if (presorted)
2964 return;
2965 else
2966 {
2969 0,
2970 state);
2971 }
2972 }
2973}
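
/*
 * Illustrative sketch, not part of tuplesort.c: the cyclic-permutation
 * partitioning loop above, applied to a small struct with a null flag.
 * Elements whose flag equals "nulls_first" end up at the front, and the
 * return value is how many there are. DemoItem and demo_partition_nulls are
 * hypothetical names; unlike the original, this version also tolerates
 * n == 0.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct DemoItem
{
    int         value;
    bool        isnull;
} DemoItem;

static size_t
demo_partition_nulls(DemoItem *data, size_t n, bool nulls_first)
{
    size_t      d1 = 0;

    /* skip the prefix that is already in the left partition */
    while (d1 < n && data[d1].isnull == nulls_first)
        d1++;

    if (n > 0 && d1 < n - 1)
    {
        size_t      i = d1,
                    j = d1;
        DemoItem    tmp = data[d1]; /* create gap at front */

        while (j < n - 1)
        {
            /* gap is at j, move i's element to gap */
            data[j] = data[i];
            /* advance j to the first unknown element */
            j += 1;
            /* move the first unknown element back to i */
            data[i] = data[j];
            /* advance i if this element belongs in the left partition */
            i += (data[i].isnull == nulls_first);
        }

        /* place gap between the partitions and restore the saved element */
        data[j] = data[i];
        data[i] = tmp;
        i += (data[i].isnull == nulls_first);

        d1 = i;
    }
    return d1;
}
```

/*
 * The loop body has no data-dependent branch: the comparison result is
 * added to i as 0 or 1. Note that the partitioning is not stable.
 */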
2974
2975/* Verify in-memory sort using standard comparator. */
2976static void
2978{
2979#ifdef USE_ASSERT_CHECKING
2980 for (SortTuple *st = state->memtuples + 1;
2981 st < state->memtuples + state->memtupcount;
2982 st++)
2983 Assert(COMPARETUP(state, st - 1, st) <= 0);
2984#endif
2985}
2986
2987/*
2988 * Sort all memtuples using specialized routines.
2989 *
2990 * Quicksort or radix sort is used for small in-memory sorts,
2991 * and external sort runs.
2992 */
2993static void
2995{
2996 Assert(!LEADER(state));
2997
2998 if (state->memtupcount > 1)
2999 {
3000 /*
3001 * Do we have the leading column's value or abbreviation in datum1?
3002 */
3003 if (state->base.haveDatum1 && state->base.sortKeys)
3004 {
3005 SortSupport ssup = &state->base.sortKeys[0];
3006
3007 /* Does it compare as an integer? */
3008 if (state->memtupcount >= QSORT_THRESHOLD &&
3012 {
3013 radix_sort_tuple(state->memtuples,
3014 state->memtupcount,
3015 state);
3017 return;
3018 }
3019 }
3020
3021 /* Can we use the single-key sort function? */
3022 if (state->base.onlyKey != NULL)
3023 {
3024 qsort_ssup(state->memtuples, state->memtupcount,
3025 state->base.onlyKey);
3026 }
3027 else
3028 {
3029 qsort_tuple(state->memtuples,
3030 state->memtupcount,
3031 state->base.comparetup,
3032 state);
3033 }
3034 }
3035}
3036
/*
 * Insert a new tuple into an empty or existing heap, maintaining the
 * heap invariant. Caller is responsible for ensuring there's room.
 *
 * Note: For some callers, tuple points to a memtuples[] entry above the
 * end of the heap. This is safe as long as it's not immediately adjacent
 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
 * is, it might get overwritten before being moved into the heap!
 */
static void
tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
{
    SortTuple  *memtuples;
    int         j;

    memtuples = state->memtuples;
    Assert(state->memtupcount < state->memtupsize);

    CHECK_FOR_INTERRUPTS();

    /*
     * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
     * using 1-based array indexes, not 0-based.
     */
    j = state->memtupcount++;
    while (j > 0)
    {
        int         i = (j - 1) >> 1;

        if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
            break;
        memtuples[j] = memtuples[i];
        j = i;
    }
    memtuples[j] = *tuple;
}
3073
/*
 * Remove the tuple at state->memtuples[0] from the heap. Decrement
 * memtupcount, and sift up to maintain the heap invariant.
 *
 * The caller has already free'd the tuple the top node points to,
 * if necessary.
 */
static void
tuplesort_heap_delete_top(Tuplesortstate *state)
{
    SortTuple  *memtuples = state->memtuples;
    SortTuple  *tuple;

    if (--state->memtupcount <= 0)
        return;

    /*
     * Remove the last tuple in the heap, and re-insert it, by replacing the
     * current top node with it.
     */
    tuple = &memtuples[state->memtupcount];
    tuplesort_heap_replace_top(state, tuple);
}
3097
/*
 * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
 * maintain the heap invariant.
 *
 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
 * Heapsort, steps H3-H8).
 */
static void
tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
{
    SortTuple  *memtuples = state->memtuples;
    unsigned int i,
                n;

    Assert(state->memtupcount >= 1);

    CHECK_FOR_INTERRUPTS();

    /*
     * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
     * This prevents overflow in the "2 * i + 1" calculation, since at the top
     * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
     */
    n = state->memtupcount;
    i = 0;                      /* i is where the "hole" is */
    for (;;)
    {
        unsigned int j = 2 * i + 1;

        if (j >= n)
            break;
        if (j + 1 < n &&
            COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
            j++;
        if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
            break;
        memtuples[i] = memtuples[j];
        i = j;
    }
    memtuples[i] = *tuple;
}
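
/*
 * Illustrative sketch, not part of tuplesort.c: the three heap primitives
 * above on a plain int min-heap. Insert moves a hole up from the end,
 * replace-top moves a hole down from the root, and delete-top is just
 * replace-top with the last element, as in the functions above. The demo_*
 * names are hypothetical.
 */

```c
#include <assert.h>

/* Append "item" to a min-heap of *count ints, moving the hole upward. */
static void
demo_heap_insert(int *heap, int *count, int item)
{
    int         j = (*count)++;

    while (j > 0)
    {
        int         i = (j - 1) >> 1;   /* parent of j, 0-based */

        if (item >= heap[i])
            break;
        heap[j] = heap[i];
        j = i;
    }
    heap[j] = item;
}

/* Overwrite the root with "item", moving the hole downward. */
static void
demo_heap_replace_top(int *heap, int count, int item)
{
    unsigned int n = (unsigned int) count;
    unsigned int i = 0;         /* i is where the hole is */

    assert(count >= 1);

    for (;;)
    {
        unsigned int j = 2 * i + 1; /* left child; cannot overflow unsigned */

        if (j >= n)
            break;
        if (j + 1 < n && heap[j] > heap[j + 1])
            j++;                /* pick the smaller child */
        if (item <= heap[j])
            break;
        heap[i] = heap[j];
        i = j;
    }
    heap[i] = item;
}

/* Remove the root by re-inserting the last element at the top. */
static void
demo_heap_delete_top(int *heap, int *count)
{
    if (--(*count) <= 0)
        return;
    demo_heap_replace_top(heap, *count, heap[*count]);
}
```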
3139
/*
 * Function to reverse the sort direction from its current state
 *
 * It is not safe to call this when performing hash tuplesorts
 */
static void
reversedirection(Tuplesortstate *state)
{
    SortSupport sortKey = state->base.sortKeys;
    int         nkey;

    for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
    {
        sortKey->ssup_reverse = !sortKey->ssup_reverse;
        sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
    }
}
3157
3158
3159/*
3160 * Tape interface routines
3161 */
3162
static unsigned int
getlen(LogicalTape *tape, bool eofOK)
{
    unsigned int len;

    if (LogicalTapeRead(tape,
                        &len, sizeof(len)) != sizeof(len))
        elog(ERROR, "unexpected end of tape");
    if (len == 0 && !eofOK)
        elog(ERROR, "unexpected end of data");
    return len;
}

static void
markrunend(LogicalTape *tape)
{
    unsigned int len = 0;

    LogicalTapeWrite(tape, &len, sizeof(len));
}
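
/*
 * Illustrative sketch, not part of tuplesort.c: length-prefixed records
 * with a zero length word as the end-of-run marker, on an in-memory buffer
 * standing in for a logical tape. DemoTape and the demo_* functions are
 * hypothetical. The sketch assumes the length word holds the payload size,
 * which therefore must be nonzero; the real on-tape tuple format is defined
 * by the READTUP/WRITETUP callbacks and is not reproduced here.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef struct DemoTape
{
    unsigned char buf[256];
    size_t      wpos;           /* next write position */
    size_t      rpos;           /* next read position */
} DemoTape;

static void
demo_tape_write(DemoTape *t, const void *p, size_t len)
{
    assert(t->wpos + len <= sizeof(t->buf));
    memcpy(t->buf + t->wpos, p, len);
    t->wpos += len;
}

/* Read up to len bytes; returns the number of bytes actually read. */
static size_t
demo_tape_read(DemoTape *t, void *p, size_t len)
{
    size_t      avail = t->wpos - t->rpos;

    if (len > avail)
        len = avail;
    memcpy(p, t->buf + t->rpos, len);
    t->rpos += len;
    return len;
}

/* Write one record: a nonzero length word followed by the payload. */
static void
demo_write_record(DemoTape *t, const void *data, unsigned int datalen)
{
    assert(datalen > 0);        /* zero is reserved for the end marker */
    demo_tape_write(t, &datalen, sizeof(datalen));
    demo_tape_write(t, data, datalen);
}

/* Write the end-of-run marker: a length word of zero. */
static void
demo_mark_run_end(DemoTape *t)
{
    unsigned int len = 0;

    demo_tape_write(t, &len, sizeof(len));
}

/*
 * Read the next length word. Returns false if the tape ends before a full
 * length word; otherwise *len is the payload size, or 0 at end of run.
 */
static bool
demo_get_len(DemoTape *t, unsigned int *len)
{
    return demo_tape_read(t, len, sizeof(*len)) == sizeof(*len);
}
```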
3183
3184/*
3185 * Get memory for tuple from within READTUP() routine.
3186 *
3187 * We use next free slot from the slab allocator, or palloc() if the tuple
3188 * is too large for that.
3189 */
3190void *
3192{
3193 SlabSlot *buf;
3194
3195 /*
3196 * We pre-allocate enough slots in the slab arena that we should never run
3197 * out.
3198 */
3199 Assert(state->slabFreeHead);
3200
3201 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3202 return MemoryContextAlloc(state->base.sortcontext, tuplen);
3203 else
3204 {
3205 buf = state->slabFreeHead;
3206 /* Reuse this slot */
3207 state->slabFreeHead = buf->nextfree;
3208
3209 return buf;
3210 }
3211}
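
The slab scheme described above (fixed-size slots threaded on a free list, with a general-purpose fallback for oversized requests) can be sketched as follows. All `Sketch*` names and the 64-byte slot size are assumptions made for illustration, and `malloc()`/`free()` stand in for the memory-context routines.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

#define SKETCH_SLOT_SIZE 64		/* hypothetical slot size */

typedef union SketchSlot
{
	union SketchSlot *nextfree;	/* valid while the slot is free */
	char		buffer[SKETCH_SLOT_SIZE];	/* valid while in use */
} SketchSlot;

typedef struct SketchSlab
{
	SketchSlot *slots;			/* the arena */
	size_t		nslots;
	SketchSlot *freehead;		/* head of the free list */
} SketchSlab;

/* Carve the arena into slots and thread them onto the free list. */
int
sketch_slab_init(SketchSlab *slab, size_t nslots)
{
	size_t		i;

	assert(nslots > 0);
	slab->slots = malloc(nslots * sizeof(SketchSlot));
	if (slab->slots == NULL)
		return 0;
	slab->nslots = nslots;
	for (i = 0; i + 1 < nslots; i++)
		slab->slots[i].nextfree = &slab->slots[i + 1];
	slab->slots[nslots - 1].nextfree = NULL;
	slab->freehead = slab->slots;
	return 1;
}

/* Pop a slot; fall back to malloc() if too large or the list is empty. */
void *
sketch_slab_alloc(SketchSlab *slab, size_t len)
{
	SketchSlot *slot;

	if (len > SKETCH_SLOT_SIZE || slab->freehead == NULL)
		return malloc(len);
	slot = slab->freehead;
	slab->freehead = slot->nextfree;
	return slot;
}

/* Push arena slots back on the free list; free() anything else. */
void
sketch_slab_release(SketchSlab *slab, void *ptr)
{
	uintptr_t	p = (uintptr_t) ptr;
	uintptr_t	begin = (uintptr_t) slab->slots;
	uintptr_t	end = (uintptr_t) (slab->slots + slab->nslots);

	if (p >= begin && p < end)
	{
		SketchSlot *slot = ptr;

		slot->nextfree = slab->freehead;
		slab->freehead = slot;
	}
	else
		free(ptr);
}
```

Because a free slot stores its own link in the space the payload would occupy, the free list needs no memory beyond the arena itself.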
3212
3213
3214/*
3215 * Parallel sort routines
3216 */
3217
3218/*
3219 * tuplesort_estimate_shared - estimate required shared memory allocation
3220 *
3221 * nWorkers is an estimate of the number of workers (it's the number that
3222 * will be requested).
3223 */
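3224Size
3225tuplesort_estimate_shared(int nWorkers)
3226{
3227 Size tapesSize;
3228
3229 Assert(nWorkers > 0);
3230
3231 /* Make sure that BufFile shared state is MAXALIGN'd */
3232 tapesSize = mul_size(sizeof(TapeShare), nWorkers);
3233 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
3234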
3235 return tapesSize;
3236}
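
Sizing a shared struct that ends in a per-worker array generally means adding the header offset to the array size and rounding up to an alignment boundary. The following is a rough sketch of that arithmetic with an explicit overflow check. The `Sketch*` types and the 8-byte alignment are assumptions, and returning 0 on overflow is a choice made for this sketch only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-worker entry and shared header. */
typedef struct SketchTape
{
	long		firstblock;
} SketchTape;

typedef struct SketchShared
{
	int			nTapes;
	SketchTape	tapes[];		/* flexible array member */
} SketchShared;

#define SKETCH_ALIGNOF 8		/* assumed maximum alignment */
#define SKETCH_ALIGN(len) \
	(((size_t) (len) + (SKETCH_ALIGNOF - 1)) & ~((size_t) (SKETCH_ALIGNOF - 1)))

/*
 * Bytes needed for a SketchShared with nworkers tape entries, rounded up
 * to SKETCH_ALIGNOF.  Returns 0 if the computation would overflow size_t.
 */
size_t
sketch_estimate_shared(size_t nworkers)
{
	size_t		header = offsetof(SketchShared, tapes);
	size_t		array;

	assert(nworkers > 0);

	/* Leave room for the header and for the round-up performed below. */
	if (nworkers > (SIZE_MAX - header - (SKETCH_ALIGNOF - 1)) / sizeof(SketchTape))
		return 0;
	array = nworkers * sizeof(SketchTape);
	return SKETCH_ALIGN(header + array);
}
```

Using `offsetof()` rather than `sizeof()` for the header avoids counting any trailing padding twice.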
3237
3238/*
3239 * tuplesort_initialize_shared - initialize shared tuplesort state
3240 *
3241 * Must be called from leader process before workers are launched, to
3242 * establish state needed up-front for worker tuplesortstates. nWorkers
3243 * should match the argument passed to tuplesort_estimate_shared().
3244 */
3245void
3246tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
3247{
3248 int i;
3249
3250 Assert(nWorkers > 0);
3251
3252 SpinLockInit(&shared->mutex);
3253 shared->currentWorker = 0;
3254 shared->workersFinished = 0;
3255 SharedFileSetInit(&shared->fileset, seg);
3256 shared->nTapes = nWorkers;
3257 for (i = 0; i < nWorkers; i++)
3258 {
3259 shared->tapes[i].firstblocknumber = 0L;
3260 }
3261}
3262
3263/*
3264 * tuplesort_attach_shared - attach to shared tuplesort state
3265 *
3266 * Must be called by all worker processes.
3267 */
3268void
3269tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
3270{
3271 /* Attach to SharedFileSet */
3272 SharedFileSetAttach(&shared->fileset, seg);
3273}
3274
3275/*
3276 * worker_get_identifier - Assign and return ordinal identifier for worker
3277 *
3278 * The order in which these are assigned is not well defined, and should not
3279 * matter; worker numbers across parallel sort participants need only be
3280 * distinct and gapless. logtape.c requires this.
3281 *
3282 * Note that the identifiers assigned from here have no relation to
3283 * ParallelWorkerNumber, to avoid making any assumption about
3284 * caller's requirements. However, we do follow the ParallelWorkerNumber
3285 * convention of representing a non-worker with worker number -1. This
3286 * includes the leader, as well as serial Tuplesort processes.
3287 */
3288static int
3289worker_get_identifier(Tuplesortstate *state)
3290{
3291 Sharedsort *shared = state->shared;
3292 int worker;
3293
3294 Assert(WORKER(state));
3295
3296 SpinLockAcquire(&shared->mutex);
3297 worker = shared->currentWorker++;
3298 SpinLockRelease(&shared->mutex);
3299
3300 return worker;
3301}
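
The requirement that identifiers be distinct and gapless is met by any atomic post-increment of a shared counter. The listing above uses a spinlock. The sketch below swaps in a C11 atomic fetch-and-add to show the same idea in a self-contained form. `SketchCounter` and its functions are hypothetical names.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical shared counter; in practice it would live in shared memory. */
typedef struct SketchCounter
{
	atomic_int	next;
} SketchCounter;

void
sketch_counter_init(SketchCounter *c)
{
	atomic_init(&c->next, 0);
}

/*
 * Hand out the next identifier.  Every caller observes a different value,
 * and the values handed out so far always form the range 0..k-1.
 */
int
sketch_next_id(SketchCounter *c)
{
	int			id = atomic_fetch_add(&c->next, 1);

	assert(id >= 0);
	return id;
}
```

The order in which callers receive their numbers is unspecified, which matches the comment above: only distinctness and the absence of gaps matter.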
3302
3303/*
3304 * worker_freeze_result_tape - freeze worker's result tape for leader
3305 *
3306 * This is called by workers just after the result tape has been determined,
3307 * instead of calling LogicalTapeFreeze() directly. They do so because
3308 * workers require a few additional steps over similar serial
3309 * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
3310 * steps are around freeing now unneeded resources, and representing to
3311 * leader that worker's input run is available for its merge.
3312 *
3313 * There should only be one final output run for each worker, which consists
3314 * of all tuples that were originally input into worker.
3315 */
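3316static void
3317worker_freeze_result_tape(Tuplesortstate *state)
3318{
3319 Sharedsort *shared = state->shared;
3320 TapeShare output;
3321
3322 Assert(WORKER(state));
3323 Assert(state->result_tape != NULL);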
3324 Assert(state->memtupcount == 0);
3325
3326 /*
3327 * Free most remaining memory, in case caller is sensitive to our holding
3328 * on to it. memtuples may not be a tiny merge heap at this point.
3329 */
3330 pfree(state->memtuples);
3331 /* Be tidy */
3332 state->memtuples = NULL;
3333 state->memtupsize = 0;
3334
3335 /*
3336 * Parallel worker requires result tape metadata, which is to be stored in
3337 * shared memory for leader
3338 */
3339 LogicalTapeFreeze(state->result_tape, &output);
3340
3341 /* Store properties of output tape, and update finished worker count */
3342 SpinLockAcquire(&shared->mutex);
3343 shared->tapes[state->worker] = output;
3344 shared->workersFinished++;
3345 SpinLockRelease(&shared->mutex);
3346}
3347
3348/*
3349 * worker_nomergeruns - dump memtuples in worker, without merging
3350 *
3351 * This is called as an alternative to mergeruns() in a worker when no
3352 * merging is required.
3353 */
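3354static void
3355worker_nomergeruns(Tuplesortstate *state)
3356{
3357 Assert(WORKER(state));
3358 Assert(state->result_tape == NULL);
3359 Assert(state->nOutputRuns == 1);
3360
3361 state->result_tape = state->destTape;
3362 worker_freeze_result_tape(state);
3363}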
3364
3365/*
3366 * leader_takeover_tapes - create tapeset for leader from worker tapes
3367 *
3368 * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3369 * sorting has occurred in workers, all of which must have already returned
3370 * from tuplesort_performsort().
3371 *
3372 * When this returns, leader process is left in a state that is virtually
3373 * indistinguishable from it having generated runs as a serial external sort
3374 * might have.
3375 */
3376static void
3377leader_takeover_tapes(Tuplesortstate *state)
3378{
3379 Sharedsort *shared = state->shared;
3380 int nParticipants = state->nParticipants;
3381 int workersFinished;
3382 int j;
3383
3384 Assert(LEADER(state));
3385 Assert(nParticipants >= 1);
3386
3387 SpinLockAcquire(&shared->mutex);
3388 workersFinished = shared->workersFinished;
3389 SpinLockRelease(&shared->mutex);
3390
3391 if (nParticipants != workersFinished)
3392 elog(ERROR, "cannot take over tapes before all workers finish");
3393
3394 /*
3395 * Create the tapeset from worker tapes, including a leader-owned tape at
3396 * the end. Parallel workers are far more expensive than logical tapes,
3397 * so the number of tapes allocated here should never be excessive.
3398 */
3399 inittapestate(state, nParticipants);
3400 state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3401
3402 /*
3403 * Set currentRun to reflect the number of runs we will merge (it's not
3404 * used for anything, this is just pro forma)
3405 */
3406 state->currentRun = nParticipants;
3407
3408 /*
3409 * Initialize the state to look the same as after building the initial
3410 * runs.
3411 *
3412 * There will always be exactly 1 run per worker, and exactly one input
3413 * tape per run, because workers always output exactly 1 run, even when
3414 * there were no input tuples for workers to sort.
3415 */
3416 state->inputTapes = NULL;
3417 state->nInputTapes = 0;
3418 state->nInputRuns = 0;
3419
3420 state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3421 state->nOutputTapes = nParticipants;
3422 state->nOutputRuns = nParticipants;
3423
3424 for (j = 0; j < nParticipants; j++)
3425 {
3426 state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3427 }
3428
3429 state->status = TSS_BUILDRUNS;
3430}
3431
3432/*
3433 * Convenience routine to free a tuple previously loaded into sort memory
3434 */
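3435static void
3436free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3437{
3438 if (stup->tuple)
3439 {
3440 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3441 pfree(stup->tuple);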
3442 stup->tuple = NULL;
3443 }
3444}
3445
3446int
3447ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3448{
3449 if (x < y)
3450 return -1;
3451 else if (x > y)
3452 return 1;
3453 else
3454 return 0;
3455}
3456
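3457int
3458ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3459{
3460 int64 xx = DatumGetInt64(x);
3461 int64 yy = DatumGetInt64(y);
3462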
3463 if (xx < yy)
3464 return -1;
3465 else if (xx > yy)
3466 return 1;
3467 else
3468 return 0;
3469}
3470
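3471int
3472ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3473{
3474 int32 xx = DatumGetInt32(x);
3475 int32 yy = DatumGetInt32(y);
3476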
3477 if (xx < yy)
3478 return -1;
3479 else if (xx > yy)
3480 return 1;
3481 else
3482 return 0;
3483}
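
The comparators above return -1, 0, or 1 from explicit comparisons instead of returning a difference such as `x - y`, which could overflow. The same bit pattern can also order differently depending on whether it is read as signed or unsigned. The following is a small sketch of both points, assuming a 64-bit unsigned carrier type in place of Datum. The `sketch_*` names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Three-way comparison treating both values as unsigned. */
int
sketch_unsigned_cmp(uint64_t x, uint64_t y)
{
	if (x < y)
		return -1;
	else if (x > y)
		return 1;
	else
		return 0;
}

/*
 * Three-way comparison treating the same bits as signed.  Converting an
 * out-of-range uint64_t to int64_t is implementation-defined before C23;
 * this sketch assumes the usual two's-complement wraparound.
 */
int
sketch_signed_cmp(uint64_t x, uint64_t y)
{
	int64_t		xx = (int64_t) x;
	int64_t		yy = (int64_t) y;

	if (xx < yy)
		return -1;
	else if (xx > yy)
		return 1;
	else
		return 0;
}

/* Usage example: all-ones is the largest unsigned value but -1 if signed. */
void
sketch_cmp_demo(void)
{
	assert(sketch_unsigned_cmp(UINT64_MAX, 0) == 1);
	assert(sketch_signed_cmp(UINT64_MAX, 0) == -1);
}
```

Comparing instead of subtracting keeps the result correct at the extremes, for example when one operand is the minimum and the other the maximum representable value.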