PostgreSQL Source Code git master
tuplesort.c
1/*-------------------------------------------------------------------------
2 *
3 * tuplesort.c
4 * Generalized tuple sorting routines.
5 *
6 * This module provides a generalized facility for tuple sorting, which can be
7 * applied to different kinds of sortable objects. Implementation of
8 * the particular sorting variants is given in tuplesortvariants.c.
9 * This module works efficiently for both small and large amounts
10 * of data. Small amounts are sorted in-memory using qsort(). Large
11 * amounts are sorted using temporary files and a standard external sort
12 * algorithm.
13 *
14 * See Knuth, volume 3, for more than you want to know about external
15 * sorting algorithms. The algorithm we use is a balanced k-way merge.
16 * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17 * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18 * merge is better. Knuth is assuming that tape drives are expensive
19 * beasts, and in particular that there will always be many more runs than
20 * tape drives. The polyphase merge algorithm was good at keeping all the
21 * tape drives busy, but in our implementation a "tape drive" doesn't cost
22 * much more than a few Kb of memory buffers, so we can afford to have
23 * lots of them. In particular, if we can have as many tape drives as
24 * sorted runs, we can eliminate any repeated I/O at all.
25 *
26 * Historically, we divided the input into sorted runs using replacement
27 * selection, in the form of a priority tree implemented as a heap
28 * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29 * for run generation.
30 *
31 * The approximate amount of memory allowed for any one sort operation
32 * is specified in kilobytes by the caller (most pass work_mem). Initially,
33 * we absorb tuples and simply store them in an unsorted array as long as
34 * we haven't exceeded workMem. If we reach the end of the input without
35 * exceeding workMem, we sort the array using qsort() and subsequently return
36 * tuples just by scanning the tuple array sequentially. If we do exceed
37 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38 * When tuples are dumped in batch after quicksorting, we begin a new run
39 * with a new output tape. If we reach the max number of tapes, we write
40 * subsequent runs on the existing tapes in a round-robin fashion. We will
41 * need multiple merge passes to finish the merge in that case. After the
42 * end of the input is reached, we dump out remaining tuples in memory into
43 * a final run, then merge the runs.
44 *
45 * When merging runs, we use a heap containing just the frontmost tuple from
46 * each source run; we repeatedly output the smallest tuple and replace it
47 * with the next tuple from its source tape (if any). When the heap empties,
48 * the merge is complete. The basic merge algorithm thus needs very little
49 * memory --- only M tuples for an M-way merge, and M is constrained to a
50 * small number. However, we can still make good use of our full workMem
51 * allocation by pre-reading additional blocks from each source tape. Without
52 * prereading, our access pattern to the temporary file would be very erratic;
53 * on average we'd read one block from each of M source tapes during the same
54 * time that we're writing M blocks to the output tape, so there is no
55 * sequentiality of access at all, defeating the read-ahead methods used by
56 * most Unix kernels. Worse, the output tape gets written into a very random
57 * sequence of blocks of the temp file, ensuring that things will be even
58 * worse when it comes time to read that tape. A straightforward merge pass
59 * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60 * by prereading from each source tape sequentially, loading about workMem/M
61 * bytes from each tape in turn, and making the sequential blocks immediately
62 * available for reuse. This approach helps to localize both read and write
63 * accesses. The pre-reading is handled by logtape.c, we just tell it how
64 * much memory to use for the buffers.
65 *
66 * In the current code we determine the number of input tapes M on the basis
67 * of workMem: we want workMem/M to be large enough that we read a fair
68 * amount of data each time we read from a tape, so as to maintain the
69 * locality of access described above. Nonetheless, with large workMem we
70 * can have many tapes. The logical "tapes" are implemented by logtape.c,
71 * which avoids space wastage by recycling disk space as soon as each block
72 * is read from its "tape".
73 *
74 * When the caller requests random access to the sort result, we form
75 * the final sorted run on a logical tape which is then "frozen", so
76 * that we can access it randomly. When the caller does not need random
77 * access, we return from tuplesort_performsort() as soon as we are down
78 * to one run per logical tape. The final merge is then performed
79 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80 * saves one cycle of writing all the data out to disk and reading it in.
81 *
82 * This module supports parallel sorting. Parallel sorts involve coordination
83 * among one or more worker processes, and a leader process, each with its own
84 * tuplesort state. The leader process (or, more accurately, the
85 * Tuplesortstate associated with a leader process) creates a full tapeset
 86 * consisting of worker tapes, each with one run to merge: a run for every
87 * worker process. This is then merged. Worker processes are guaranteed to
88 * produce exactly one output run from their partial input.
89 *
90 *
91 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
92 * Portions Copyright (c) 1994, Regents of the University of California
93 *
94 * IDENTIFICATION
95 * src/backend/utils/sort/tuplesort.c
96 *
97 *-------------------------------------------------------------------------
98 */
99
100#include "postgres.h"
101
102#include <limits.h>
103
104#include "commands/tablespace.h"
105#include "miscadmin.h"
106#include "pg_trace.h"
107#include "storage/shmem.h"
108#include "utils/guc.h"
109#include "utils/memutils.h"
110#include "utils/pg_rusage.h"
111#include "utils/tuplesort.h"
112
113/*
 114 * Initial size of the memtuples array. We try to select this size so that
 115 * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
 116 * overhead of allocation might be lowered. However, we don't consider array
 117 * sizes less than 1024.
118 *
119 */
120#define INITIAL_MEMTUPSIZE Max(1024, \
121 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
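/*
 * Worked example (a sketch, assuming ALLOCSET_SEPARATE_THRESHOLD is 8192 and
 * sizeof(SortTuple) is 24 bytes, as on a typical 64-bit build):
 * 8192 / 24 + 1 = 342, so Max(1024, 342) makes INITIAL_MEMTUPSIZE 1024
 * entries, i.e. roughly 24 kB for the initial memtuples array.
 */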
122
123/* GUC variables */
124bool trace_sort = false;
125
126#ifdef DEBUG_BOUNDED_SORT
127bool optimize_bounded_sort = true;
128#endif
129
130
131/*
132 * During merge, we use a pre-allocated set of fixed-size slots to hold
 133 * tuples, in order to avoid palloc/pfree overhead.
134 *
135 * Merge doesn't require a lot of memory, so we can afford to waste some,
136 * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
137 * palloc() overhead is not significant anymore.
138 *
139 * 'nextfree' is valid when this chunk is in the free list. When in use, the
140 * slot holds a tuple.
141 */
142#define SLAB_SLOT_SIZE 1024
143
144typedef union SlabSlot
145{
146 union SlabSlot *nextfree;
147 char buffer[SLAB_SLOT_SIZE];
148} SlabSlot;
149
150/*
151 * Possible states of a Tuplesort object. These denote the states that
152 * persist between calls of Tuplesort routines.
153 */
154typedef enum
155{
156 TSS_INITIAL, /* Loading tuples; still within memory limit */
157 TSS_BOUNDED, /* Loading tuples into bounded-size heap */
158 TSS_BUILDRUNS, /* Loading tuples; writing to tape */
159 TSS_SORTEDINMEM, /* Sort completed entirely in memory */
160 TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
161 TSS_FINALMERGE, /* Performing final merge on-the-fly */
162} TupSortStatus;
163
164/*
165 * Parameters for calculation of number of tapes to use --- see inittapes()
166 * and tuplesort_merge_order().
167 *
 168 * In this calculation we assume that each tape will cost us about one
 169 * block's worth of buffer space. This ignores the overhead of all the other data
170 * structures needed for each tape, but it's probably close enough.
171 *
172 * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
173 * input tape, for pre-reading (see discussion at top of file). This is *in
174 * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
175 */
176#define MINORDER 6 /* minimum merge order */
177#define MAXORDER 500 /* maximum merge order */
178#define TAPE_BUFFER_OVERHEAD BLCKSZ
179#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
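/*
 * Worked example (a sketch of the arithmetic done in tuplesort_merge_order(),
 * assuming the default BLCKSZ of 8192): each input tape needs
 * MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD = 256 kB + 8 kB, and the matching
 * output tape another 8 kB, so with 4 MB of allowedMem we can afford roughly
 * 4096 kB / 272 kB ~= 15 input tapes, clamped to [MINORDER, MAXORDER].
 */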
180
181
182/*
183 * Private state of a Tuplesort operation.
184 */
185struct Tuplesortstate
186{
187 TuplesortPublic base;
188 TupSortStatus status; /* enumerated value as shown above */
189 bool bounded; /* did caller specify a maximum number of
190 * tuples to return? */
191 bool boundUsed; /* true if we made use of a bounded heap */
192 int bound; /* if bounded, the maximum number of tuples */
193 int64 tupleMem; /* memory consumed by individual tuples.
194 * storing this separately from what we track
195 * in availMem allows us to subtract the
196 * memory consumed by all tuples when dumping
197 * tuples to tape */
198 int64 availMem; /* remaining memory available, in bytes */
199 int64 allowedMem; /* total memory allowed, in bytes */
200 int maxTapes; /* max number of input tapes to merge in each
201 * pass */
 202 int64 maxSpace; /* maximum amount of space occupied among sort
 203 * groups, either in-memory or on-disk */
204 bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
205 * space, false when it's value for in-memory
206 * space */
207 TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
208 LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
209
210 /*
211 * This array holds the tuples now in sort memory. If we are in state
212 * INITIAL, the tuples are in no particular order; if we are in state
213 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
214 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
215 * H. In state SORTEDONTAPE, the array is not used.
216 */
217 SortTuple *memtuples; /* array of SortTuple structs */
218 int memtupcount; /* number of tuples currently present */
219 int memtupsize; /* allocated length of memtuples array */
220 bool growmemtuples; /* memtuples' growth still underway? */
221
222 /*
223 * Memory for tuples is sometimes allocated using a simple slab allocator,
224 * rather than with palloc(). Currently, we switch to slab allocation
225 * when we start merging. Merging only needs to keep a small, fixed
226 * number of tuples in memory at any time, so we can avoid the
227 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
228 * to hold the tuples.
229 *
230 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
231 * slots. The allocation is sized to have one slot per tape, plus one
 232 * additional slot. We need that many slots to hold all the tuples kept
 233 * in the heap during merge, plus the one most recently returned from the
 234 * sort by tuplesort_gettuple.
235 *
236 * Initially, all the slots are kept in a linked list of free slots. When
 237 * a tuple is read from a tape, it is put into the next available slot, if
238 * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
239 * instead.
240 *
241 * When we're done processing a tuple, we return the slot back to the free
 242 * list, or pfree() it if it was palloc'd. We know that a tuple was
 243 * allocated from the slab if its pointer value is between
 244 * slabMemoryBegin and slabMemoryEnd.
245 *
246 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
247 * tracking memory usage is not used.
248 */
249 bool slabAllocatorUsed;
250
251 char *slabMemoryBegin; /* beginning of slab memory arena */
252 char *slabMemoryEnd; /* end of slab memory arena */
253 SlabSlot *slabFreeHead; /* head of free list */
254
255 /* Memory used for input and output tape buffers. */
257
258 /*
259 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
260 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
261 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
262 * recycle the memory on next gettuple call.
263 */
264 void *lastReturnedTuple;
265
266 /*
267 * While building initial runs, this is the current output run number.
268 * Afterwards, it is the number of initial runs we made.
269 */
270 int currentRun;
271
272 /*
273 * Logical tapes, for merging.
274 *
275 * The initial runs are written in the output tapes. In each merge pass,
276 * the output tapes of the previous pass become the input tapes, and new
277 * output tapes are created as needed. When nInputTapes equals
278 * nInputRuns, there is only one merge pass left.
279 */
280 int nInputTapes;
281 int nInputRuns;
282 LogicalTape **inputTapes;
283
284 int nOutputTapes;
285 int nOutputRuns;
286 LogicalTape **outputTapes;
287
288 LogicalTape *destTape; /* current output tape */
289
290 /*
291 * These variables are used after completion of sorting to keep track of
292 * the next tuple to return. (In the tape case, the tape's current read
293 * position is also critical state.)
294 */
295 LogicalTape *result_tape; /* actual tape of finished output */
296 int current; /* array index (only used if SORTEDINMEM) */
297 bool eof_reached; /* reached EOF (needed for cursors) */
298
299 /* markpos_xxx holds marked position for mark and restore */
300 int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
301 int markpos_offset; /* saved "current", or offset in tape block */
302 bool markpos_eof; /* saved "eof_reached" */
303
304 /*
305 * These variables are used during parallel sorting.
306 *
307 * worker is our worker identifier. Follows the general convention that
 308 * a -1 value relates to a leader tuplesort, and values >= 0 relate to
 309 * worker tuplesorts. (-1 can also be a serial tuplesort.)
310 *
311 * shared is mutable shared memory state, which is used to coordinate
312 * parallel sorts.
313 *
314 * nParticipants is the number of worker Tuplesortstates known by the
315 * leader to have actually been launched, which implies that they must
316 * finish a run that the leader needs to merge. Typically includes a
317 * worker state held by the leader process itself. Set in the leader
318 * Tuplesortstate only.
319 */
320 int worker;
321 Sharedsort *shared;
322 int nParticipants;
323
324 /*
325 * Additional state for managing "abbreviated key" sortsupport routines
326 * (which currently may be used by all cases except the hash index case).
327 * Tracks the intervals at which the optimization's effectiveness is
328 * tested.
329 */
330 int64 abbrevNext; /* Tuple # at which to next check
331 * applicability */
332
333 /*
334 * Resource snapshot for time of sort start.
335 */
336 PGRUsage ru_start;
337};
338
339/*
 340 * Private mutable state of a parallel tuplesort operation. This is allocated
341 * in shared memory.
342 */
343struct Sharedsort
344{
345 /* mutex protects all fields prior to tapes */
346 slock_t mutex;
347
348 /*
349 * currentWorker generates ordinal identifier numbers for parallel sort
350 * workers. These start from 0, and are always gapless.
351 *
352 * Workers increment workersFinished to indicate having finished. If this
353 * is equal to state.nParticipants within the leader, leader is ready to
354 * merge worker runs.
355 */
356 int currentWorker;
357 int workersFinished;
358
359 /* Temporary file space */
360 SharedFileSet fileset;
361
362 /* Size of tapes flexible array */
363 int nTapes;
364
365 /*
366 * Tapes array used by workers to report back information needed by the
367 * leader to concatenate all worker tapes into one for merging
368 */
369 TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
370};
371
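/*
 * Illustrative flow of a parallel sort (a sketch only, loosely following how
 * callers such as nbtsort.c drive this module; the surrounding pieces named
 * "toc"/"seg"/"nlaunched" are assumptions, and error handling is omitted):
 *
 *   Leader, at setup time:
 *       shared = shm_toc_allocate(toc, tuplesort_estimate_shared(nworkers));
 *       tuplesort_initialize_shared(shared, nworkers, seg);
 *
 *   Each worker:
 *       coordinate.isWorker = true;
 *       coordinate.nParticipants = -1;
 *       coordinate.sharedsort = shared;
 *       state = tuplesort_begin_xxx(..., workMem, &coordinate, TUPLESORT_NONE);
 *       ... tuplesort_putXXX() ...
 *       tuplesort_performsort(state);        (produces exactly one run)
 *
 *   Leader, for the final merge:
 *       coordinate.isWorker = false;
 *       coordinate.nParticipants = nlaunched;
 *       coordinate.sharedsort = shared;
 *       state = tuplesort_begin_xxx(..., &coordinate, TUPLESORT_NONE);
 *       tuplesort_performsort(state);
 *       ... tuplesort_getXXX() ...
 */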
372/*
373 * Is the given tuple allocated from the slab memory arena?
374 */
375#define IS_SLAB_SLOT(state, tuple) \
376 ((char *) (tuple) >= (state)->slabMemoryBegin && \
377 (char *) (tuple) < (state)->slabMemoryEnd)
378
379/*
380 * Return the given tuple to the slab memory free list, or free it
381 * if it was palloc'd.
382 */
383#define RELEASE_SLAB_SLOT(state, tuple) \
384 do { \
385 SlabSlot *buf = (SlabSlot *) tuple; \
386 \
387 if (IS_SLAB_SLOT((state), buf)) \
388 { \
389 buf->nextfree = (state)->slabFreeHead; \
390 (state)->slabFreeHead = buf; \
391 } else \
392 pfree(buf); \
393 } while(0)
394
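/*
 * For illustration, the allocation-side counterpart of RELEASE_SLAB_SLOT pops
 * a slot off the free list when the tuple fits, and otherwise falls back to
 * an ordinary memory-context allocation.  A minimal sketch (not necessarily
 * the exact code used by this module):
 *
 *     if (tuplen > SLAB_SLOT_SIZE || state->slabFreeHead == NULL)
 *         return MemoryContextAlloc(state->base.sortcontext, tuplen);
 *     buf = state->slabFreeHead;
 *     state->slabFreeHead = buf->nextfree;
 *     return buf;
 */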
395#define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
396#define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
397#define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
398#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
399#define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
400#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
401#define USEMEM(state,amt) ((state)->availMem -= (amt))
402#define FREEMEM(state,amt) ((state)->availMem += (amt))
403#define SERIAL(state) ((state)->shared == NULL)
404#define WORKER(state) ((state)->shared && (state)->worker != -1)
405#define LEADER(state) ((state)->shared && (state)->worker == -1)
406
407/*
408 * NOTES about on-tape representation of tuples:
409 *
410 * We require the first "unsigned int" of a stored tuple to be the total size
411 * on-tape of the tuple, including itself (so it is never zero; an all-zero
412 * unsigned int is used to delimit runs). The remainder of the stored tuple
413 * may or may not match the in-memory representation of the tuple ---
414 * any conversion needed is the job of the writetup and readtup routines.
415 *
416 * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
417 * representation of the tuple must be followed by another "unsigned int" that
418 * is a copy of the length --- so the total tape space used is actually
419 * sizeof(unsigned int) more than the stored length value. This allows
420 * read-backwards. When the random access flag was not specified, the
421 * write/read routines may omit the extra length word.
422 *
423 * writetup is expected to write both length words as well as the tuple
424 * data. When readtup is called, the tape is positioned just after the
425 * front length word; readtup must read the tuple data and advance past
426 * the back length word (if present).
427 *
428 * The write/read routines can make use of the tuple description data
429 * stored in the Tuplesortstate record, if needed. They are also expected
430 * to adjust state->availMem by the amount of memory space (not tape space!)
431 * released or consumed. There is no error return from either writetup
432 * or readtup; they should ereport() on failure.
433 *
434 *
435 * NOTES about memory consumption calculations:
436 *
437 * We count space allocated for tuples against the workMem limit, plus
438 * the space used by the variable-size memtuples array. Fixed-size space
439 * is not counted; it's small enough to not be interesting.
440 *
441 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
442 * rather than the originally-requested size. This is important since
443 * palloc can add substantial overhead. It's not a complete answer since
444 * we won't count any wasted space in palloc allocation blocks, but it's
445 * a lot better than what we were doing before 7.3. As of 9.6, a
446 * separate memory context is used for caller passed tuples. Resetting
447 * it at certain key increments significantly ameliorates fragmentation.
448 * readtup routines use the slab allocator (they cannot use
449 * the reset context because it gets deleted at the point that merging
450 * begins).
451 */
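/*
 * Illustrative sketch of the on-tape convention described above.  This is not
 * one of the real variants from tuplesortvariants.c; the payload handling and
 * the EXAMPLE_* names are hypothetical, and only the length-word bookkeeping
 * is the point.  Guarded out so it is never compiled.
 */
#ifdef TUPLESORT_TAPE_FORMAT_EXAMPLE	/* never defined; example only */
static void
example_writetup(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
	unsigned int payloadlen = EXAMPLE_PAYLOAD_SIZE(stup);	/* hypothetical */
	unsigned int tuplen = payloadlen + sizeof(unsigned int);

	/* leading length word counts itself plus the payload */
	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
	LogicalTapeWrite(tape, stup->tuple, payloadlen);

	/* trailing copy of the length word, needed only for read-backwards */
	if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
}

static void
example_readtup(Tuplesortstate *state, SortTuple *stup, LogicalTape *tape,
				unsigned int len)
{
	unsigned int payloadlen = len - sizeof(unsigned int);
	void	   *payload = tuplesort_readtup_alloc(state, payloadlen);

	/* the tape is positioned just past the leading length word */
	if (LogicalTapeRead(tape, payload, payloadlen) != payloadlen)
		elog(ERROR, "unexpected end of data");
	stup->tuple = payload;
	/* a real variant would also fill stup->datum1 / stup->isnull1 here */

	/* advance past the trailing length word, if one was written */
	if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
	{
		unsigned int trailer;

		if (LogicalTapeRead(tape, &trailer, sizeof(trailer)) != sizeof(trailer))
			elog(ERROR, "unexpected end of data");
	}
}
#endif							/* TUPLESORT_TAPE_FORMAT_EXAMPLE */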
452
453
456static void inittapes(Tuplesortstate *state, bool mergeruns);
457static void inittapestate(Tuplesortstate *state, int maxTapes);
459static void init_slab_allocator(Tuplesortstate *state, int numSlots);
460static void mergeruns(Tuplesortstate *state);
461static void mergeonerun(Tuplesortstate *state);
462static void beginmerge(Tuplesortstate *state);
463static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
464static void dumptuples(Tuplesortstate *state, bool alltuples);
472static unsigned int getlen(LogicalTape *tape, bool eofOK);
473static void markrunend(LogicalTape *tape);
478static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
481
482/*
483 * Specialized comparators that we can inline into specialized sorts. The goal
484 * is to try to sort two tuples without having to follow the pointers to the
485 * comparator or the tuple.
486 *
487 * XXX: For now, there is no specialization for cases where datum1 is
488 * authoritative and we don't even need to fall back to a callback at all (that
489 * would be true for types like int4/int8/timestamp/date, but not true for
 490 * abbreviations of text or multi-key sorts). There could be! Is it worth it?
491 */
492
493/* Used if first key's comparator is ssup_datum_unsigned_cmp */
494static pg_attribute_always_inline int
495qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
496{
497 int compare;
498
499 compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
500 b->datum1, b->isnull1,
501 &state->base.sortKeys[0]);
502 if (compare != 0)
503 return compare;
504
505 /*
506 * No need to waste effort calling the tiebreak function when there are no
507 * other keys to sort on.
508 */
509 if (state->base.onlyKey != NULL)
510 return 0;
511
512 return state->base.comparetup_tiebreak(a, b, state);
513}
514
515#if SIZEOF_DATUM >= 8
516/* Used if first key's comparator is ssup_datum_signed_cmp */
517static pg_attribute_always_inline int
518qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
519{
520 int compare;
521
522 compare = ApplySignedSortComparator(a->datum1, a->isnull1,
523 b->datum1, b->isnull1,
524 &state->base.sortKeys[0]);
525
526 if (compare != 0)
527 return compare;
528
529 /*
530 * No need to waste effort calling the tiebreak function when there are no
531 * other keys to sort on.
532 */
533 if (state->base.onlyKey != NULL)
534 return 0;
535
536 return state->base.comparetup_tiebreak(a, b, state);
537}
538#endif
539
540/* Used if first key's comparator is ssup_datum_int32_cmp */
541static pg_attribute_always_inline int
542qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
543{
544 int compare;
545
546 compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
547 b->datum1, b->isnull1,
548 &state->base.sortKeys[0]);
549
550 if (compare != 0)
551 return compare;
552
553 /*
554 * No need to waste effort calling the tiebreak function when there are no
555 * other keys to sort on.
556 */
557 if (state->base.onlyKey != NULL)
558 return 0;
559
560 return state->base.comparetup_tiebreak(a, b, state);
561}
562
563/*
564 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
565 * any variant of SortTuples, using the appropriate comparetup function.
566 * qsort_ssup() is specialized for the case where the comparetup function
567 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
568 * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for
569 * common comparison functions on pass-by-value leading datums.
570 */
571
572#define ST_SORT qsort_tuple_unsigned
573#define ST_ELEMENT_TYPE SortTuple
574#define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
575#define ST_COMPARE_ARG_TYPE Tuplesortstate
576#define ST_CHECK_FOR_INTERRUPTS
577#define ST_SCOPE static
578#define ST_DEFINE
579#include "lib/sort_template.h"
580
581#if SIZEOF_DATUM >= 8
582#define ST_SORT qsort_tuple_signed
583#define ST_ELEMENT_TYPE SortTuple
584#define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
585#define ST_COMPARE_ARG_TYPE Tuplesortstate
586#define ST_CHECK_FOR_INTERRUPTS
587#define ST_SCOPE static
588#define ST_DEFINE
589#include "lib/sort_template.h"
590#endif
591
592#define ST_SORT qsort_tuple_int32
593#define ST_ELEMENT_TYPE SortTuple
594#define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
595#define ST_COMPARE_ARG_TYPE Tuplesortstate
596#define ST_CHECK_FOR_INTERRUPTS
597#define ST_SCOPE static
598#define ST_DEFINE
599#include "lib/sort_template.h"
600
601#define ST_SORT qsort_tuple
602#define ST_ELEMENT_TYPE SortTuple
603#define ST_COMPARE_RUNTIME_POINTER
604#define ST_COMPARE_ARG_TYPE Tuplesortstate
605#define ST_CHECK_FOR_INTERRUPTS
606#define ST_SCOPE static
607#define ST_DECLARE
608#define ST_DEFINE
609#include "lib/sort_template.h"
610
611#define ST_SORT qsort_ssup
612#define ST_ELEMENT_TYPE SortTuple
613#define ST_COMPARE(a, b, ssup) \
614 ApplySortComparator((a)->datum1, (a)->isnull1, \
615 (b)->datum1, (b)->isnull1, (ssup))
616#define ST_COMPARE_ARG_TYPE SortSupportData
617#define ST_CHECK_FOR_INTERRUPTS
618#define ST_SCOPE static
619#define ST_DEFINE
620#include "lib/sort_template.h"
621
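/*
 * Illustrative dispatch of the templated routines above (a sketch; the actual
 * selection happens in this module's in-memory sort step).  Each ST_SORT
 * instantiation has the shape
 *     static void qsort_tuple_unsigned(SortTuple *begin, size_t n,
 *                                      Tuplesortstate *arg);
 * and a caller picks one based on the leading sort key, roughly:
 *
 *     if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
 *         qsort_tuple_unsigned(state->memtuples, state->memtupcount, state);
 *     else if (state->base.onlyKey != NULL)
 *         qsort_ssup(state->memtuples, state->memtupcount,
 *                    state->base.onlyKey);
 *     else
 *         qsort_tuple(state->memtuples, state->memtupcount,
 *                     state->base.comparetup, state);
 */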
622/*
623 * tuplesort_begin_xxx
624 *
625 * Initialize for a tuple sort operation.
626 *
627 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
628 * zero or more times, then call tuplesort_performsort when all the tuples
629 * have been supplied. After performsort, retrieve the tuples in sorted
630 * order by calling tuplesort_getXXX until it returns false/NULL. (If random
631 * access was requested, rescan, markpos, and restorepos can also be called.)
632 * Call tuplesort_end to terminate the operation and release memory/disk space.
633 *
634 * Each variant of tuplesort_begin has a workMem parameter specifying the
635 * maximum number of kilobytes of RAM to use before spilling data to disk.
636 * (The normal value of this parameter is work_mem, but some callers use
637 * other values.) Each variant also has a sortopt which is a bitmask of
638 * sort options. See TUPLESORT_* definitions in tuplesort.h
639 */
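/*
 * Illustrative caller sketch (not part of this file; the input and output
 * helpers fetch_next_slot()/process_slot() are hypothetical).  It shows the
 * begin / put / performsort / get / end life cycle using the heap tuple
 * variant from tuplesortvariants.c.  Guarded out so it is never compiled.
 */
#ifdef TUPLESORT_USAGE_EXAMPLE	/* never defined; example only */
static void
sort_slots_example(TupleDesc tupdesc, TupleTableSlot *inslot,
				   TupleTableSlot *outslot, int nkeys, AttrNumber *attNums,
				   Oid *sortOperators, Oid *collations, bool *nullsFirst)
{
	Tuplesortstate *sortstate;

	sortstate = tuplesort_begin_heap(tupdesc, nkeys, attNums, sortOperators,
									 collations, nullsFirst,
									 work_mem, NULL, TUPLESORT_NONE);

	/* feed all input tuples */
	while (fetch_next_slot(inslot))
		tuplesort_puttupleslot(sortstate, inslot);

	/* finish the sort, then read the tuples back in sorted order */
	tuplesort_performsort(sortstate);
	while (tuplesort_gettupleslot(sortstate, true, false, outslot, NULL))
		process_slot(outslot);

	tuplesort_end(sortstate);
}
#endif							/* TUPLESORT_USAGE_EXAMPLE */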
640
641static Tuplesortstate *
642tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
643{
644 Tuplesortstate *state;
645 MemoryContext maincontext;
646 MemoryContext sortcontext;
647 MemoryContext oldcontext;
648
649 /* See leader_takeover_tapes() remarks on random access support */
650 if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
651 elog(ERROR, "random access disallowed under parallel sort");
652
653 /*
654 * Memory context surviving tuplesort_reset. This memory context holds
655 * data which is useful to keep while sorting multiple similar batches.
656 */
658 "TupleSort main",
660
661 /*
662 * Create a working memory context for one sort operation. The content of
663 * this context is deleted by tuplesort_reset.
664 */
665 sortcontext = AllocSetContextCreate(maincontext,
666 "TupleSort sort",
668
669 /*
670 * Additionally a working memory context for tuples is setup in
671 * tuplesort_begin_batch.
672 */
673
674 /*
675 * Make the Tuplesortstate within the per-sortstate context. This way, we
676 * don't need a separate pfree() operation for it at shutdown.
677 */
678 oldcontext = MemoryContextSwitchTo(maincontext);
679
680 state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
681
682 if (trace_sort)
683 pg_rusage_init(&state->ru_start);
684
685 state->base.sortopt = sortopt;
686 state->base.tuples = true;
687 state->abbrevNext = 10;
688
689 /*
690 * workMem is forced to be at least 64KB, the current minimum valid value
691 * for the work_mem GUC. This is a defense against parallel sort callers
692 * that divide out memory among many workers in a way that leaves each
693 * with very little memory.
694 */
695 state->allowedMem = Max(workMem, 64) * (int64) 1024;
696 state->base.sortcontext = sortcontext;
697 state->base.maincontext = maincontext;
698
699 /*
700 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
701 * see comments in grow_memtuples().
702 */
703 state->memtupsize = INITIAL_MEMTUPSIZE;
704 state->memtuples = NULL;
705
706 /*
 707 * After all of the other non-parallel-related state, we set up all of the
708 * state needed for each batch.
709 */
711
712 /*
713 * Initialize parallel-related state based on coordination information
714 * from caller
715 */
716 if (!coordinate)
717 {
718 /* Serial sort */
719 state->shared = NULL;
720 state->worker = -1;
721 state->nParticipants = -1;
722 }
723 else if (coordinate->isWorker)
724 {
725 /* Parallel worker produces exactly one final run from all input */
726 state->shared = coordinate->sharedsort;
728 state->nParticipants = -1;
729 }
730 else
731 {
732 /* Parallel leader state only used for final merge */
733 state->shared = coordinate->sharedsort;
734 state->worker = -1;
735 state->nParticipants = coordinate->nParticipants;
736 Assert(state->nParticipants >= 1);
737 }
738
739 MemoryContextSwitchTo(oldcontext);
740
741 return state;
742}
743
744/*
745 * tuplesort_begin_batch
746 *
 747 * Set up, or reset, all state needed for processing a new set of tuples with this
748 * sort state. Called both from tuplesort_begin_common (the first time sorting
749 * with this sort state) and tuplesort_reset (for subsequent usages).
750 */
751static void
752tuplesort_begin_batch(Tuplesortstate *state)
753{
754 MemoryContext oldcontext;
755
756 oldcontext = MemoryContextSwitchTo(state->base.maincontext);
757
758 /*
759 * Caller tuple (e.g. IndexTuple) memory context.
760 *
761 * A dedicated child context used exclusively for caller passed tuples
762 * eases memory management. Resetting at key points reduces
763 * fragmentation. Note that the memtuples array of SortTuples is allocated
764 * in the parent context, not this context, because there is no need to
765 * free memtuples early. For bounded sorts, tuples may be pfreed in any
766 * order, so we use a regular aset.c context so that it can make use of
767 * free'd memory. When the sort is not bounded, we make use of a bump.c
768 * context as this keeps allocations more compact with less wastage.
769 * Allocations are also slightly more CPU efficient.
770 */
771 if (TupleSortUseBumpTupleCxt(state->base.sortopt))
772 state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
773 "Caller tuples",
775 else
776 state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
777 "Caller tuples",
779
780
781 state->status = TSS_INITIAL;
782 state->bounded = false;
783 state->boundUsed = false;
784
785 state->availMem = state->allowedMem;
786
787 state->tapeset = NULL;
788
789 state->memtupcount = 0;
790
791 /*
792 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
793 * see comments in grow_memtuples().
794 */
795 state->growmemtuples = true;
796 state->slabAllocatorUsed = false;
797 if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
798 {
799 pfree(state->memtuples);
800 state->memtuples = NULL;
801 state->memtupsize = INITIAL_MEMTUPSIZE;
802 }
803 if (state->memtuples == NULL)
804 {
805 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
807 }
808
809 /* workMem must be large enough for the minimal memtuples array */
810 if (LACKMEM(state))
811 elog(ERROR, "insufficient memory allowed for sort");
812
813 state->currentRun = 0;
814
815 /*
816 * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
817 * inittapes(), if needed.
818 */
819
820 state->result_tape = NULL; /* flag that result tape has not been formed */
821
822 MemoryContextSwitchTo(oldcontext);
823}
824
825/*
826 * tuplesort_set_bound
827 *
828 * Advise tuplesort that at most the first N result tuples are required.
829 *
830 * Must be called before inserting any tuples. (Actually, we could allow it
831 * as long as the sort hasn't spilled to disk, but there seems no need for
832 * delayed calls at the moment.)
833 *
834 * This is a hint only. The tuplesort may still return more tuples than
835 * requested. Parallel leader tuplesorts will always ignore the hint.
836 */
837void
838tuplesort_set_bound(Tuplesortstate *state, int64 bound)
839{
840 /* Assert we're called before loading any tuples */
841 Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
842 /* Assert we allow bounded sorts */
843 Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
844 /* Can't set the bound twice, either */
845 Assert(!state->bounded);
846 /* Also, this shouldn't be called in a parallel worker */
848
849 /* Parallel leader allows but ignores hint */
850 if (LEADER(state))
851 return;
852
853#ifdef DEBUG_BOUNDED_SORT
854 /* Honor GUC setting that disables the feature (for easy testing) */
855 if (!optimize_bounded_sort)
856 return;
857#endif
858
859 /* We want to be able to compute bound * 2, so limit the setting */
860 if (bound > (int64) (INT_MAX / 2))
861 return;
862
863 state->bounded = true;
864 state->bound = (int) bound;
865
866 /*
867 * Bounded sorts are not an effective target for abbreviated key
868 * optimization. Disable by setting state to be consistent with no
869 * abbreviation support.
870 */
871 state->base.sortKeys->abbrev_converter = NULL;
872 if (state->base.sortKeys->abbrev_full_comparator)
873 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
874
875 /* Not strictly necessary, but be tidy */
876 state->base.sortKeys->abbrev_abort = NULL;
877 state->base.sortKeys->abbrev_full_comparator = NULL;
878}
879
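/*
 * Typical use (a sketch of how the executor's Sort node applies a pushed-down
 * LIMIT; the field names follow nodeSort.c but are shown here only for
 * illustration):
 *
 *     tuplesortstate = tuplesort_begin_heap(..., workMem, NULL,
 *                                           TUPLESORT_ALLOWBOUNDED);
 *     if (node->bounded)
 *         tuplesort_set_bound(tuplesortstate, node->bound);
 */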
880/*
881 * tuplesort_used_bound
882 *
883 * Allow callers to find out if the sort state was able to use a bound.
884 */
885bool
886tuplesort_used_bound(Tuplesortstate *state)
887{
888 return state->boundUsed;
889}
890
891/*
892 * tuplesort_free
893 *
894 * Internal routine for freeing resources of tuplesort.
895 */
896static void
897tuplesort_free(Tuplesortstate *state)
898{
899 /* context swap probably not needed, but let's be safe */
900 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
901 int64 spaceUsed;
902
903 if (state->tapeset)
904 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
905 else
906 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
907
908 /*
909 * Delete temporary "tape" files, if any.
910 *
911 * We don't bother to destroy the individual tapes here. They will go away
912 * with the sortcontext. (In TSS_FINALMERGE state, we have closed
913 * finished tapes already.)
914 */
915 if (state->tapeset)
916 LogicalTapeSetClose(state->tapeset);
917
918 if (trace_sort)
919 {
920 if (state->tapeset)
921 elog(LOG, "%s of worker %d ended, %lld disk blocks used: %s",
922 SERIAL(state) ? "external sort" : "parallel external sort",
923 state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
924 else
925 elog(LOG, "%s of worker %d ended, %lld KB used: %s",
926 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
927 state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
928 }
929
930 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
931
933 MemoryContextSwitchTo(oldcontext);
934
935 /*
936 * Free the per-sort memory context, thereby releasing all working memory.
937 */
938 MemoryContextReset(state->base.sortcontext);
939}
940
941/*
942 * tuplesort_end
943 *
944 * Release resources and clean up.
945 *
946 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
947 * pointing to garbage. Be careful not to attempt to use or free such
948 * pointers afterwards!
949 */
950void
951tuplesort_end(Tuplesortstate *state)
952{
953 tuplesort_free(state);
954
955 /*
956 * Free the main memory context, including the Tuplesortstate struct
957 * itself.
958 */
959 MemoryContextDelete(state->base.maincontext);
960}
961
962/*
963 * tuplesort_updatemax
964 *
965 * Update maximum resource usage statistics.
966 */
967static void
968tuplesort_updatemax(Tuplesortstate *state)
969{
970 int64 spaceUsed;
971 bool isSpaceDisk;
972
973 /*
974 * Note: it might seem we should provide both memory and disk usage for a
975 * disk-based sort. However, the current code doesn't track memory space
976 * accurately once we have begun to return tuples to the caller (since we
977 * don't account for pfree's the caller is expected to do), so we cannot
978 * rely on availMem in a disk sort. This does not seem worth the overhead
979 * to fix. Is it worth creating an API for the memory context code to
980 * tell us how much is actually used in sortcontext?
981 */
982 if (state->tapeset)
983 {
984 isSpaceDisk = true;
985 spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
986 }
987 else
988 {
989 isSpaceDisk = false;
990 spaceUsed = state->allowedMem - state->availMem;
991 }
992
993 /*
 994 * Sort evicts data to disk when it cannot fit that data into main
 995 * memory. This is why we consider space used on disk to be more
 996 * important for tracking resource usage than space used in memory. Note
 997 * that the amount of space occupied by some tuple set on disk might be
 998 * less than the amount of space occupied by the same tuple set in memory,
 999 * due to its more compact representation.
1000 */
1001 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1002 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1003 {
1004 state->maxSpace = spaceUsed;
1005 state->isMaxSpaceDisk = isSpaceDisk;
1006 state->maxSpaceStatus = state->status;
1007 }
1008}
1009
1010/*
1011 * tuplesort_reset
1012 *
1013 * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
1014 * meta-information in. After tuplesort_reset, tuplesort is ready to start
 1015 * a new sort. This avoids recreating tuple sort states (and so saves
 1016 * resources) when sorting multiple small batches.
1017 */
1018void
1019tuplesort_reset(Tuplesortstate *state)
1020{
1021 tuplesort_updatemax(state);
1022 tuplesort_free(state);
1023
1024 /*
1025 * After we've freed up per-batch memory, re-setup all of the state common
1026 * to both the first batch and any subsequent batch.
1027 */
1028 tuplesort_begin_batch(state);
1029
1030 state->lastReturnedTuple = NULL;
1031 state->slabMemoryBegin = NULL;
1032 state->slabMemoryEnd = NULL;
1033 state->slabFreeHead = NULL;
1034}
1035
1036/*
1037 * Grow the memtuples[] array, if possible within our memory constraint. We
1038 * must not exceed INT_MAX tuples in memory or the caller-provided memory
1039 * limit. Return true if we were able to enlarge the array, false if not.
1040 *
1041 * Normally, at each increment we double the size of the array. When doing
1042 * that would exceed a limit, we attempt one last, smaller increase (and then
1043 * clear the growmemtuples flag so we don't try any more). That allows us to
1044 * use memory as fully as permitted; sticking to the pure doubling rule could
1045 * result in almost half going unused. Because availMem moves around with
1046 * tuple addition/removal, we need some rule to prevent making repeated small
1047 * increases in memtupsize, which would just be useless thrashing. The
1048 * growmemtuples flag accomplishes that and also prevents useless
1049 * recalculations in this function.
1050 */
1051static bool
1052grow_memtuples(Tuplesortstate *state)
1053{
1054 int newmemtupsize;
1055 int memtupsize = state->memtupsize;
1056 int64 memNowUsed = state->allowedMem - state->availMem;
1057
1058 /* Forget it if we've already maxed out memtuples, per comment above */
1059 if (!state->growmemtuples)
1060 return false;
1061
1062 /* Select new value of memtupsize */
1063 if (memNowUsed <= state->availMem)
1064 {
1065 /*
1066 * We've used no more than half of allowedMem; double our usage,
1067 * clamping at INT_MAX tuples.
1068 */
1069 if (memtupsize < INT_MAX / 2)
1070 newmemtupsize = memtupsize * 2;
1071 else
1072 {
1073 newmemtupsize = INT_MAX;
1074 state->growmemtuples = false;
1075 }
1076 }
1077 else
1078 {
1079 /*
1080 * This will be the last increment of memtupsize. Abandon doubling
1081 * strategy and instead increase as much as we safely can.
1082 *
1083 * To stay within allowedMem, we can't increase memtupsize by more
1084 * than availMem / sizeof(SortTuple) elements. In practice, we want
1085 * to increase it by considerably less, because we need to leave some
1086 * space for the tuples to which the new array slots will refer. We
1087 * assume the new tuples will be about the same size as the tuples
1088 * we've already seen, and thus we can extrapolate from the space
1089 * consumption so far to estimate an appropriate new size for the
1090 * memtuples array. The optimal value might be higher or lower than
1091 * this estimate, but it's hard to know that in advance. We again
1092 * clamp at INT_MAX tuples.
1093 *
1094 * This calculation is safe against enlarging the array so much that
1095 * LACKMEM becomes true, because the memory currently used includes
1096 * the present array; thus, there would be enough allowedMem for the
1097 * new array elements even if no other memory were currently used.
1098 *
1099 * We do the arithmetic in float8, because otherwise the product of
1100 * memtupsize and allowedMem could overflow. Any inaccuracy in the
1101 * result should be insignificant; but even if we computed a
1102 * completely insane result, the checks below will prevent anything
1103 * really bad from happening.
1104 */
1105 double grow_ratio;
1106
1107 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1108 if (memtupsize * grow_ratio < INT_MAX)
1109 newmemtupsize = (int) (memtupsize * grow_ratio);
1110 else
1111 newmemtupsize = INT_MAX;
1112
1113 /* We won't make any further enlargement attempts */
1114 state->growmemtuples = false;
1115 }
1116
1117 /* Must enlarge array by at least one element, else report failure */
1118 if (newmemtupsize <= memtupsize)
1119 goto noalloc;
1120
1121 /*
1122 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1123 * to ensure our request won't be rejected. Note that we can easily
1124 * exhaust address space before facing this outcome. (This is presently
1125 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1126 * don't rely on that at this distance.)
1127 */
1128 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1129 {
1130 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1131 state->growmemtuples = false; /* can't grow any more */
1132 }
1133
1134 /*
1135 * We need to be sure that we do not cause LACKMEM to become true, else
1136 * the space management algorithm will go nuts. The code above should
1137 * never generate a dangerous request, but to be safe, check explicitly
1138 * that the array growth fits within availMem. (We could still cause
1139 * LACKMEM if the memory chunk overhead associated with the memtuples
1140 * array were to increase. That shouldn't happen because we chose the
1141 * initial array size large enough to ensure that palloc will be treating
1142 * both old and new arrays as separate chunks. But we'll check LACKMEM
1143 * explicitly below just in case.)
1144 */
1145 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1146 goto noalloc;
1147
1148 /* OK, do it */
1149 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1150 state->memtupsize = newmemtupsize;
1151 state->memtuples = (SortTuple *)
1152 repalloc_huge(state->memtuples,
1153 state->memtupsize * sizeof(SortTuple));
1154 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1155 if (LACKMEM(state))
1156 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1157 return true;
1158
1159noalloc:
1160 /* If for any reason we didn't realloc, shut off future attempts */
1161 state->growmemtuples = false;
1162 return false;
1163}
1164
1165/*
1166 * Shared code for tuple and datum cases.
1167 */
1168void
1169tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
 1170 bool useAbbrev, Size tuplen)
1171{
1172 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1173
1174 Assert(!LEADER(state));
1175
1176 /* account for the memory used for this tuple */
1177 USEMEM(state, tuplen);
1178 state->tupleMem += tuplen;
1179
1180 if (!useAbbrev)
1181 {
1182 /*
1183 * Leave ordinary Datum representation, or NULL value. If there is a
1184 * converter it won't expect NULL values, and cost model is not
1185 * required to account for NULL, so in that case we avoid calling
1186 * converter and just set datum1 to zeroed representation (to be
1187 * consistent, and to support cheap inequality tests for NULL
1188 * abbreviated keys).
1189 */
1190 }
1191 else if (!consider_abort_common(state))
1192 {
1193 /* Store abbreviated key representation */
1194 tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1195 state->base.sortKeys);
1196 }
1197 else
1198 {
1199 /*
1200 * Set state to be consistent with never trying abbreviation.
1201 *
1202 * Alter datum1 representation in already-copied tuples, so as to
1203 * ensure a consistent representation (current tuple was just
1204 * handled). It does not matter if some dumped tuples are already
1205 * sorted on tape, since serialized tuples lack abbreviated keys
1206 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1207 */
1208 REMOVEABBREV(state, state->memtuples, state->memtupcount);
1209 }
1210
1211 switch (state->status)
1212 {
1213 case TSS_INITIAL:
1214
1215 /*
1216 * Save the tuple into the unsorted array. First, grow the array
1217 * as needed. Note that we try to grow the array when there is
1218 * still one free slot remaining --- if we fail, there'll still be
1219 * room to store the incoming tuple, and then we'll switch to
1220 * tape-based operation.
1221 */
1222 if (state->memtupcount >= state->memtupsize - 1)
1223 {
1224 (void) grow_memtuples(state);
1225 Assert(state->memtupcount < state->memtupsize);
1226 }
1227 state->memtuples[state->memtupcount++] = *tuple;
1228
1229 /*
1230 * Check if it's time to switch over to a bounded heapsort. We do
1231 * so if the input tuple count exceeds twice the desired tuple
1232 * count (this is a heuristic for where heapsort becomes cheaper
1233 * than a quicksort), or if we've just filled workMem and have
1234 * enough tuples to meet the bound.
1235 *
1236 * Note that once we enter TSS_BOUNDED state we will always try to
1237 * complete the sort that way. In the worst case, if later input
1238 * tuples are larger than earlier ones, this might cause us to
1239 * exceed workMem significantly.
1240 */
1241 if (state->bounded &&
1242 (state->memtupcount > state->bound * 2 ||
1243 (state->memtupcount > state->bound && LACKMEM(state))))
1244 {
1245 if (trace_sort)
1246 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1247 state->memtupcount,
1248 pg_rusage_show(&state->ru_start));
1250 MemoryContextSwitchTo(oldcontext);
1251 return;
1252 }
1253
1254 /*
1255 * Done if we still fit in available memory and have array slots.
1256 */
1257 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1258 {
1259 MemoryContextSwitchTo(oldcontext);
1260 return;
1261 }
1262
1263 /*
1264 * Nope; time to switch to tape-based operation.
1265 */
1266 inittapes(state, true);
1267
1268 /*
1269 * Dump all tuples.
1270 */
1271 dumptuples(state, false);
1272 break;
1273
1274 case TSS_BOUNDED:
1275
1276 /*
1277 * We don't want to grow the array here, so check whether the new
1278 * tuple can be discarded before putting it in. This should be a
1279 * good speed optimization, too, since when there are many more
1280 * input tuples than the bound, most input tuples can be discarded
1281 * with just this one comparison. Note that because we currently
1282 * have the sort direction reversed, we must check for <= not >=.
1283 */
1284 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1285 {
1286 /* new tuple <= top of the heap, so we can discard it */
1287 free_sort_tuple(state, tuple);
1289 }
1290 else
1291 {
1292 /* discard top of heap, replacing it with the new tuple */
1293 free_sort_tuple(state, &state->memtuples[0]);
1295 }
1296 break;
1297
1298 case TSS_BUILDRUNS:
1299
1300 /*
1301 * Save the tuple into the unsorted array (there must be space)
1302 */
1303 state->memtuples[state->memtupcount++] = *tuple;
1304
1305 /*
1306 * If we are over the memory limit, dump all tuples.
1307 */
1308 dumptuples(state, false);
1309 break;
1310
1311 default:
1312 elog(ERROR, "invalid tuplesort state");
1313 break;
1314 }
1315 MemoryContextSwitchTo(oldcontext);
1316}
1317
1318static bool
1319consider_abort_common(Tuplesortstate *state)
1320{
1321 Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1322 Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1323 Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1324
1325 /*
1326 * Check effectiveness of abbreviation optimization. Consider aborting
1327 * when still within memory limit.
1328 */
1329 if (state->status == TSS_INITIAL &&
1330 state->memtupcount >= state->abbrevNext)
1331 {
1332 state->abbrevNext *= 2;
1333
1334 /*
1335 * Check opclass-supplied abbreviation abort routine. It may indicate
1336 * that abbreviation should not proceed.
1337 */
1338 if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1339 state->base.sortKeys))
1340 return false;
1341
1342 /*
1343 * Finally, restore authoritative comparator, and indicate that
1344 * abbreviation is not in play by setting abbrev_converter to NULL
1345 */
1346 state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1347 state->base.sortKeys[0].abbrev_converter = NULL;
1348 /* Not strictly necessary, but be tidy */
1349 state->base.sortKeys[0].abbrev_abort = NULL;
1350 state->base.sortKeys[0].abbrev_full_comparator = NULL;
1351
1352 /* Give up - expect original pass-by-value representation */
1353 return true;
1354 }
1355
1356 return false;
1357}
1358
1359/*
1360 * All tuples have been provided; finish the sort.
1361 */
1362void
1363tuplesort_performsort(Tuplesortstate *state)
1364{
1365 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1366
1367 if (trace_sort)
1368 elog(LOG, "performsort of worker %d starting: %s",
1369 state->worker, pg_rusage_show(&state->ru_start));
1370
1371 switch (state->status)
1372 {
1373 case TSS_INITIAL:
1374
1375 /*
 1376 * We were able to accumulate all the tuples within the allowed
 1377 * amount of memory, or the leader is taking over the worker tapes.
1378 */
1379 if (SERIAL(state))
1380 {
1381 /* Just qsort 'em and we're done */
1383 state->status = TSS_SORTEDINMEM;
1384 }
1385 else if (WORKER(state))
1386 {
1387 /*
1388 * Parallel workers must still dump out tuples to tape. No
1389 * merge is required to produce single output run, though.
1390 */
1391 inittapes(state, false);
1392 dumptuples(state, true);
1394 state->status = TSS_SORTEDONTAPE;
1395 }
1396 else
1397 {
1398 /*
1399 * Leader will take over worker tapes and merge worker runs.
1400 * Note that mergeruns sets the correct state->status.
1401 */
1404 }
1405 state->current = 0;
1406 state->eof_reached = false;
1407 state->markpos_block = 0L;
1408 state->markpos_offset = 0;
1409 state->markpos_eof = false;
1410 break;
1411
1412 case TSS_BOUNDED:
1413
1414 /*
1415 * We were able to accumulate all the tuples required for output
1416 * in memory, using a heap to eliminate excess tuples. Now we
1417 * have to transform the heap to a properly-sorted array. Note
1418 * that sort_bounded_heap sets the correct state->status.
1419 */
1421 state->current = 0;
1422 state->eof_reached = false;
1423 state->markpos_offset = 0;
1424 state->markpos_eof = false;
1425 break;
1426
1427 case TSS_BUILDRUNS:
1428
1429 /*
1430 * Finish tape-based sort. First, flush all tuples remaining in
1431 * memory out to tape; then merge until we have a single remaining
1432 * run (or, if !randomAccess and !WORKER(), one run per tape).
1433 * Note that mergeruns sets the correct state->status.
1434 */
1435 dumptuples(state, true);
1437 state->eof_reached = false;
1438 state->markpos_block = 0L;
1439 state->markpos_offset = 0;
1440 state->markpos_eof = false;
1441 break;
1442
1443 default:
1444 elog(ERROR, "invalid tuplesort state");
1445 break;
1446 }
1447
1448 if (trace_sort)
1449 {
1450 if (state->status == TSS_FINALMERGE)
1451 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1452 state->worker, state->nInputTapes,
1453 pg_rusage_show(&state->ru_start));
1454 else
1455 elog(LOG, "performsort of worker %d done: %s",
1456 state->worker, pg_rusage_show(&state->ru_start));
1457 }
1458
1459 MemoryContextSwitchTo(oldcontext);
1460}
1461
1462/*
1463 * Internal routine to fetch the next tuple in either forward or back
1464 * direction into *stup. Returns false if no more tuples.
1465 * Returned tuple belongs to tuplesort memory context, and must not be freed
1466 * by caller. Note that fetched tuple is stored in memory that may be
1467 * recycled by any future fetch.
1468 */
1469bool
1470tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 1471 SortTuple *stup)
1472{
1473 unsigned int tuplen;
1474 size_t nmoved;
1475
1476 Assert(!WORKER(state));
1477
1478 switch (state->status)
1479 {
1480 case TSS_SORTEDINMEM:
1481 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1482 Assert(!state->slabAllocatorUsed);
1483 if (forward)
1484 {
1485 if (state->current < state->memtupcount)
1486 {
1487 *stup = state->memtuples[state->current++];
1488 return true;
1489 }
1490 state->eof_reached = true;
1491
1492 /*
1493 * Complain if caller tries to retrieve more tuples than
1494 * originally asked for in a bounded sort. This is because
1495 * returning EOF here might be the wrong thing.
1496 */
1497 if (state->bounded && state->current >= state->bound)
1498 elog(ERROR, "retrieved too many tuples in a bounded sort");
1499
1500 return false;
1501 }
1502 else
1503 {
1504 if (state->current <= 0)
1505 return false;
1506
1507 /*
 1508 * If all tuples have already been fetched, return the last
 1509 * tuple; otherwise, return the tuple before the last one returned.
1510 */
1511 if (state->eof_reached)
1512 state->eof_reached = false;
1513 else
1514 {
1515 state->current--; /* last returned tuple */
1516 if (state->current <= 0)
1517 return false;
1518 }
1519 *stup = state->memtuples[state->current - 1];
1520 return true;
1521 }
1522 break;
1523
1524 case TSS_SORTEDONTAPE:
1525 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1526 Assert(state->slabAllocatorUsed);
1527
1528 /*
1529 * The slot that held the tuple that we returned in previous
1530 * gettuple call can now be reused.
1531 */
1532 if (state->lastReturnedTuple)
1533 {
1534 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1535 state->lastReturnedTuple = NULL;
1536 }
1537
1538 if (forward)
1539 {
1540 if (state->eof_reached)
1541 return false;
1542
1543 if ((tuplen = getlen(state->result_tape, true)) != 0)
1544 {
1545 READTUP(state, stup, state->result_tape, tuplen);
1546
1547 /*
1548 * Remember the tuple we return, so that we can recycle
1549 * its memory on next call. (This can be NULL, in the
1550 * !state->tuples case).
1551 */
1552 state->lastReturnedTuple = stup->tuple;
1553
1554 return true;
1555 }
1556 else
1557 {
1558 state->eof_reached = true;
1559 return false;
1560 }
1561 }
1562
1563 /*
1564 * Backward.
1565 *
 1566 * If all tuples have already been fetched, return the last tuple;
 1567 * otherwise, return the tuple before the last one returned.
1568 */
1569 if (state->eof_reached)
1570 {
1571 /*
1572 * Seek position is pointing just past the zero tuplen at the
1573 * end of file; back up to fetch last tuple's ending length
1574 * word. If seek fails we must have a completely empty file.
1575 */
1576 nmoved = LogicalTapeBackspace(state->result_tape,
1577 2 * sizeof(unsigned int));
1578 if (nmoved == 0)
1579 return false;
1580 else if (nmoved != 2 * sizeof(unsigned int))
1581 elog(ERROR, "unexpected tape position");
1582 state->eof_reached = false;
1583 }
1584 else
1585 {
1586 /*
1587 * Back up and fetch previously-returned tuple's ending length
1588 * word. If seek fails, assume we are at start of file.
1589 */
1590 nmoved = LogicalTapeBackspace(state->result_tape,
1591 sizeof(unsigned int));
1592 if (nmoved == 0)
1593 return false;
1594 else if (nmoved != sizeof(unsigned int))
1595 elog(ERROR, "unexpected tape position");
1596 tuplen = getlen(state->result_tape, false);
1597
1598 /*
1599 * Back up to get ending length word of tuple before it.
1600 */
1601 nmoved = LogicalTapeBackspace(state->result_tape,
1602 tuplen + 2 * sizeof(unsigned int));
1603 if (nmoved == tuplen + sizeof(unsigned int))
1604 {
1605 /*
1606 * We backed up over the previous tuple, but there was no
1607 * ending length word before it. That means that the prev
1608 * tuple is the first tuple in the file. It is now the
 1609 * next to read in the forward direction (not obviously right,
 1610 * but that is what the in-memory case does).
1611 */
1612 return false;
1613 }
1614 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1615 elog(ERROR, "bogus tuple length in backward scan");
1616 }
1617
1618 tuplen = getlen(state->result_tape, false);
1619
1620 /*
1621 * Now we have the length of the prior tuple, back up and read it.
1622 * Note: READTUP expects we are positioned after the initial
1623 * length word of the tuple, so back up to that point.
1624 */
1625 nmoved = LogicalTapeBackspace(state->result_tape,
1626 tuplen);
1627 if (nmoved != tuplen)
1628 elog(ERROR, "bogus tuple length in backward scan");
1629 READTUP(state, stup, state->result_tape, tuplen);
1630
1631 /*
1632 * Remember the tuple we return, so that we can recycle its memory
1633 * on next call. (This can be NULL, in the Datum case).
1634 */
1635 state->lastReturnedTuple = stup->tuple;
1636
1637 return true;
1638
1639 case TSS_FINALMERGE:
1640 Assert(forward);
1641 /* We are managing memory ourselves, with the slab allocator. */
1642 Assert(state->slabAllocatorUsed);
1643
1644 /*
1645 * The slab slot holding the tuple that we returned in previous
1646 * gettuple call can now be reused.
1647 */
1648 if (state->lastReturnedTuple)
1649 {
1650 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1651 state->lastReturnedTuple = NULL;
1652 }
1653
1654 /*
1655 * This code should match the inner loop of mergeonerun().
1656 */
1657 if (state->memtupcount > 0)
1658 {
1659 int srcTapeIndex = state->memtuples[0].srctape;
1660 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1661 SortTuple newtup;
1662
1663 *stup = state->memtuples[0];
1664
1665 /*
1666 * Remember the tuple we return, so that we can recycle its
1667 * memory on next call. (This can be NULL, in the Datum case).
1668 */
1669 state->lastReturnedTuple = stup->tuple;
1670
1671 /*
1672 * Pull next tuple from tape, and replace the returned tuple
1673 * at top of the heap with it.
1674 */
1675 if (!mergereadnext(state, srcTape, &newtup))
1676 {
1677 /*
1678 * If no more data, we've reached end of run on this tape.
1679 * Remove the top node from the heap.
1680                         */
1681                        tuplesort_heap_delete_top(state);
1682                        state->nInputRuns--;
1683
1684 /*
1685 * Close the tape. It'd go away at the end of the sort
1686 * anyway, but better to release the memory early.
1687 */
1688 LogicalTapeClose(srcTape);
1689 return true;
1690 }
1691                newtup.srctape = srcTapeIndex;
1692                tuplesort_heap_replace_top(state, &newtup);
1693                return true;
1694 }
1695 return false;
1696
1697 default:
1698 elog(ERROR, "invalid tuplesort state");
1699 return false; /* keep compiler quiet */
1700 }
1701}
1702
1703
1704/*
1705 * Advance over N tuples in either forward or back direction,
1706 * without returning any data. N==0 is a no-op.
1707 * Returns true if successful, false if ran out of tuples.
1708 */
1709bool
1710tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1711{
1712 MemoryContext oldcontext;
1713
1714 /*
1715 * We don't actually support backwards skip yet, because no callers need
1716 * it. The API is designed to allow for that later, though.
1717 */
1718 Assert(forward);
1719 Assert(ntuples >= 0);
1720 Assert(!WORKER(state));
1721
1722 switch (state->status)
1723 {
1724 case TSS_SORTEDINMEM:
1725 if (state->memtupcount - state->current >= ntuples)
1726 {
1727 state->current += ntuples;
1728 return true;
1729 }
1730 state->current = state->memtupcount;
1731 state->eof_reached = true;
1732
1733 /*
1734 * Complain if caller tries to retrieve more tuples than
1735 * originally asked for in a bounded sort. This is because
1736 * returning EOF here might be the wrong thing.
1737 */
1738 if (state->bounded && state->current >= state->bound)
1739 elog(ERROR, "retrieved too many tuples in a bounded sort");
1740
1741 return false;
1742
1743 case TSS_SORTEDONTAPE:
1744 case TSS_FINALMERGE:
1745
1746 /*
1747 * We could probably optimize these cases better, but for now it's
1748 * not worth the trouble.
1749 */
1750 oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1751 while (ntuples-- > 0)
1752 {
1753 SortTuple stup;
1754
1755 if (!tuplesort_gettuple_common(state, forward, &stup))
1756 {
1757 MemoryContextSwitchTo(oldcontext);
1758 return false;
1759                }
1760                CHECK_FOR_INTERRUPTS();
1761            }
1762 MemoryContextSwitchTo(oldcontext);
1763 return true;
1764
1765 default:
1766 elog(ERROR, "invalid tuplesort state");
1767 return false; /* keep compiler quiet */
1768 }
1769}
1770
1771/*
1772 * tuplesort_merge_order - report merge order we'll use for given memory
1773 * (note: "merge order" just means the number of input tapes in the merge).
1774 *
1775 * This is exported for use by the planner. allowedMem is in bytes.
1776 */
1777int
1778tuplesort_merge_order(int64 allowedMem)
1779{
1780 int mOrder;
1781
1782 /*----------
1783 * In the merge phase, we need buffer space for each input and output tape.
1784 * Each pass in the balanced merge algorithm reads from M input tapes, and
1785 * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1786 * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1787 * input tape.
1788 *
1789 * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1790 * N * TAPE_BUFFER_OVERHEAD
1791 *
1792 * Except for the last and next-to-last merge passes, where there can be
1793 * fewer tapes left to process, M = N. We choose M so that we have the
1794 * desired amount of memory available for the input buffers
1795 * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1796 * available for the tape buffers (allowedMem).
1797 *
1798 * Note: you might be thinking we need to account for the memtuples[]
1799 * array in this calculation, but we effectively treat that as part of the
1800 * MERGE_BUFFER_SIZE workspace.
1801 *----------
1802 */
1803 mOrder = allowedMem /
1804     (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1805
1806 /*
1806 /*
1807 * Even in minimum memory, use at least a MINORDER merge. On the other
1808 * hand, even when we have lots of memory, do not use more than a MAXORDER
1809 * merge. Tapes are pretty cheap, but they're not entirely free. Each
1810 * additional tape reduces the amount of memory available to build runs,
1811 * which in turn can cause the same sort to need more runs, which makes
1812 * merging slower even if it can still be done in a single pass. Also,
1813 * high order merges are quite slow due to CPU cache effects; it can be
1814 * faster to pay the I/O cost of a multi-pass merge than to perform a
1815 * single merge pass across many hundreds of tapes.
1816 */
1817 mOrder = Max(mOrder, MINORDER);
1818 mOrder = Min(mOrder, MAXORDER);
1819
1820 return mOrder;
1821}
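/*
 * Illustrative example (not part of the original source): assuming the
 * default BLCKSZ of 8192, TAPE_BUFFER_OVERHEAD is one block (8192 bytes)
 * and MERGE_BUFFER_SIZE is 32 blocks (262144 bytes), so an allowedMem of
 * 4MB gives 4194304 / (2 * 8192 + 262144) = 15 as the raw merge order,
 * which already lies between MINORDER (6) and MAXORDER (500) and is used
 * as-is.  The concrete byte values are assumptions about a typical build.
 */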
1822
1823/*
1824 * Helper function to calculate how much memory to allocate for the read buffer
1825 * of each input tape in a merge pass.
1826 *
1827 * 'avail_mem' is the amount of memory available for the buffers of all the
1828 * tapes, both input and output.
1829 * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1830 * 'maxOutputTapes' is the max. number of output tapes we should produce.
1831 */
1832static int64
1833merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1834 int maxOutputTapes)
1835{
1836 int nOutputRuns;
1837 int nOutputTapes;
1838
1839 /*
1840 * How many output tapes will we produce in this pass?
1841 *
1842 * This is nInputRuns / nInputTapes, rounded up.
1843 */
1844 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1845
1846 nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1847
1848 /*
1849 * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1850 * remaining memory is divided evenly between the input tapes.
1851 *
1852 * This also follows from the formula in tuplesort_merge_order, but here
1853 * we derive the input buffer size from the amount of memory available,
1854 * and M and N.
1855 */
1856 return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1857}
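/*
 * Illustrative example (not part of the original source), with assumed
 * numbers: for avail_mem = 1048576, nInputTapes = 4, nInputRuns = 7 and
 * maxOutputTapes = 4, nOutputRuns = ceil(7 / 4) = 2, so nOutputTapes = 2
 * and each input tape gets (1048576 - 2 * TAPE_BUFFER_OVERHEAD) / 4 bytes
 * of read buffer, i.e. 258048 bytes if TAPE_BUFFER_OVERHEAD is 8192.
 */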
1858
1859/*
1860 * inittapes - initialize for tape sorting.
1861 *
1862 * This is called only if we have found we won't sort in memory.
1863 */
1864static void
1865inittapes(Tuplesortstate *state, bool mergeruns)
1866{
1867 Assert(!LEADER(state));
1868
1869 if (mergeruns)
1870 {
1871 /* Compute number of input tapes to use when merging */
1872 state->maxTapes = tuplesort_merge_order(state->allowedMem);
1873 }
1874 else
1875 {
1876        /* Workers can sometimes produce a single run, output without merge */
1877        Assert(WORKER(state));
1878        state->maxTapes = MINORDER;
1879 }
1880
1881 if (trace_sort)
1882 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1883 state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1884
1885 /* Create the tape set */
1886 inittapestate(state, state->maxTapes);
1887    state->tapeset =
1888        LogicalTapeSetCreate(false,
1889                             state->shared ? &state->shared->fileset : NULL,
1890 state->worker);
1891
1892 state->currentRun = 0;
1893
1894 /*
1895 * Initialize logical tape arrays.
1896 */
1897 state->inputTapes = NULL;
1898 state->nInputTapes = 0;
1899 state->nInputRuns = 0;
1900
1901 state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1902 state->nOutputTapes = 0;
1903 state->nOutputRuns = 0;
1904
1905 state->status = TSS_BUILDRUNS;
1906
1907    selectnewtape(state);
1908}
1909
1910/*
1911 * inittapestate - initialize generic tape management state
1912 */
1913static void
1914inittapestate(Tuplesortstate *state, int maxTapes)
1915{
1916 int64 tapeSpace;
1917
1918 /*
1919 * Decrease availMem to reflect the space needed for tape buffers; but
1920 * don't decrease it to the point that we have no room for tuples. (That
1921 * case is only likely to occur if sorting pass-by-value Datums; in all
1922 * other scenarios the memtuples[] array is unlikely to occupy more than
1923 * half of allowedMem. In the pass-by-value case it's not important to
1924 * account for tuple space, so we don't care if LACKMEM becomes
1925 * inaccurate.)
1926 */
1927 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1928
1929 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1930 USEMEM(state, tapeSpace);
1931
1932 /*
1933 * Make sure that the temp file(s) underlying the tape set are created in
1934 * suitable temp tablespaces. For parallel sorts, this should have been
1935 * called already, but it doesn't matter if it is called a second time.
1936     */
1937    PrepareTempTablespaces();
1938}
1939
1940/*
1941 * selectnewtape -- select next tape to output to.
1942 *
1943 * This is called after finishing a run when we know another run
1944 * must be started. This is used both when building the initial
1945 * runs, and during merge passes.
1946 */
1947static void
1948selectnewtape(Tuplesortstate *state)
1949{
1950 /*
1951 * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1952 * both zero. On each call, we create a new output tape to hold the next
1953 * run, until maxTapes is reached. After that, we assign new runs to the
1954 * existing tapes in a round robin fashion.
1955 */
1956 if (state->nOutputTapes < state->maxTapes)
1957 {
1958 /* Create a new tape to hold the next run */
1959 Assert(state->outputTapes[state->nOutputRuns] == NULL);
1960 Assert(state->nOutputRuns == state->nOutputTapes);
1961 state->destTape = LogicalTapeCreate(state->tapeset);
1962 state->outputTapes[state->nOutputTapes] = state->destTape;
1963 state->nOutputTapes++;
1964 state->nOutputRuns++;
1965 }
1966 else
1967 {
1968 /*
1969 * We have reached the max number of tapes. Append to an existing
1970 * tape.
1971 */
1972 state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1973 state->nOutputRuns++;
1974 }
1975}
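/*
 * Illustrative example (not part of the original source): with maxTapes = 4,
 * the first four calls create tapes T1..T4 for runs 1..4.  For run 5,
 * nOutputRuns % nOutputTapes = 4 % 4 = 0, so run 5 is appended to T1, run 6
 * to T2, and so on; after six runs the assignment is T1:{1,5}, T2:{2,6},
 * T3:{3}, T4:{4}.
 */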
1976
1977/*
1978 * Initialize the slab allocation arena, for the given number of slots.
1979 */
1980static void
1981init_slab_allocator(Tuplesortstate *state, int numSlots)
1982{
1983 if (numSlots > 0)
1984 {
1985 char *p;
1986 int i;
1987
1988 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1989 state->slabMemoryEnd = state->slabMemoryBegin +
1990 numSlots * SLAB_SLOT_SIZE;
1991 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1992 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1993
1994 p = state->slabMemoryBegin;
1995 for (i = 0; i < numSlots - 1; i++)
1996 {
1997 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1998 p += SLAB_SLOT_SIZE;
1999 }
2000 ((SlabSlot *) p)->nextfree = NULL;
2001 }
2002 else
2003 {
2004 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2005 state->slabFreeHead = NULL;
2006 }
2007 state->slabAllocatorUsed = true;
2008}
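/*
 * Illustrative sketch (not part of the original source): with numSlots = 3
 * the arena is a single allocation of 3 * SLAB_SLOT_SIZE bytes and the free
 * list is threaded through it as slot 0 -> slot 1 -> slot 2 -> NULL, with
 * slabFreeHead pointing at slot 0.  RELEASE_SLAB_SLOT later pushes freed
 * slots back onto this list, so allocation and release are both constant-time
 * pointer updates.
 */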
2009
2010/*
2011 * mergeruns -- merge all the completed initial runs.
2012 *
2013 * This implements the Balanced k-Way Merge Algorithm. All input data has
2014 * already been written to initial runs on tape (see dumptuples).
2015 */
2016static void
2017mergeruns(Tuplesortstate *state)
2018{
2019 int tapenum;
2020
2021 Assert(state->status == TSS_BUILDRUNS);
2022 Assert(state->memtupcount == 0);
2023
2024 if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2025 {
2026 /*
2027 * If there are multiple runs to be merged, when we go to read back
2028 * tuples from disk, abbreviated keys will not have been stored, and
2029 * we don't care to regenerate them. Disable abbreviation from this
2030 * point on.
2031 */
2032 state->base.sortKeys->abbrev_converter = NULL;
2033 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2034
2035 /* Not strictly necessary, but be tidy */
2036 state->base.sortKeys->abbrev_abort = NULL;
2037 state->base.sortKeys->abbrev_full_comparator = NULL;
2038 }
2039
2040 /*
2041 * Reset tuple memory. We've freed all the tuples that we previously
2042 * allocated. We will use the slab allocator from now on.
2043 */
2044 MemoryContextResetOnly(state->base.tuplecontext);
2045
2046 /*
2047 * We no longer need a large memtuples array. (We will allocate a smaller
2048 * one for the heap later.)
2049 */
2050 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2051 pfree(state->memtuples);
2052 state->memtuples = NULL;
2053
2054 /*
2055 * Initialize the slab allocator. We need one slab slot per input tape,
2056 * for the tuples in the heap, plus one to hold the tuple last returned
2057 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2058     * however, we don't need to allocate anything.)
2059 *
2060 * In a multi-pass merge, we could shrink this allocation for the last
2061 * merge pass, if it has fewer tapes than previous passes, but we don't
2062 * bother.
2063 *
2064 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2065 * to track memory usage of individual tuples.
2066 */
2067 if (state->base.tuples)
2068 init_slab_allocator(state, state->nOutputTapes + 1);
2069    else
2070        init_slab_allocator(state, 0);
2071
2072 /*
2073 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2074 * from each input tape.
2075 *
2076 * We could shrink this, too, between passes in a multi-pass merge, but we
2077 * don't bother. (The initial input tapes are still in outputTapes. The
2078 * number of input tapes will not increase between passes.)
2079 */
2080 state->memtupsize = state->nOutputTapes;
2081 state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
2082 state->nOutputTapes * sizeof(SortTuple));
2083 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2084
2085 /*
2086 * Use all the remaining memory we have available for tape buffers among
2087 * all the input tapes. At the beginning of each merge pass, we will
2088 * divide this memory between the input and output tapes in the pass.
2089 */
2090 state->tape_buffer_mem = state->availMem;
2091 USEMEM(state, state->tape_buffer_mem);
2092 if (trace_sort)
2093 elog(LOG, "worker %d using %zu KB of memory for tape buffers",
2094 state->worker, state->tape_buffer_mem / 1024);
2095
2096 for (;;)
2097 {
2098 /*
2099 * On the first iteration, or if we have read all the runs from the
2100 * input tapes in a multi-pass merge, it's time to start a new pass.
2101 * Rewind all the output tapes, and make them inputs for the next
2102 * pass.
2103 */
2104 if (state->nInputRuns == 0)
2105 {
2106 int64 input_buffer_size;
2107
2108 /* Close the old, emptied, input tapes */
2109 if (state->nInputTapes > 0)
2110 {
2111 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2112 LogicalTapeClose(state->inputTapes[tapenum]);
2113 pfree(state->inputTapes);
2114 }
2115
2116 /* Previous pass's outputs become next pass's inputs. */
2117 state->inputTapes = state->outputTapes;
2118 state->nInputTapes = state->nOutputTapes;
2119 state->nInputRuns = state->nOutputRuns;
2120
2121 /*
2122 * Reset output tape variables. The actual LogicalTapes will be
2123 * created as needed, here we only allocate the array to hold
2124 * them.
2125 */
2126 state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2127 state->nOutputTapes = 0;
2128 state->nOutputRuns = 0;
2129
2130 /*
2131 * Redistribute the memory allocated for tape buffers, among the
2132 * new input and output tapes.
2133 */
2134 input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2135 state->nInputTapes,
2136 state->nInputRuns,
2137 state->maxTapes);
2138
2139 if (trace_sort)
2140 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2141 state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2142 pg_rusage_show(&state->ru_start));
2143
2144 /* Prepare the new input tapes for merge pass. */
2145 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2146 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2147
2148 /*
2149 * If there's just one run left on each input tape, then only one
2150 * merge pass remains. If we don't have to produce a materialized
2151 * sorted tape, we can stop at this point and do the final merge
2152 * on-the-fly.
2153 */
2154 if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2155 && state->nInputRuns <= state->nInputTapes
2156 && !WORKER(state))
2157 {
2158                /* Tell logtape.c we won't be writing anymore */
2159                LogicalTapeSetForgetFreeSpace(state->tapeset);
2160                /* Initialize for the final merge pass */
2161                beginmerge(state);
2162                state->status = TSS_FINALMERGE;
2163 return;
2164 }
2165 }
2166
2167        /* Select an output tape */
2168        selectnewtape(state);
2169
2170        /* Merge one run from each input tape. */
2171        mergeonerun(state);
2172
2173 /*
2174 * If the input tapes are empty, and we output only one output run,
2175 * we're done. The current output tape contains the final result.
2176 */
2177 if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2178 break;
2179 }
2180
2181 /*
2182 * Done. The result is on a single run on a single tape.
2183 */
2184 state->result_tape = state->outputTapes[0];
2185 if (!WORKER(state))
2186 LogicalTapeFreeze(state->result_tape, NULL);
2187    else
2188        worker_freeze_result_tape(state);
2189    state->status = TSS_SORTEDONTAPE;
2190
2191 /* Close all the now-empty input tapes, to release their read buffers. */
2192 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2193 LogicalTapeClose(state->inputTapes[tapenum]);
2194}
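/*
 * Illustrative example (not part of the original source): suppose run
 * generation left 12 runs spread over 4 tapes.  The first pass calls
 * mergeonerun() three times, each time merging one run from every input
 * tape, so it produces 3 output runs on 3 output tapes.  The next pass then
 * sees 3 input runs on 3 input tapes; if TUPLESORT_RANDOMACCESS was not
 * requested (and this is not a worker), the loop stops in TSS_FINALMERGE and
 * the last merge happens on the fly as the caller fetches tuples, otherwise
 * one more pass writes the single result run and the state becomes
 * TSS_SORTEDONTAPE.
 */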
2195
2196/*
2197 * Merge one run from each input tape.
2198 */
2199static void
2200mergeonerun(Tuplesortstate *state)
2201{
2202 int srcTapeIndex;
2203 LogicalTape *srcTape;
2204
2205 /*
2206 * Start the merge by loading one tuple from each active source tape into
2207 * the heap.
2208     */
2209    beginmerge(state);
2210
2211 Assert(state->slabAllocatorUsed);
2212
2213 /*
2214 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2215 * out, and replacing it with next tuple from same tape (if there is
2216 * another one).
2217 */
2218 while (state->memtupcount > 0)
2219 {
2220 SortTuple stup;
2221
2222 /* write the tuple to destTape */
2223 srcTapeIndex = state->memtuples[0].srctape;
2224 srcTape = state->inputTapes[srcTapeIndex];
2225 WRITETUP(state, state->destTape, &state->memtuples[0]);
2226
2227 /* recycle the slot of the tuple we just wrote out, for the next read */
2228 if (state->memtuples[0].tuple)
2229 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2230
2231 /*
2232 * pull next tuple from the tape, and replace the written-out tuple in
2233 * the heap with it.
2234 */
2235 if (mergereadnext(state, srcTape, &stup))
2236 {
2237            stup.srctape = srcTapeIndex;
2238            tuplesort_heap_replace_top(state, &stup);
2239        }
2240 else
2241        {
2242            tuplesort_heap_delete_top(state);
2243            state->nInputRuns--;
2244 }
2245 }
2246
2247 /*
2248 * When the heap empties, we're done. Write an end-of-run marker on the
2249 * output tape.
2250 */
2251 markrunend(state->destTape);
2252}
2253
2254/*
2255 * beginmerge - initialize for a merge pass
2256 *
2257 * Fill the merge heap with the first tuple from each input tape.
2258 */
2259static void
2260beginmerge(Tuplesortstate *state)
2261{
2262 int activeTapes;
2263 int srcTapeIndex;
2264
2265 /* Heap should be empty here */
2266 Assert(state->memtupcount == 0);
2267
2268 activeTapes = Min(state->nInputTapes, state->nInputRuns);
2269
2270 for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2271 {
2272 SortTuple tup;
2273
2274 if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2275 {
2276            tup.srctape = srcTapeIndex;
2277            tuplesort_heap_insert(state, &tup);
2278        }
2279 }
2280}
2281
2282/*
2283 * mergereadnext - read next tuple from one merge input tape
2284 *
2285 * Returns false on EOF.
2286 */
2287static bool
2288mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2289{
2290 unsigned int tuplen;
2291
2292 /* read next tuple, if any */
2293 if ((tuplen = getlen(srcTape, true)) == 0)
2294 return false;
2295 READTUP(state, stup, srcTape, tuplen);
2296
2297 return true;
2298}
2299
2300/*
2301 * dumptuples - remove tuples from memtuples and write initial run to tape
2302 *
2303 * When alltuples = true, dump everything currently in memory. (This case is
2304 * only used at end of input data.)
2305 */
2306static void
2307dumptuples(Tuplesortstate *state, bool alltuples)
2308{
2309 int memtupwrite;
2310 int i;
2311
2312 /*
2313 * Nothing to do if we still fit in available memory and have array slots,
2314 * unless this is the final call during initial run generation.
2315 */
2316 if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2317 !alltuples)
2318 return;
2319
2320 /*
2321 * Final call might require no sorting, in rare cases where we just so
2322 * happen to have previously LACKMEM()'d at the point where exactly all
2323 * remaining tuples are loaded into memory, just before input was
2324 * exhausted. In general, short final runs are quite possible, but avoid
2325 * creating a completely empty run. In a worker, though, we must produce
2326 * at least one tape, even if it's empty.
2327 */
2328 if (state->memtupcount == 0 && state->currentRun > 0)
2329 return;
2330
2331 Assert(state->status == TSS_BUILDRUNS);
2332
2333 /*
2334 * It seems unlikely that this limit will ever be exceeded, but take no
2335 * chances
2336 */
2337 if (state->currentRun == INT_MAX)
2338 ereport(ERROR,
2339 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2340 errmsg("cannot have more than %d runs for an external sort",
2341 INT_MAX)));
2342
2343    if (state->currentRun > 0)
2344        selectnewtape(state);
2345
2346 state->currentRun++;
2347
2348 if (trace_sort)
2349 elog(LOG, "worker %d starting quicksort of run %d: %s",
2350 state->worker, state->currentRun,
2351 pg_rusage_show(&state->ru_start));
2352
2353 /*
2354 * Sort all tuples accumulated within the allowed amount of memory for
2355 * this run using quicksort
2356     */
2357    tuplesort_sort_memtuples(state);
2358
2359 if (trace_sort)
2360 elog(LOG, "worker %d finished quicksort of run %d: %s",
2361 state->worker, state->currentRun,
2362 pg_rusage_show(&state->ru_start));
2363
2364 memtupwrite = state->memtupcount;
2365 for (i = 0; i < memtupwrite; i++)
2366 {
2367 SortTuple *stup = &state->memtuples[i];
2368
2369 WRITETUP(state, state->destTape, stup);
2370 }
2371
2372 state->memtupcount = 0;
2373
2374 /*
2375 * Reset tuple memory. We've freed all of the tuples that we previously
2376 * allocated. It's important to avoid fragmentation when there is a stark
2377 * change in the sizes of incoming tuples. In bounded sorts,
2378 * fragmentation due to AllocSetFree's bucketing by size class might be
2379 * particularly bad if this step wasn't taken.
2380 */
2381 MemoryContextReset(state->base.tuplecontext);
2382
2383 /*
2384 * Now update the memory accounting to subtract the memory used by the
2385 * tuple.
2386 */
2387 FREEMEM(state, state->tupleMem);
2388 state->tupleMem = 0;
2389
2390 markrunend(state->destTape);
2391
2392 if (trace_sort)
2393 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2394 state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2395 pg_rusage_show(&state->ru_start));
2396}
2397
2398/*
2399 * tuplesort_rescan - rewind and replay the scan
2400 */
2401void
2402tuplesort_rescan(Tuplesortstate *state)
2403{
2404 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2405
2406 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2407
2408 switch (state->status)
2409 {
2410 case TSS_SORTEDINMEM:
2411 state->current = 0;
2412 state->eof_reached = false;
2413 state->markpos_offset = 0;
2414 state->markpos_eof = false;
2415 break;
2416 case TSS_SORTEDONTAPE:
2417 LogicalTapeRewindForRead(state->result_tape, 0);
2418 state->eof_reached = false;
2419 state->markpos_block = 0L;
2420 state->markpos_offset = 0;
2421 state->markpos_eof = false;
2422 break;
2423 default:
2424 elog(ERROR, "invalid tuplesort state");
2425 break;
2426 }
2427
2428 MemoryContextSwitchTo(oldcontext);
2429}
2430
2431/*
2432 * tuplesort_markpos - saves current position in the merged sort file
2433 */
2434void
2435tuplesort_markpos(Tuplesortstate *state)
2436{
2437 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2438
2439 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2440
2441 switch (state->status)
2442 {
2443 case TSS_SORTEDINMEM:
2444 state->markpos_offset = state->current;
2445 state->markpos_eof = state->eof_reached;
2446 break;
2447 case TSS_SORTEDONTAPE:
2448 LogicalTapeTell(state->result_tape,
2449 &state->markpos_block,
2450 &state->markpos_offset);
2451 state->markpos_eof = state->eof_reached;
2452 break;
2453 default:
2454 elog(ERROR, "invalid tuplesort state");
2455 break;
2456 }
2457
2458 MemoryContextSwitchTo(oldcontext);
2459}
2460
2461/*
2462 * tuplesort_restorepos - restores current position in merged sort file to
2463 * last saved position
2464 */
2465void
2466tuplesort_restorepos(Tuplesortstate *state)
2467{
2468 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2469
2470 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2471
2472 switch (state->status)
2473 {
2474 case TSS_SORTEDINMEM:
2475 state->current = state->markpos_offset;
2476 state->eof_reached = state->markpos_eof;
2477 break;
2478 case TSS_SORTEDONTAPE:
2479 LogicalTapeSeek(state->result_tape,
2480 state->markpos_block,
2481 state->markpos_offset);
2482 state->eof_reached = state->markpos_eof;
2483 break;
2484 default:
2485 elog(ERROR, "invalid tuplesort state");
2486 break;
2487 }
2488
2489 MemoryContextSwitchTo(oldcontext);
2490}
2491
2492/*
2493 * tuplesort_get_stats - extract summary statistics
2494 *
2495 * This can be called after tuplesort_performsort() finishes to obtain
2496 * printable summary information about how the sort was performed.
2497 */
2498void
2499tuplesort_get_stats(Tuplesortstate *state,
2500                    TuplesortInstrumentation *stats)
2501{
2502 /*
2503 * Note: it might seem we should provide both memory and disk usage for a
2504 * disk-based sort. However, the current code doesn't track memory space
2505 * accurately once we have begun to return tuples to the caller (since we
2506 * don't account for pfree's the caller is expected to do), so we cannot
2507 * rely on availMem in a disk sort. This does not seem worth the overhead
2508 * to fix. Is it worth creating an API for the memory context code to
2509 * tell us how much is actually used in sortcontext?
2510     */
2511    tuplesort_updatemax(state);
2512
2513    if (state->isMaxSpaceDisk)
2514        stats->spaceType = SORT_SPACE_TYPE_DISK;
2515    else
2516        stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2517    stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2518
2519 switch (state->maxSpaceStatus)
2520 {
2521 case TSS_SORTEDINMEM:
2522            if (state->boundUsed)
2523                stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2524            else
2525                stats->sortMethod = SORT_TYPE_QUICKSORT;
2526            break;
2527        case TSS_SORTEDONTAPE:
2528            stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2529            break;
2530        case TSS_FINALMERGE:
2531            stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2532            break;
2533        default:
2534            stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2535            break;
2536 }
2537}
2538
2539/*
2540 * Convert TuplesortMethod to a string.
2541 */
2542const char *
2543tuplesort_method_name(TuplesortMethod m)
2544{
2545 switch (m)
2546 {
2547        case SORT_TYPE_STILL_IN_PROGRESS:
2548            return "still in progress";
2549        case SORT_TYPE_TOP_N_HEAPSORT:
2550            return "top-N heapsort";
2551        case SORT_TYPE_QUICKSORT:
2552            return "quicksort";
2553        case SORT_TYPE_EXTERNAL_SORT:
2554            return "external sort";
2555        case SORT_TYPE_EXTERNAL_MERGE:
2556            return "external merge";
2557 }
2558
2559 return "unknown";
2560}
2561
2562/*
2563 * Convert TuplesortSpaceType to a string.
2564 */
2565const char *
2566tuplesort_space_type_name(TuplesortSpaceType t)
2567{
2568    Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2569    return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2570}
2571
2572
2573/*
2574 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2575 */
2576
2577/*
2578 * Convert the existing unordered array of SortTuples to a bounded heap,
2579 * discarding all but the smallest "state->bound" tuples.
2580 *
2581 * When working with a bounded heap, we want to keep the largest entry
2582 * at the root (array entry zero), instead of the smallest as in the normal
2583 * sort case. This allows us to discard the largest entry cheaply.
2584 * Therefore, we temporarily reverse the sort direction.
2585 */
2586static void
2587make_bounded_heap(Tuplesortstate *state)
2588{
2589 int tupcount = state->memtupcount;
2590 int i;
2591
2592 Assert(state->status == TSS_INITIAL);
2593 Assert(state->bounded);
2594    Assert(tupcount >= state->bound);
2595    Assert(SERIAL(state));
2596
2597    /* Reverse sort direction so largest entry will be at root */
2598    reversedirection(state);
2599
2600 state->memtupcount = 0; /* make the heap empty */
2601 for (i = 0; i < tupcount; i++)
2602 {
2603 if (state->memtupcount < state->bound)
2604 {
2605 /* Insert next tuple into heap */
2606 /* Must copy source tuple to avoid possible overwrite */
2607            SortTuple stup = state->memtuples[i];
2608
2609            tuplesort_heap_insert(state, &stup);
2610        }
2611 else
2612 {
2613 /*
2614 * The heap is full. Replace the largest entry with the new
2615 * tuple, or just discard it, if it's larger than anything already
2616 * in the heap.
2617 */
2618 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2619 {
2620                    free_sort_tuple(state, &state->memtuples[i]);
2621                    CHECK_FOR_INTERRUPTS();
2622                }
2623 else
2624 tuplesort_heap_replace_top(state, &state->memtuples[i]);
2625 }
2626 }
2627
2628 Assert(state->memtupcount == state->bound);
2629 state->status = TSS_BOUNDED;
2630}
2631
2632/*
2633 * Convert the bounded heap to a properly-sorted array
2634 */
2635static void
2636sort_bounded_heap(Tuplesortstate *state)
2637{
2638 int tupcount = state->memtupcount;
2639
2640 Assert(state->status == TSS_BOUNDED);
2641 Assert(state->bounded);
2642    Assert(tupcount == state->bound);
2643    Assert(SERIAL(state));
2644
2645 /*
2646 * We can unheapify in place because each delete-top call will remove the
2647 * largest entry, which we can promptly store in the newly freed slot at
2648 * the end. Once we're down to a single-entry heap, we're done.
2649 */
2650 while (state->memtupcount > 1)
2651 {
2652 SortTuple stup = state->memtuples[0];
2653
2654        /* this sifts-up the next-largest entry and decreases memtupcount */
2655        tuplesort_heap_delete_top(state);
2656        state->memtuples[state->memtupcount] = stup;
2657 }
2658 state->memtupcount = tupcount;
2659
2660 /*
2661 * Reverse sort direction back to the original state. This is not
2662 * actually necessary but seems like a good idea for tidiness.
2663     */
2664    reversedirection(state);
2665
2666 state->status = TSS_SORTEDINMEM;
2667 state->boundUsed = true;
2668}
2669
2670/*
2671 * Sort all memtuples using specialized qsort() routines.
2672 *
2673 * Quicksort is used for small in-memory sorts, and external sort runs.
2674 */
2675static void
2676tuplesort_sort_memtuples(Tuplesortstate *state)
2677{
2678 Assert(!LEADER(state));
2679
2680 if (state->memtupcount > 1)
2681 {
2682 /*
2683 * Do we have the leading column's value or abbreviation in datum1,
2684 * and is there a specialization for its comparator?
2685 */
2686 if (state->base.haveDatum1 && state->base.sortKeys)
2687 {
2688 if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2689 {
2690 qsort_tuple_unsigned(state->memtuples,
2691 state->memtupcount,
2692 state);
2693 return;
2694 }
2695#if SIZEOF_DATUM >= 8
2696 else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2697 {
2698 qsort_tuple_signed(state->memtuples,
2699 state->memtupcount,
2700 state);
2701 return;
2702 }
2703#endif
2704 else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2705 {
2706 qsort_tuple_int32(state->memtuples,
2707 state->memtupcount,
2708 state);
2709 return;
2710 }
2711 }
2712
2713 /* Can we use the single-key sort function? */
2714 if (state->base.onlyKey != NULL)
2715 {
2716 qsort_ssup(state->memtuples, state->memtupcount,
2717 state->base.onlyKey);
2718 }
2719 else
2720 {
2721 qsort_tuple(state->memtuples,
2722 state->memtupcount,
2723 state->base.comparetup,
2724 state);
2725 }
2726 }
2727}
2728
2729/*
2730 * Insert a new tuple into an empty or existing heap, maintaining the
2731 * heap invariant. Caller is responsible for ensuring there's room.
2732 *
2733 * Note: For some callers, tuple points to a memtuples[] entry above the
2734 * end of the heap. This is safe as long as it's not immediately adjacent
2735 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2736 * is, it might get overwritten before being moved into the heap!
2737 */
2738static void
2739tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
2740{
2741 SortTuple *memtuples;
2742 int j;
2743
2744 memtuples = state->memtuples;
2745 Assert(state->memtupcount < state->memtupsize);
2746
2747    CHECK_FOR_INTERRUPTS();
2748
2749 /*
2750 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2751 * using 1-based array indexes, not 0-based.
2752 */
2753 j = state->memtupcount++;
2754 while (j > 0)
2755 {
2756 int i = (j - 1) >> 1;
2757
2758 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
2759 break;
2760 memtuples[j] = memtuples[i];
2761 j = i;
2762 }
2763 memtuples[j] = *tuple;
2764}
2765
2766/*
2767 * Remove the tuple at state->memtuples[0] from the heap. Decrement
2768 * memtupcount, and sift up to maintain the heap invariant.
2769 *
2770 * The caller has already free'd the tuple the top node points to,
2771 * if necessary.
2772 */
2773static void
2774tuplesort_heap_delete_top(Tuplesortstate *state)
2775{
2776 SortTuple *memtuples = state->memtuples;
2777 SortTuple *tuple;
2778
2779 if (--state->memtupcount <= 0)
2780 return;
2781
2782 /*
2783 * Remove the last tuple in the heap, and re-insert it, by replacing the
2784 * current top node with it.
2785 */
2786    tuple = &memtuples[state->memtupcount];
2787    tuplesort_heap_replace_top(state, tuple);
2788}
2789
2790/*
2791 * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
2792 * maintain the heap invariant.
2793 *
2794 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
2795 * Heapsort, steps H3-H8).
2796 */
2797static void
2798tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
2799{
2800 SortTuple *memtuples = state->memtuples;
2801 unsigned int i,
2802 n;
2803
2804 Assert(state->memtupcount >= 1);
2805
2806    CHECK_FOR_INTERRUPTS();
2807
2808 /*
2809 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2810 * This prevents overflow in the "2 * i + 1" calculation, since at the top
2811 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2812 */
2813 n = state->memtupcount;
2814 i = 0; /* i is where the "hole" is */
2815 for (;;)
2816 {
2817 unsigned int j = 2 * i + 1;
2818
2819 if (j >= n)
2820 break;
2821 if (j + 1 < n &&
2822 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
2823 j++;
2824 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
2825 break;
2826 memtuples[i] = memtuples[j];
2827 i = j;
2828 }
2829 memtuples[i] = *tuple;
2830}
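/*
 * Illustrative note (not part of the original source): these routines use
 * the standard 0-based binary-heap layout, so the parent of slot j is
 * (j - 1) / 2 and the children of slot i are 2 * i + 1 and 2 * i + 2.
 * Replacing the top of a 5-element heap therefore compares the new tuple
 * against the smaller of slots 1 and 2 first and, if it sank to slot 1,
 * against the smaller of slots 3 and 4 next.
 */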
2831
2832/*
2833 * Function to reverse the sort direction from its current state
2834 *
2835 * It is not safe to call this when performing hash tuplesorts
2836 */
2837static void
2838reversedirection(Tuplesortstate *state)
2839{
2840 SortSupport sortKey = state->base.sortKeys;
2841 int nkey;
2842
2843 for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2844 {
2845 sortKey->ssup_reverse = !sortKey->ssup_reverse;
2846 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2847 }
2848}
2849
2850
2851/*
2852 * Tape interface routines
2853 */
2854
2855static unsigned int
2856getlen(LogicalTape *tape, bool eofOK)
2857{
2858 unsigned int len;
2859
2860 if (LogicalTapeRead(tape,
2861 &len, sizeof(len)) != sizeof(len))
2862 elog(ERROR, "unexpected end of tape");
2863 if (len == 0 && !eofOK)
2864 elog(ERROR, "unexpected end of data");
2865 return len;
2866}
2867
2868static void
2869markrunend(LogicalTape *tape)
2870{
2871 unsigned int len = 0;
2872
2873 LogicalTapeWrite(tape, &len, sizeof(len));
2874}
2875
2876/*
2877 * Get memory for tuple from within READTUP() routine.
2878 *
2879 * We use next free slot from the slab allocator, or palloc() if the tuple
2880 * is too large for that.
2881 */
2882void *
2883tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
2884{
2885 SlabSlot *buf;
2886
2887 /*
2888 * We pre-allocate enough slots in the slab arena that we should never run
2889 * out.
2890 */
2891 Assert(state->slabFreeHead);
2892
2893 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
2894 return MemoryContextAlloc(state->base.sortcontext, tuplen);
2895 else
2896 {
2897 buf = state->slabFreeHead;
2898 /* Reuse this slot */
2899 state->slabFreeHead = buf->nextfree;
2900
2901 return buf;
2902 }
2903}
2904
2905
2906/*
2907 * Parallel sort routines
2908 */
2909
2910/*
2911 * tuplesort_estimate_shared - estimate required shared memory allocation
2912 *
2913 * nWorkers is an estimate of the number of workers (it's the number that
2914 * will be requested).
2915 */
2916Size
2917tuplesort_estimate_shared(int nWorkers)
2918{
2919 Size tapesSize;
2920
2921 Assert(nWorkers > 0);
2922
2923 /* Make sure that BufFile shared state is MAXALIGN'd */
2924 tapesSize = mul_size(sizeof(TapeShare), nWorkers);
2925 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2926
2927 return tapesSize;
2928}
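/*
 * Illustrative example (not part of the original source): for nWorkers = 4
 * the estimate is MAXALIGN(offsetof(Sharedsort, tapes) + 4 * sizeof(TapeShare)),
 * i.e. the fixed Sharedsort header plus one TapeShare slot per planned
 * worker, rounded up to the platform's maximum alignment.
 */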
2929
2930/*
2931 * tuplesort_initialize_shared - initialize shared tuplesort state
2932 *
2933 * Must be called from leader process before workers are launched, to
2934 * establish state needed up-front for worker tuplesortstates. nWorkers
2935 * should match the argument passed to tuplesort_estimate_shared().
2936 */
2937void
2938tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
2939{
2940 int i;
2941
2942 Assert(nWorkers > 0);
2943
2944 SpinLockInit(&shared->mutex);
2945 shared->currentWorker = 0;
2946 shared->workersFinished = 0;
2947 SharedFileSetInit(&shared->fileset, seg);
2948 shared->nTapes = nWorkers;
2949 for (i = 0; i < nWorkers; i++)
2950 {
2951 shared->tapes[i].firstblocknumber = 0L;
2952 }
2953}
2954
2955/*
2956 * tuplesort_attach_shared - attach to shared tuplesort state
2957 *
2958 * Must be called by all worker processes.
2959 */
2960void
2961tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
2962{
2963 /* Attach to SharedFileSet */
2964 SharedFileSetAttach(&shared->fileset, seg);
2965}
2966
2967/*
2968 * worker_get_identifier - Assign and return ordinal identifier for worker
2969 *
2970 * The order in which these are assigned is not well defined, and should not
2971 * matter; worker numbers across parallel sort participants need only be
2972 * distinct and gapless. logtape.c requires this.
2973 *
2974 * Note that the identifiers assigned from here have no relation to
2975 * ParallelWorkerNumber number, to avoid making any assumption about
2976 * caller's requirements. However, we do follow the ParallelWorkerNumber
2977 * convention of representing a non-worker with worker number -1. This
2978 * includes the leader, as well as serial Tuplesort processes.
2979 */
2980static int
2981worker_get_identifier(Tuplesortstate *state)
2982{
2983 Sharedsort *shared = state->shared;
2984 int worker;
2985
2986    Assert(WORKER(state));
2987
2988 SpinLockAcquire(&shared->mutex);
2989 worker = shared->currentWorker++;
2990 SpinLockRelease(&shared->mutex);
2991
2992 return worker;
2993}
2994
2995/*
2996 * worker_freeze_result_tape - freeze worker's result tape for leader
2997 *
2998 * This is called by workers just after the result tape has been determined,
2999 * instead of calling LogicalTapeFreeze() directly. They do so because
3000 * workers require a few additional steps over similar serial
3001 * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
3002 * steps are around freeing now unneeded resources, and representing to
3003 * leader that worker's input run is available for its merge.
3004 *
3005 * There should only be one final output run for each worker, which consists
3006 * of all tuples that were originally input into worker.
3007 */
3008static void
3009worker_freeze_result_tape(Tuplesortstate *state)
3010{
3011    Sharedsort *shared = state->shared;
3012    TapeShare   output;
3013
3014    Assert(WORKER(state));
3015    Assert(state->result_tape != NULL);
3016 Assert(state->memtupcount == 0);
3017
3018 /*
3019 * Free most remaining memory, in case caller is sensitive to our holding
3020 * on to it. memtuples may not be a tiny merge heap at this point.
3021 */
3022 pfree(state->memtuples);
3023 /* Be tidy */
3024 state->memtuples = NULL;
3025 state->memtupsize = 0;
3026
3027 /*
3028 * Parallel worker requires result tape metadata, which is to be stored in
3029 * shared memory for leader
3030 */
3031 LogicalTapeFreeze(state->result_tape, &output);
3032
3033 /* Store properties of output tape, and update finished worker count */
3034 SpinLockAcquire(&shared->mutex);
3035 shared->tapes[state->worker] = output;
3036 shared->workersFinished++;
3037 SpinLockRelease(&shared->mutex);
3038}
3039
3040/*
3041 * worker_nomergeruns - dump memtuples in worker, without merging
3042 *
3043 * This is called as an alternative to mergeruns() with a worker when no
3044 * merging is required.
3045 */
3046static void
3047worker_nomergeruns(Tuplesortstate *state)
3048{
3049    Assert(WORKER(state));
3050    Assert(state->result_tape == NULL);
3051 Assert(state->nOutputRuns == 1);
3052
3053    state->result_tape = state->destTape;
3054    worker_freeze_result_tape(state);
3055}
3056
3057/*
3058 * leader_takeover_tapes - create tapeset for leader from worker tapes
3059 *
3060 * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3061 * sorting has occurred in workers, all of which must have already returned
3062 * from tuplesort_performsort().
3063 *
3064 * When this returns, leader process is left in a state that is virtually
3065 * indistinguishable from it having generated runs as a serial external sort
3066 * might have.
3067 */
3068static void
3069leader_takeover_tapes(Tuplesortstate *state)
3070{
3071 Sharedsort *shared = state->shared;
3072 int nParticipants = state->nParticipants;
3073 int workersFinished;
3074 int j;
3075
3076    Assert(LEADER(state));
3077    Assert(nParticipants >= 1);
3078
3079 SpinLockAcquire(&shared->mutex);
3080 workersFinished = shared->workersFinished;
3081 SpinLockRelease(&shared->mutex);
3082
3083 if (nParticipants != workersFinished)
3084 elog(ERROR, "cannot take over tapes before all workers finish");
3085
3086 /*
3087 * Create the tapeset from worker tapes, including a leader-owned tape at
3088 * the end. Parallel workers are far more expensive than logical tapes,
3089 * so the number of tapes allocated here should never be excessive.
3090 */
3091 inittapestate(state, nParticipants);
3092 state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3093
3094 /*
3095 * Set currentRun to reflect the number of runs we will merge (it's not
3096 * used for anything, this is just pro forma)
3097 */
3098 state->currentRun = nParticipants;
3099
3100 /*
3101 * Initialize the state to look the same as after building the initial
3102 * runs.
3103 *
3104 * There will always be exactly 1 run per worker, and exactly one input
3105 * tape per run, because workers always output exactly 1 run, even when
3106 * there were no input tuples for workers to sort.
3107 */
3108 state->inputTapes = NULL;
3109 state->nInputTapes = 0;
3110 state->nInputRuns = 0;
3111
3112 state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3113 state->nOutputTapes = nParticipants;
3114 state->nOutputRuns = nParticipants;
3115
3116 for (j = 0; j < nParticipants; j++)
3117 {
3118 state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3119 }
3120
3121 state->status = TSS_BUILDRUNS;
3122}
3123
3124/*
3125 * Convenience routine to free a tuple previously loaded into sort memory
3126 */
3127static void
3128free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3129{
3130 if (stup->tuple)
3131    {
3132        FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3133        pfree(stup->tuple);
3134 stup->tuple = NULL;
3135 }
3136}
3137
3138int
3139ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3140{
3141 if (x < y)
3142 return -1;
3143 else if (x > y)
3144 return 1;
3145 else
3146 return 0;
3147}
3148
3149#if SIZEOF_DATUM >= 8
3150int
3151ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3152{
3153 int64 xx = DatumGetInt64(x);
3154 int64 yy = DatumGetInt64(y);
3155
3156 if (xx < yy)
3157 return -1;
3158 else if (xx > yy)
3159 return 1;
3160 else
3161 return 0;
3162}
3163#endif
3164
3165int
3166ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3167{
3168 int32 xx = DatumGetInt32(x);
3169 int32 yy = DatumGetInt32(y);
3170
3171 if (xx < yy)
3172 return -1;
3173 else if (xx > yy)
3174 return 1;
3175 else
3176 return 0;
3177}