tuplesort.c
1/*-------------------------------------------------------------------------
2 *
3 * tuplesort.c
4 * Generalized tuple sorting routines.
5 *
6 * This module provides a generalized facility for tuple sorting, which can be
7 * applied to different kinds of sortable objects. Implementation of
8 * the particular sorting variants is given in tuplesortvariants.c.
9 * This module works efficiently for both small and large amounts
10 * of data. Small amounts are sorted in-memory using qsort(). Large
11 * amounts are sorted using temporary files and a standard external sort
12 * algorithm.
13 *
14 * See Knuth, volume 3, for more than you want to know about external
15 * sorting algorithms. The algorithm we use is a balanced k-way merge.
16 * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17 * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18 * merge is better. Knuth is assuming that tape drives are expensive
19 * beasts, and in particular that there will always be many more runs than
20 * tape drives. The polyphase merge algorithm was good at keeping all the
21 * tape drives busy, but in our implementation a "tape drive" doesn't cost
22 * much more than a few Kb of memory buffers, so we can afford to have
23 * lots of them. In particular, if we can have as many tape drives as
 24 * sorted runs, we can avoid repeated I/O entirely.
25 *
26 * Historically, we divided the input into sorted runs using replacement
27 * selection, in the form of a priority tree implemented as a heap
28 * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29 * for run generation.
30 *
31 * The approximate amount of memory allowed for any one sort operation
32 * is specified in kilobytes by the caller (most pass work_mem). Initially,
33 * we absorb tuples and simply store them in an unsorted array as long as
34 * we haven't exceeded workMem. If we reach the end of the input without
35 * exceeding workMem, we sort the array using qsort() and subsequently return
36 * tuples just by scanning the tuple array sequentially. If we do exceed
37 * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38 * When tuples are dumped in batch after quicksorting, we begin a new run
39 * with a new output tape. If we reach the max number of tapes, we write
40 * subsequent runs on the existing tapes in a round-robin fashion. We will
41 * need multiple merge passes to finish the merge in that case. After the
42 * end of the input is reached, we dump out remaining tuples in memory into
43 * a final run, then merge the runs.
44 *
45 * When merging runs, we use a heap containing just the frontmost tuple from
46 * each source run; we repeatedly output the smallest tuple and replace it
47 * with the next tuple from its source tape (if any). When the heap empties,
48 * the merge is complete. The basic merge algorithm thus needs very little
49 * memory --- only M tuples for an M-way merge, and M is constrained to a
50 * small number. However, we can still make good use of our full workMem
51 * allocation by pre-reading additional blocks from each source tape. Without
52 * prereading, our access pattern to the temporary file would be very erratic;
53 * on average we'd read one block from each of M source tapes during the same
54 * time that we're writing M blocks to the output tape, so there is no
55 * sequentiality of access at all, defeating the read-ahead methods used by
56 * most Unix kernels. Worse, the output tape gets written into a very random
57 * sequence of blocks of the temp file, ensuring that things will be even
58 * worse when it comes time to read that tape. A straightforward merge pass
59 * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60 * by prereading from each source tape sequentially, loading about workMem/M
61 * bytes from each tape in turn, and making the sequential blocks immediately
62 * available for reuse. This approach helps to localize both read and write
63 * accesses. The pre-reading is handled by logtape.c, we just tell it how
64 * much memory to use for the buffers.
65 *
66 * In the current code we determine the number of input tapes M on the basis
67 * of workMem: we want workMem/M to be large enough that we read a fair
68 * amount of data each time we read from a tape, so as to maintain the
69 * locality of access described above. Nonetheless, with large workMem we
70 * can have many tapes. The logical "tapes" are implemented by logtape.c,
71 * which avoids space wastage by recycling disk space as soon as each block
72 * is read from its "tape".
73 *
74 * When the caller requests random access to the sort result, we form
75 * the final sorted run on a logical tape which is then "frozen", so
76 * that we can access it randomly. When the caller does not need random
77 * access, we return from tuplesort_performsort() as soon as we are down
78 * to one run per logical tape. The final merge is then performed
79 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80 * saves one cycle of writing all the data out to disk and reading it in.
81 *
82 * This module supports parallel sorting. Parallel sorts involve coordination
83 * among one or more worker processes, and a leader process, each with its own
84 * tuplesort state. The leader process (or, more accurately, the
85 * Tuplesortstate associated with a leader process) creates a full tapeset
86 * consisting of worker tapes with one run to merge; a run for every
87 * worker process. This is then merged. Worker processes are guaranteed to
88 * produce exactly one output run from their partial input.
89 *
90 *
91 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
92 * Portions Copyright (c) 1994, Regents of the University of California
93 *
94 * IDENTIFICATION
95 * src/backend/utils/sort/tuplesort.c
96 *
97 *-------------------------------------------------------------------------
98 */
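(Editorial illustration, not part of tuplesort.c.) The merge described above keeps a heap holding only the frontmost tuple of each run and repeatedly emits the smallest one, refilling the heap from that tuple's source run. A minimal, self-contained sketch of the same idea over plain integer arrays:

/* k-way merge sketch: the heap holds one element per input run */
#include <stdio.h>

#define K 3
#define RUNLEN 4

typedef struct { int value; int src; } HeapItem;

static void
sift_down(HeapItem *heap, int n, int i)
{
	for (;;)
	{
		int smallest = i, l = 2 * i + 1, r = 2 * i + 2;

		if (l < n && heap[l].value < heap[smallest].value)
			smallest = l;
		if (r < n && heap[r].value < heap[smallest].value)
			smallest = r;
		if (smallest == i)
			break;
		HeapItem tmp = heap[i];
		heap[i] = heap[smallest];
		heap[smallest] = tmp;
		i = smallest;
	}
}

int
main(void)
{
	int runs[K][RUNLEN] = {{1, 4, 7, 10}, {2, 5, 8, 11}, {3, 6, 9, 12}};
	int pos[K] = {0, 0, 0};
	HeapItem heap[K];
	int n = 0;

	/* load the frontmost element of each sorted run */
	for (int i = 0; i < K; i++)
		heap[n++] = (HeapItem) {runs[i][0], i};
	for (int i = n / 2 - 1; i >= 0; i--)
		sift_down(heap, n, i);

	/* emit the smallest element, then refill from its source run */
	while (n > 0)
	{
		HeapItem top = heap[0];

		printf("%d ", top.value);
		if (++pos[top.src] < RUNLEN)
			heap[0] = (HeapItem) {runs[top.src][pos[top.src]], top.src};
		else
			heap[0] = heap[--n];
		sift_down(heap, n, 0);
	}
	printf("\n");
	return 0;
}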
99
100#include "postgres.h"
101
102#include <limits.h>
103
104#include "commands/tablespace.h"
105#include "miscadmin.h"
106#include "pg_trace.h"
107#include "storage/shmem.h"
108#include "utils/guc.h"
109#include "utils/memutils.h"
110#include "utils/pg_rusage.h"
111#include "utils/tuplesort.h"
112
113/*
 114 * Initial size of memtuples array. The initial allocation must exceed
 115 * ALLOCSET_SEPARATE_THRESHOLD bytes; see comments in grow_memtuples(). Use at
 116 * least 1024 elements to avoid excessive reallocs.
117 */
118#define INITIAL_MEMTUPSIZE Max(1024, \
119 ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
120
121/* GUC variables */
122bool trace_sort = false;
123
124#ifdef DEBUG_BOUNDED_SORT
125bool optimize_bounded_sort = true;
126#endif
127
128
129/*
130 * During merge, we use a pre-allocated set of fixed-size slots to hold
 131 * tuples, to avoid palloc/pfree overhead.
132 *
133 * Merge doesn't require a lot of memory, so we can afford to waste some,
134 * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
135 * palloc() overhead is not significant anymore.
136 *
137 * 'nextfree' is valid when this chunk is in the free list. When in use, the
138 * slot holds a tuple.
139 */
140#define SLAB_SLOT_SIZE 1024
141
142typedef union SlabSlot
143{
 144 union SlabSlot *nextfree;
 145 char buffer[SLAB_SLOT_SIZE];
 146} SlabSlot;
 147
148/*
149 * Possible states of a Tuplesort object. These denote the states that
150 * persist between calls of Tuplesort routines.
151 */
152typedef enum
153{
154 TSS_INITIAL, /* Loading tuples; still within memory limit */
155 TSS_BOUNDED, /* Loading tuples into bounded-size heap */
156 TSS_BUILDRUNS, /* Loading tuples; writing to tape */
157 TSS_SORTEDINMEM, /* Sort completed entirely in memory */
158 TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
159 TSS_FINALMERGE, /* Performing final merge on-the-fly */
 160} TupSortStatus;
 161
162/*
163 * Parameters for calculation of number of tapes to use --- see inittapes()
164 * and tuplesort_merge_order().
165 *
 166 * In this calculation we assume that each tape will cost us about one block's
167 * worth of buffer space. This ignores the overhead of all the other data
168 * structures needed for each tape, but it's probably close enough.
169 *
170 * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
171 * input tape, for pre-reading (see discussion at top of file). This is *in
172 * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
173 */
174#define MINORDER 6 /* minimum merge order */
175#define MAXORDER 500 /* maximum merge order */
176#define TAPE_BUFFER_OVERHEAD BLCKSZ
177#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
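(Editorial note, not part of tuplesort.c.) A rough worked example of these parameters: with the default BLCKSZ of 8192, MERGE_BUFFER_SIZE is 256 kB and TAPE_BUFFER_OVERHEAD is 8 kB, so tuplesort_merge_order() (near the end of this file) would give a 4 MB sort a merge order of about

	4194304 / (262144 + 2 * 8192) = 15

before clamping the result to the MINORDER..MAXORDER range.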
178
179
180/*
181 * Private state of a Tuplesort operation.
182 */
 183struct Tuplesortstate
 184{
 185 TuplesortPublic base;
186 TupSortStatus status; /* enumerated value as shown above */
187 bool bounded; /* did caller specify a maximum number of
188 * tuples to return? */
189 bool boundUsed; /* true if we made use of a bounded heap */
190 int bound; /* if bounded, the maximum number of tuples */
191 int64 tupleMem; /* memory consumed by individual tuples.
192 * storing this separately from what we track
193 * in availMem allows us to subtract the
194 * memory consumed by all tuples when dumping
195 * tuples to tape */
196 int64 availMem; /* remaining memory available, in bytes */
197 int64 allowedMem; /* total memory allowed, in bytes */
198 int maxTapes; /* max number of input tapes to merge in each
199 * pass */
 200 int64 maxSpace; /* maximum amount of space occupied among sort
 201 * groups, either in-memory or on-disk */
 202 bool isMaxSpaceDisk; /* true when maxSpace is the value for on-disk
 203 * space, false when it is the value for
 204 * in-memory space */
205 TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
206 LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
207
208 /*
209 * This array holds the tuples now in sort memory. If we are in state
210 * INITIAL, the tuples are in no particular order; if we are in state
211 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
212 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
213 * H. In state SORTEDONTAPE, the array is not used.
214 */
215 SortTuple *memtuples; /* array of SortTuple structs */
216 int memtupcount; /* number of tuples currently present */
217 int memtupsize; /* allocated length of memtuples array */
218 bool growmemtuples; /* memtuples' growth still underway? */
219
220 /*
221 * Memory for tuples is sometimes allocated using a simple slab allocator,
222 * rather than with palloc(). Currently, we switch to slab allocation
223 * when we start merging. Merging only needs to keep a small, fixed
224 * number of tuples in memory at any time, so we can avoid the
225 * palloc/pfree overhead by recycling a fixed number of fixed-size slots
226 * to hold the tuples.
227 *
228 * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
229 * slots. The allocation is sized to have one slot per tape, plus one
230 * additional slot. We need that many slots to hold all the tuples kept
231 * in the heap during merge, plus the one we have last returned from the
232 * sort, with tuplesort_gettuple.
233 *
234 * Initially, all the slots are kept in a linked list of free slots. When
 235 * a tuple is read from a tape, it is put into the next available slot, if
236 * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
237 * instead.
238 *
239 * When we're done processing a tuple, we return the slot back to the free
240 * list, or pfree() if it was palloc'd. We know that a tuple was
241 * allocated from the slab, if its pointer value is between
242 * slabMemoryBegin and -End.
243 *
244 * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
245 * tracking memory usage is not used.
246 */
 247 bool slabAllocatorUsed;
 248
249 char *slabMemoryBegin; /* beginning of slab memory arena */
250 char *slabMemoryEnd; /* end of slab memory arena */
251 SlabSlot *slabFreeHead; /* head of free list */
252
253 /* Memory used for input and output tape buffers. */
255
256 /*
257 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
258 * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
259 * modes), we remember the tuple in 'lastReturnedTuple', so that we can
260 * recycle the memory on next gettuple call.
261 */
 262 void *lastReturnedTuple;
 263
264 /*
265 * While building initial runs, this is the current output run number.
266 * Afterwards, it is the number of initial runs we made.
267 */
 268 int currentRun;
 269
270 /*
271 * Logical tapes, for merging.
272 *
273 * The initial runs are written in the output tapes. In each merge pass,
274 * the output tapes of the previous pass become the input tapes, and new
275 * output tapes are created as needed. When nInputTapes equals
276 * nInputRuns, there is only one merge pass left.
277 */
 278 LogicalTape **inputTapes;
 279 int nInputTapes;
 280 int nInputRuns;
 281
 282 LogicalTape **outputTapes;
 283 int nOutputTapes;
 284 int nOutputRuns;
 285
286 LogicalTape *destTape; /* current output tape */
287
288 /*
289 * These variables are used after completion of sorting to keep track of
290 * the next tuple to return. (In the tape case, the tape's current read
291 * position is also critical state.)
292 */
293 LogicalTape *result_tape; /* actual tape of finished output */
294 int current; /* array index (only used if SORTEDINMEM) */
295 bool eof_reached; /* reached EOF (needed for cursors) */
296
297 /* markpos_xxx holds marked position for mark and restore */
298 int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
299 int markpos_offset; /* saved "current", or offset in tape block */
300 bool markpos_eof; /* saved "eof_reached" */
301
302 /*
303 * These variables are used during parallel sorting.
304 *
305 * worker is our worker identifier. Follows the general convention that
306 * -1 value relates to a leader tuplesort, and values >= 0 worker
307 * tuplesorts. (-1 can also be a serial tuplesort.)
308 *
309 * shared is mutable shared memory state, which is used to coordinate
310 * parallel sorts.
311 *
312 * nParticipants is the number of worker Tuplesortstates known by the
313 * leader to have actually been launched, which implies that they must
314 * finish a run that the leader needs to merge. Typically includes a
315 * worker state held by the leader process itself. Set in the leader
316 * Tuplesortstate only.
317 */
 318 int worker;
 319 Sharedsort *shared;
 320 int nParticipants;
 321
322 /*
323 * Additional state for managing "abbreviated key" sortsupport routines
324 * (which currently may be used by all cases except the hash index case).
325 * Tracks the intervals at which the optimization's effectiveness is
326 * tested.
327 */
328 int64 abbrevNext; /* Tuple # at which to next check
329 * applicability */
330
331 /*
332 * Resource snapshot for time of sort start.
333 */
 334 PGRUsage ru_start;
 335};
336
337/*
 338 * Private mutable state of a parallel tuplesort operation. This is allocated
339 * in shared memory.
340 */
 341struct Sharedsort
 342{
343 /* mutex protects all fields prior to tapes */
344 slock_t mutex;
345
346 /*
347 * currentWorker generates ordinal identifier numbers for parallel sort
348 * workers. These start from 0, and are always gapless.
349 *
350 * Workers increment workersFinished to indicate having finished. If this
351 * is equal to state.nParticipants within the leader, leader is ready to
352 * merge worker runs.
353 */
 354 int currentWorker;
 355 int workersFinished;
 356
357 /* Temporary file space */
 358 SharedFileSet fileset;
 359
360 /* Size of tapes flexible array */
 361 int nTapes;
 362
363 /*
364 * Tapes array used by workers to report back information needed by the
365 * leader to concatenate all worker tapes into one for merging
366 */
 367 TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
 368};
369
370/*
371 * Is the given tuple allocated from the slab memory arena?
372 */
373#define IS_SLAB_SLOT(state, tuple) \
374 ((char *) (tuple) >= (state)->slabMemoryBegin && \
375 (char *) (tuple) < (state)->slabMemoryEnd)
376
377/*
378 * Return the given tuple to the slab memory free list, or free it
379 * if it was palloc'd.
380 */
381#define RELEASE_SLAB_SLOT(state, tuple) \
382 do { \
383 SlabSlot *buf = (SlabSlot *) tuple; \
384 \
385 if (IS_SLAB_SLOT((state), buf)) \
386 { \
387 buf->nextfree = (state)->slabFreeHead; \
388 (state)->slabFreeHead = buf; \
389 } else \
390 pfree(buf); \
391 } while(0)
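(Editorial sketch, not part of this file.) For contrast with RELEASE_SLAB_SLOT above, this is roughly how the acquire side of the slab free list behaves when a tuple is read from tape: pop a slot if the tuple fits, otherwise fall back to an ordinary allocation. The helper name and exact shape here are illustrative only.

static void *
slab_alloc_sketch(Tuplesortstate *state, Size tuplen)
{
	if (tuplen <= SLAB_SLOT_SIZE && state->slabFreeHead)
	{
		SlabSlot   *buf = state->slabFreeHead;

		state->slabFreeHead = buf->nextfree;	/* pop from the free list */
		return buf;
	}

	/* oversized tuple (or empty free list): ordinary allocation */
	return MemoryContextAlloc(state->base.sortcontext, tuplen);
}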
392
393#define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
394#define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
395#define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
396#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
397#define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
398#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
399#define USEMEM(state,amt) ((state)->availMem -= (amt))
400#define FREEMEM(state,amt) ((state)->availMem += (amt))
401#define SERIAL(state) ((state)->shared == NULL)
402#define WORKER(state) ((state)->shared && (state)->worker != -1)
403#define LEADER(state) ((state)->shared && (state)->worker == -1)
404
405/*
406 * NOTES about on-tape representation of tuples:
407 *
408 * We require the first "unsigned int" of a stored tuple to be the total size
409 * on-tape of the tuple, including itself (so it is never zero; an all-zero
410 * unsigned int is used to delimit runs). The remainder of the stored tuple
411 * may or may not match the in-memory representation of the tuple ---
412 * any conversion needed is the job of the writetup and readtup routines.
413 *
414 * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
415 * representation of the tuple must be followed by another "unsigned int" that
416 * is a copy of the length --- so the total tape space used is actually
417 * sizeof(unsigned int) more than the stored length value. This allows
418 * read-backwards. When the random access flag was not specified, the
419 * write/read routines may omit the extra length word.
420 *
421 * writetup is expected to write both length words as well as the tuple
422 * data. When readtup is called, the tape is positioned just after the
423 * front length word; readtup must read the tuple data and advance past
424 * the back length word (if present).
425 *
426 * The write/read routines can make use of the tuple description data
427 * stored in the Tuplesortstate record, if needed. They are also expected
428 * to adjust state->availMem by the amount of memory space (not tape space!)
429 * released or consumed. There is no error return from either writetup
430 * or readtup; they should ereport() on failure.
431 *
432 *
433 * NOTES about memory consumption calculations:
434 *
435 * We count space allocated for tuples against the workMem limit, plus
436 * the space used by the variable-size memtuples array. Fixed-size space
437 * is not counted; it's small enough to not be interesting.
438 *
439 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
440 * rather than the originally-requested size. This is important since
441 * palloc can add substantial overhead. It's not a complete answer since
442 * we won't count any wasted space in palloc allocation blocks, but it's
443 * a lot better than what we were doing before 7.3. As of 9.6, a
444 * separate memory context is used for caller passed tuples. Resetting
445 * it at certain key increments significantly ameliorates fragmentation.
446 * readtup routines use the slab allocator (they cannot use
447 * the reset context because it gets deleted at the point that merging
448 * begins).
449 */
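(Editorial sketch, not part of this file; the real writetup/readtup routines live in tuplesortvariants.c.) A writetup-style routine following the on-tape layout described above would look roughly like this, with "data"/"datalen" standing in for whatever the particular tuple kind serializes:

static void
writetup_sketch(Tuplesortstate *state, LogicalTape *tape,
				const void *data, unsigned int datalen)
{
	/* leading length word counts itself plus the payload */
	unsigned int tuplen = datalen + sizeof(tuplen);

	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
	LogicalTapeWrite(tape, data, datalen);

	/* trailing copy of the length word allows reading backwards */
	if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
}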
450
451
454static void inittapes(Tuplesortstate *state, bool mergeruns);
455static void inittapestate(Tuplesortstate *state, int maxTapes);
457static void init_slab_allocator(Tuplesortstate *state, int numSlots);
458static void mergeruns(Tuplesortstate *state);
459static void mergeonerun(Tuplesortstate *state);
460static void beginmerge(Tuplesortstate *state);
461static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
462static void dumptuples(Tuplesortstate *state, bool alltuples);
470static unsigned int getlen(LogicalTape *tape, bool eofOK);
471static void markrunend(LogicalTape *tape);
476static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
479
480/*
481 * Specialized comparators that we can inline into specialized sorts. The goal
482 * is to try to sort two tuples without having to follow the pointers to the
483 * comparator or the tuple.
484 *
485 * XXX: For now, there is no specialization for cases where datum1 is
486 * authoritative and we don't even need to fall back to a callback at all (that
487 * would be true for types like int4/int8/timestamp/date, but not true for
 488 * abbreviations of text or multi-key sorts). There could be! Is it worth it?
489 */
490
491/* Used if first key's comparator is ssup_datum_unsigned_cmp */
 492static pg_attribute_always_inline int
 493qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
 494{
495 int compare;
496
497 compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
498 b->datum1, b->isnull1,
499 &state->base.sortKeys[0]);
500 if (compare != 0)
501 return compare;
502
503 /*
504 * No need to waste effort calling the tiebreak function when there are no
505 * other keys to sort on.
506 */
507 if (state->base.onlyKey != NULL)
508 return 0;
509
510 return state->base.comparetup_tiebreak(a, b, state);
511}
512
513/* Used if first key's comparator is ssup_datum_signed_cmp */
 514static pg_attribute_always_inline int
 515qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
 516{
517 int compare;
518
519 compare = ApplySignedSortComparator(a->datum1, a->isnull1,
520 b->datum1, b->isnull1,
521 &state->base.sortKeys[0]);
522
523 if (compare != 0)
524 return compare;
525
526 /*
527 * No need to waste effort calling the tiebreak function when there are no
528 * other keys to sort on.
529 */
530 if (state->base.onlyKey != NULL)
531 return 0;
532
533 return state->base.comparetup_tiebreak(a, b, state);
534}
535
536/* Used if first key's comparator is ssup_datum_int32_cmp */
 537static pg_attribute_always_inline int
 538qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
 539{
540 int compare;
541
542 compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
543 b->datum1, b->isnull1,
544 &state->base.sortKeys[0]);
545
546 if (compare != 0)
547 return compare;
548
549 /*
550 * No need to waste effort calling the tiebreak function when there are no
551 * other keys to sort on.
552 */
553 if (state->base.onlyKey != NULL)
554 return 0;
555
556 return state->base.comparetup_tiebreak(a, b, state);
557}
558
559/*
560 * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
561 * any variant of SortTuples, using the appropriate comparetup function.
562 * qsort_ssup() is specialized for the case where the comparetup function
563 * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
564 * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for
565 * common comparison functions on pass-by-value leading datums.
566 */
567
568#define ST_SORT qsort_tuple_unsigned
569#define ST_ELEMENT_TYPE SortTuple
570#define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
571#define ST_COMPARE_ARG_TYPE Tuplesortstate
572#define ST_CHECK_FOR_INTERRUPTS
573#define ST_SCOPE static
574#define ST_DEFINE
575#include "lib/sort_template.h"
576
577#define ST_SORT qsort_tuple_signed
578#define ST_ELEMENT_TYPE SortTuple
579#define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
580#define ST_COMPARE_ARG_TYPE Tuplesortstate
581#define ST_CHECK_FOR_INTERRUPTS
582#define ST_SCOPE static
583#define ST_DEFINE
584#include "lib/sort_template.h"
585
586#define ST_SORT qsort_tuple_int32
587#define ST_ELEMENT_TYPE SortTuple
588#define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
589#define ST_COMPARE_ARG_TYPE Tuplesortstate
590#define ST_CHECK_FOR_INTERRUPTS
591#define ST_SCOPE static
592#define ST_DEFINE
593#include "lib/sort_template.h"
594
595#define ST_SORT qsort_tuple
596#define ST_ELEMENT_TYPE SortTuple
597#define ST_COMPARE_RUNTIME_POINTER
598#define ST_COMPARE_ARG_TYPE Tuplesortstate
599#define ST_CHECK_FOR_INTERRUPTS
600#define ST_SCOPE static
601#define ST_DECLARE
602#define ST_DEFINE
603#include "lib/sort_template.h"
604
605#define ST_SORT qsort_ssup
606#define ST_ELEMENT_TYPE SortTuple
607#define ST_COMPARE(a, b, ssup) \
608 ApplySortComparator((a)->datum1, (a)->isnull1, \
609 (b)->datum1, (b)->isnull1, (ssup))
610#define ST_COMPARE_ARG_TYPE SortSupportData
611#define ST_CHECK_FOR_INTERRUPTS
612#define ST_SCOPE static
613#define ST_DEFINE
614#include "lib/sort_template.h"
615
616/*
617 * tuplesort_begin_xxx
618 *
619 * Initialize for a tuple sort operation.
620 *
621 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
622 * zero or more times, then call tuplesort_performsort when all the tuples
623 * have been supplied. After performsort, retrieve the tuples in sorted
624 * order by calling tuplesort_getXXX until it returns false/NULL. (If random
625 * access was requested, rescan, markpos, and restorepos can also be called.)
626 * Call tuplesort_end to terminate the operation and release memory/disk space.
627 *
628 * Each variant of tuplesort_begin has a workMem parameter specifying the
629 * maximum number of kilobytes of RAM to use before spilling data to disk.
630 * (The normal value of this parameter is work_mem, but some callers use
631 * other values.) Each variant also has a sortopt which is a bitmask of
632 * sort options. See TUPLESORT_* definitions in tuplesort.h
633 */
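(Editorial sketch, not part of this file.) The lifecycle described above, shown with the Datum variant from tuplesortvariants.c; the begin/put/get signatures are abridged from tuplesort.h as of recent versions and may differ slightly, so treat this only as an illustration of the call sequence:

static void
sort_int4_datums_sketch(Datum *values, int nvalues)
{
	Tuplesortstate *ts;
	Datum		val;
	bool		isnull;

	ts = tuplesort_begin_datum(INT4OID, Int4LessOperator, InvalidOid,
							   false, work_mem, NULL, TUPLESORT_NONE);

	for (int i = 0; i < nvalues; i++)
		tuplesort_putdatum(ts, values[i], false);

	tuplesort_performsort(ts);

	while (tuplesort_getdatum(ts, true, false, &val, &isnull, NULL))
	{
		/* consume val in sorted order here */
	}

	tuplesort_end(ts);
}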
634
 635Tuplesortstate *
 636tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
637{
 638 Tuplesortstate *state;
 639 MemoryContext maincontext;
640 MemoryContext sortcontext;
641 MemoryContext oldcontext;
642
643 /* See leader_takeover_tapes() remarks on random access support */
644 if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
645 elog(ERROR, "random access disallowed under parallel sort");
646
647 /*
648 * Memory context surviving tuplesort_reset. This memory context holds
649 * data which is useful to keep while sorting multiple similar batches.
650 */
 651 maincontext = AllocSetContextCreate(CurrentMemoryContext,
 652 "TupleSort main",
 653 ALLOCSET_DEFAULT_SIZES);
 654
655 /*
656 * Create a working memory context for one sort operation. The content of
657 * this context is deleted by tuplesort_reset.
658 */
659 sortcontext = AllocSetContextCreate(maincontext,
660 "TupleSort sort",
 661 ALLOCSET_DEFAULT_SIZES);
 662
663 /*
664 * Additionally a working memory context for tuples is setup in
665 * tuplesort_begin_batch.
666 */
667
668 /*
669 * Make the Tuplesortstate within the per-sortstate context. This way, we
670 * don't need a separate pfree() operation for it at shutdown.
671 */
672 oldcontext = MemoryContextSwitchTo(maincontext);
673
 674 state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
 675
676 if (trace_sort)
677 pg_rusage_init(&state->ru_start);
678
679 state->base.sortopt = sortopt;
680 state->base.tuples = true;
681 state->abbrevNext = 10;
682
683 /*
684 * workMem is forced to be at least 64KB, the current minimum valid value
685 * for the work_mem GUC. This is a defense against parallel sort callers
686 * that divide out memory among many workers in a way that leaves each
687 * with very little memory.
688 */
689 state->allowedMem = Max(workMem, 64) * (int64) 1024;
690 state->base.sortcontext = sortcontext;
691 state->base.maincontext = maincontext;
692
693 state->memtupsize = INITIAL_MEMTUPSIZE;
694 state->memtuples = NULL;
695
696 /*
697 * After all of the other non-parallel-related state, we setup all of the
698 * state needed for each batch.
699 */
 700 tuplesort_begin_batch(state);
 701
702 /*
703 * Initialize parallel-related state based on coordination information
704 * from caller
705 */
706 if (!coordinate)
707 {
708 /* Serial sort */
709 state->shared = NULL;
710 state->worker = -1;
711 state->nParticipants = -1;
712 }
713 else if (coordinate->isWorker)
714 {
715 /* Parallel worker produces exactly one final run from all input */
716 state->shared = coordinate->sharedsort;
 717 state->worker = worker_get_identifier(state);
 718 state->nParticipants = -1;
719 }
720 else
721 {
722 /* Parallel leader state only used for final merge */
723 state->shared = coordinate->sharedsort;
724 state->worker = -1;
725 state->nParticipants = coordinate->nParticipants;
726 Assert(state->nParticipants >= 1);
727 }
728
729 MemoryContextSwitchTo(oldcontext);
730
731 return state;
732}
733
734/*
735 * tuplesort_begin_batch
736 *
 737 * Set up, or reset, all state needed for processing a new set of tuples with this
738 * sort state. Called both from tuplesort_begin_common (the first time sorting
739 * with this sort state) and tuplesort_reset (for subsequent usages).
740 */
741static void
 742tuplesort_begin_batch(Tuplesortstate *state)
 743{
744 MemoryContext oldcontext;
745
746 oldcontext = MemoryContextSwitchTo(state->base.maincontext);
747
748 /*
749 * Caller tuple (e.g. IndexTuple) memory context.
750 *
751 * A dedicated child context used exclusively for caller passed tuples
752 * eases memory management. Resetting at key points reduces
753 * fragmentation. Note that the memtuples array of SortTuples is allocated
754 * in the parent context, not this context, because there is no need to
755 * free memtuples early. For bounded sorts, tuples may be pfreed in any
756 * order, so we use a regular aset.c context so that it can make use of
757 * free'd memory. When the sort is not bounded, we make use of a bump.c
758 * context as this keeps allocations more compact with less wastage.
759 * Allocations are also slightly more CPU efficient.
760 */
761 if (TupleSortUseBumpTupleCxt(state->base.sortopt))
762 state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
763 "Caller tuples",
765 else
766 state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
767 "Caller tuples",
769
770
771 state->status = TSS_INITIAL;
772 state->bounded = false;
773 state->boundUsed = false;
774
775 state->availMem = state->allowedMem;
776
777 state->tapeset = NULL;
778
779 state->memtupcount = 0;
780
781 state->growmemtuples = true;
782 state->slabAllocatorUsed = false;
783 if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
784 {
785 pfree(state->memtuples);
786 state->memtuples = NULL;
787 state->memtupsize = INITIAL_MEMTUPSIZE;
788 }
789 if (state->memtuples == NULL)
790 {
791 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
793 }
794
795 /* workMem must be large enough for the minimal memtuples array */
796 if (LACKMEM(state))
797 elog(ERROR, "insufficient memory allowed for sort");
798
799 state->currentRun = 0;
800
801 /*
802 * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
803 * inittapes(), if needed.
804 */
805
806 state->result_tape = NULL; /* flag that result tape has not been formed */
807
808 MemoryContextSwitchTo(oldcontext);
809}
810
811/*
812 * tuplesort_set_bound
813 *
814 * Advise tuplesort that at most the first N result tuples are required.
815 *
816 * Must be called before inserting any tuples. (Actually, we could allow it
817 * as long as the sort hasn't spilled to disk, but there seems no need for
818 * delayed calls at the moment.)
819 *
820 * This is a hint only. The tuplesort may still return more tuples than
821 * requested. Parallel leader tuplesorts will always ignore the hint.
822 */
823void
 824tuplesort_set_bound(Tuplesortstate *state, int64 bound)
 825{
826 /* Assert we're called before loading any tuples */
827 Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
828 /* Assert we allow bounded sorts */
829 Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
830 /* Can't set the bound twice, either */
831 Assert(!state->bounded);
832 /* Also, this shouldn't be called in a parallel worker */
 833 Assert(!WORKER(state));
 834
835 /* Parallel leader allows but ignores hint */
836 if (LEADER(state))
837 return;
838
839#ifdef DEBUG_BOUNDED_SORT
840 /* Honor GUC setting that disables the feature (for easy testing) */
841 if (!optimize_bounded_sort)
842 return;
843#endif
844
845 /* We want to be able to compute bound * 2, so limit the setting */
846 if (bound > (int64) (INT_MAX / 2))
847 return;
848
849 state->bounded = true;
850 state->bound = (int) bound;
851
852 /*
853 * Bounded sorts are not an effective target for abbreviated key
854 * optimization. Disable by setting state to be consistent with no
855 * abbreviation support.
856 */
857 state->base.sortKeys->abbrev_converter = NULL;
858 if (state->base.sortKeys->abbrev_full_comparator)
859 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
860
861 /* Not strictly necessary, but be tidy */
862 state->base.sortKeys->abbrev_abort = NULL;
863 state->base.sortKeys->abbrev_full_comparator = NULL;
864}
865
866/*
867 * tuplesort_used_bound
868 *
869 * Allow callers to find out if the sort state was able to use a bound.
870 */
871bool
 872tuplesort_used_bound(Tuplesortstate *state)
 873{
874 return state->boundUsed;
875}
876
877/*
878 * tuplesort_free
879 *
880 * Internal routine for freeing resources of tuplesort.
881 */
882static void
 883tuplesort_free(Tuplesortstate *state)
 884{
885 /* context swap probably not needed, but let's be safe */
886 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
887 int64 spaceUsed;
888
889 if (state->tapeset)
890 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
891 else
892 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
893
894 /*
895 * Delete temporary "tape" files, if any.
896 *
897 * We don't bother to destroy the individual tapes here. They will go away
898 * with the sortcontext. (In TSS_FINALMERGE state, we have closed
899 * finished tapes already.)
900 */
901 if (state->tapeset)
902 LogicalTapeSetClose(state->tapeset);
903
904 if (trace_sort)
905 {
906 if (state->tapeset)
907 elog(LOG, "%s of worker %d ended, %" PRId64 " disk blocks used: %s",
908 SERIAL(state) ? "external sort" : "parallel external sort",
909 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
910 else
911 elog(LOG, "%s of worker %d ended, %" PRId64 " KB used: %s",
912 SERIAL(state) ? "internal sort" : "unperformed parallel sort",
913 state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
914 }
915
916 TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
917
 918 FREESTATE(state);
 919 MemoryContextSwitchTo(oldcontext);
920
921 /*
922 * Free the per-sort memory context, thereby releasing all working memory.
923 */
924 MemoryContextReset(state->base.sortcontext);
925}
926
927/*
928 * tuplesort_end
929 *
930 * Release resources and clean up.
931 *
932 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
933 * pointing to garbage. Be careful not to attempt to use or free such
934 * pointers afterwards!
935 */
936void
 937tuplesort_end(Tuplesortstate *state)
 938{
 939 tuplesort_free(state);
 940
941 /*
942 * Free the main memory context, including the Tuplesortstate struct
943 * itself.
944 */
945 MemoryContextDelete(state->base.maincontext);
946}
947
948/*
949 * tuplesort_updatemax
950 *
951 * Update maximum resource usage statistics.
952 */
953static void
 954tuplesort_updatemax(Tuplesortstate *state)
 955{
956 int64 spaceUsed;
957 bool isSpaceDisk;
958
959 /*
960 * Note: it might seem we should provide both memory and disk usage for a
961 * disk-based sort. However, the current code doesn't track memory space
962 * accurately once we have begun to return tuples to the caller (since we
963 * don't account for pfree's the caller is expected to do), so we cannot
964 * rely on availMem in a disk sort. This does not seem worth the overhead
965 * to fix. Is it worth creating an API for the memory context code to
966 * tell us how much is actually used in sortcontext?
967 */
968 if (state->tapeset)
969 {
970 isSpaceDisk = true;
971 spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
972 }
973 else
974 {
975 isSpaceDisk = false;
976 spaceUsed = state->allowedMem - state->availMem;
977 }
978
979 /*
980 * Sort evicts data to the disk when it wasn't able to fit that data into
981 * main memory. This is why we assume space used on the disk to be more
982 * important for tracking resource usage than space used in memory. Note
983 * that the amount of space occupied by some tupleset on the disk might be
984 * less than amount of space occupied by the same tupleset in memory due
985 * to more compact representation.
986 */
987 if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
988 (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
989 {
990 state->maxSpace = spaceUsed;
991 state->isMaxSpaceDisk = isSpaceDisk;
992 state->maxSpaceStatus = state->status;
993 }
994}
995
996/*
997 * tuplesort_reset
998 *
999 * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
1000 * meta-information in. After tuplesort_reset, tuplesort is ready to start
1001 * a new sort. This allows avoiding recreation of tuple sort states (and
1002 * saving resources) when sorting multiple small batches.
1003 */
1004void
1005tuplesort_reset(Tuplesortstate *state)
1006{
1007 tuplesort_updatemax(state);
1008 tuplesort_free(state);
1009
1010 /*
1011 * After we've freed up per-batch memory, re-setup all of the state common
1012 * to both the first batch and any subsequent batch.
1013 */
1014 tuplesort_begin_batch(state);
1015
1016 state->lastReturnedTuple = NULL;
1017 state->slabMemoryBegin = NULL;
1018 state->slabMemoryEnd = NULL;
1019 state->slabFreeHead = NULL;
1020}
1021
1022/*
1023 * Grow the memtuples[] array, if possible within our memory constraint. We
1024 * must not exceed INT_MAX tuples in memory or the caller-provided memory
1025 * limit. Return true if we were able to enlarge the array, false if not.
1026 *
1027 * Normally, at each increment we double the size of the array. When doing
1028 * that would exceed a limit, we attempt one last, smaller increase (and then
1029 * clear the growmemtuples flag so we don't try any more). That allows us to
1030 * use memory as fully as permitted; sticking to the pure doubling rule could
1031 * result in almost half going unused. Because availMem moves around with
1032 * tuple addition/removal, we need some rule to prevent making repeated small
1033 * increases in memtupsize, which would just be useless thrashing. The
1034 * growmemtuples flag accomplishes that and also prevents useless
1035 * recalculations in this function.
1036 */
1037static bool
1038grow_memtuples(Tuplesortstate *state)
1039{
1040 int newmemtupsize;
1041 int memtupsize = state->memtupsize;
1042 int64 memNowUsed = state->allowedMem - state->availMem;
1043
1044 /* Forget it if we've already maxed out memtuples, per comment above */
1045 if (!state->growmemtuples)
1046 return false;
1047
1048 /* Select new value of memtupsize */
1049 if (memNowUsed <= state->availMem)
1050 {
1051 /*
1052 * We've used no more than half of allowedMem; double our usage,
1053 * clamping at INT_MAX tuples.
1054 */
1055 if (memtupsize < INT_MAX / 2)
1056 newmemtupsize = memtupsize * 2;
1057 else
1058 {
1059 newmemtupsize = INT_MAX;
1060 state->growmemtuples = false;
1061 }
1062 }
1063 else
1064 {
1065 /*
1066 * This will be the last increment of memtupsize. Abandon doubling
1067 * strategy and instead increase as much as we safely can.
1068 *
1069 * To stay within allowedMem, we can't increase memtupsize by more
1070 * than availMem / sizeof(SortTuple) elements. In practice, we want
1071 * to increase it by considerably less, because we need to leave some
1072 * space for the tuples to which the new array slots will refer. We
1073 * assume the new tuples will be about the same size as the tuples
1074 * we've already seen, and thus we can extrapolate from the space
1075 * consumption so far to estimate an appropriate new size for the
1076 * memtuples array. The optimal value might be higher or lower than
1077 * this estimate, but it's hard to know that in advance. We again
1078 * clamp at INT_MAX tuples.
1079 *
1080 * This calculation is safe against enlarging the array so much that
1081 * LACKMEM becomes true, because the memory currently used includes
1082 * the present array; thus, there would be enough allowedMem for the
1083 * new array elements even if no other memory were currently used.
1084 *
1085 * We do the arithmetic in float8, because otherwise the product of
1086 * memtupsize and allowedMem could overflow. Any inaccuracy in the
1087 * result should be insignificant; but even if we computed a
1088 * completely insane result, the checks below will prevent anything
1089 * really bad from happening.
1090 */
1091 double grow_ratio;
1092
1093 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1094 if (memtupsize * grow_ratio < INT_MAX)
1095 newmemtupsize = (int) (memtupsize * grow_ratio);
1096 else
1097 newmemtupsize = INT_MAX;
1098
1099 /* We won't make any further enlargement attempts */
1100 state->growmemtuples = false;
1101 }
1102
1103 /* Must enlarge array by at least one element, else report failure */
1104 if (newmemtupsize <= memtupsize)
1105 goto noalloc;
1106
1107 /*
1108 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1109 * to ensure our request won't be rejected. Note that we can easily
1110 * exhaust address space before facing this outcome. (This is presently
1111 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1112 * don't rely on that at this distance.)
1113 */
1114 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1115 {
1116 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1117 state->growmemtuples = false; /* can't grow any more */
1118 }
1119
1120 /*
1121 * We need to be sure that we do not cause LACKMEM to become true, else
1122 * the space management algorithm will go nuts. The code above should
1123 * never generate a dangerous request, but to be safe, check explicitly
1124 * that the array growth fits within availMem. (We could still cause
1125 * LACKMEM if the memory chunk overhead associated with the memtuples
1126 * array were to increase. That shouldn't happen because we chose the
1127 * initial array size large enough to ensure that palloc will be treating
1128 * both old and new arrays as separate chunks. But we'll check LACKMEM
1129 * explicitly below just in case.)
1130 */
1131 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1132 goto noalloc;
1133
1134 /* OK, do it */
1135 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1136 state->memtupsize = newmemtupsize;
1137 state->memtuples = (SortTuple *)
1138 repalloc_huge(state->memtuples,
1139 state->memtupsize * sizeof(SortTuple));
1140 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1141 if (LACKMEM(state))
1142 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1143 return true;
1144
1145noalloc:
1146 /* If for any reason we didn't realloc, shut off future attempts */
1147 state->growmemtuples = false;
1148 return false;
1149}
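(Editorial worked example, not part of the original comments.) To make the growth policy in grow_memtuples() concrete: suppose allowedMem is 64 MB, the array currently has 1,000,000 slots, and the tuples plus the array together have consumed 40 MB. More than half of allowedMem is used, so doubling is abandoned; grow_ratio = 64/40 = 1.6, the array is enlarged once more to about 1,600,000 slots, and growmemtuples is cleared so no further growth is attempted.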
1150
1151/*
1152 * Shared code for tuple and datum cases.
1153 */
1154void
1155tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
1156 bool useAbbrev, Size tuplen)
1157{
1158 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1159
1160 Assert(!LEADER(state));
1161
1162 /* account for the memory used for this tuple */
1163 USEMEM(state, tuplen);
1164 state->tupleMem += tuplen;
1165
1166 if (!useAbbrev)
1167 {
1168 /*
1169 * Leave ordinary Datum representation, or NULL value. If there is a
1170 * converter it won't expect NULL values, and cost model is not
1171 * required to account for NULL, so in that case we avoid calling
1172 * converter and just set datum1 to zeroed representation (to be
1173 * consistent, and to support cheap inequality tests for NULL
1174 * abbreviated keys).
1175 */
1176 }
1177 else if (!consider_abort_common(state))
1178 {
1179 /* Store abbreviated key representation */
1180 tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1181 state->base.sortKeys);
1182 }
1183 else
1184 {
1185 /*
1186 * Set state to be consistent with never trying abbreviation.
1187 *
1188 * Alter datum1 representation in already-copied tuples, so as to
1189 * ensure a consistent representation (current tuple was just
1190 * handled). It does not matter if some dumped tuples are already
1191 * sorted on tape, since serialized tuples lack abbreviated keys
1192 * (TSS_BUILDRUNS state prevents control reaching here in any case).
1193 */
1194 REMOVEABBREV(state, state->memtuples, state->memtupcount);
1195 }
1196
1197 switch (state->status)
1198 {
1199 case TSS_INITIAL:
1200
1201 /*
1202 * Save the tuple into the unsorted array. First, grow the array
1203 * as needed. Note that we try to grow the array when there is
1204 * still one free slot remaining --- if we fail, there'll still be
1205 * room to store the incoming tuple, and then we'll switch to
1206 * tape-based operation.
1207 */
1208 if (state->memtupcount >= state->memtupsize - 1)
1209 {
1210 (void) grow_memtuples(state);
1211 Assert(state->memtupcount < state->memtupsize);
1212 }
1213 state->memtuples[state->memtupcount++] = *tuple;
1214
1215 /*
1216 * Check if it's time to switch over to a bounded heapsort. We do
1217 * so if the input tuple count exceeds twice the desired tuple
1218 * count (this is a heuristic for where heapsort becomes cheaper
1219 * than a quicksort), or if we've just filled workMem and have
1220 * enough tuples to meet the bound.
1221 *
1222 * Note that once we enter TSS_BOUNDED state we will always try to
1223 * complete the sort that way. In the worst case, if later input
1224 * tuples are larger than earlier ones, this might cause us to
1225 * exceed workMem significantly.
1226 */
1227 if (state->bounded &&
1228 (state->memtupcount > state->bound * 2 ||
1229 (state->memtupcount > state->bound && LACKMEM(state))))
1230 {
1231 if (trace_sort)
1232 elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1233 state->memtupcount,
1234 pg_rusage_show(&state->ru_start));
1235 make_bounded_heap(state);
1236 MemoryContextSwitchTo(oldcontext);
1237 return;
1238 }
1239
1240 /*
1241 * Done if we still fit in available memory and have array slots.
1242 */
1243 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1244 {
1245 MemoryContextSwitchTo(oldcontext);
1246 return;
1247 }
1248
1249 /*
1250 * Nope; time to switch to tape-based operation.
1251 */
1252 inittapes(state, true);
1253
1254 /*
1255 * Dump all tuples.
1256 */
1257 dumptuples(state, false);
1258 break;
1259
1260 case TSS_BOUNDED:
1261
1262 /*
1263 * We don't want to grow the array here, so check whether the new
1264 * tuple can be discarded before putting it in. This should be a
1265 * good speed optimization, too, since when there are many more
1266 * input tuples than the bound, most input tuples can be discarded
1267 * with just this one comparison. Note that because we currently
1268 * have the sort direction reversed, we must check for <= not >=.
1269 */
1270 if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1271 {
1272 /* new tuple <= top of the heap, so we can discard it */
1273 free_sort_tuple(state, tuple);
1275 }
1276 else
1277 {
1278 /* discard top of heap, replacing it with the new tuple */
1279 free_sort_tuple(state, &state->memtuples[0]);
1280 tuplesort_heap_replace_top(state, tuple);
1281 }
1282 break;
1283
1284 case TSS_BUILDRUNS:
1285
1286 /*
1287 * Save the tuple into the unsorted array (there must be space)
1288 */
1289 state->memtuples[state->memtupcount++] = *tuple;
1290
1291 /*
1292 * If we are over the memory limit, dump all tuples.
1293 */
1294 dumptuples(state, false);
1295 break;
1296
1297 default:
1298 elog(ERROR, "invalid tuplesort state");
1299 break;
1300 }
1301 MemoryContextSwitchTo(oldcontext);
1302}
1303
1304static bool
1305consider_abort_common(Tuplesortstate *state)
1306{
1307 Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1308 Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1309 Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1310
1311 /*
1312 * Check effectiveness of abbreviation optimization. Consider aborting
1313 * when still within memory limit.
1314 */
1315 if (state->status == TSS_INITIAL &&
1316 state->memtupcount >= state->abbrevNext)
1317 {
1318 state->abbrevNext *= 2;
1319
1320 /*
1321 * Check opclass-supplied abbreviation abort routine. It may indicate
1322 * that abbreviation should not proceed.
1323 */
1324 if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1325 state->base.sortKeys))
1326 return false;
1327
1328 /*
1329 * Finally, restore authoritative comparator, and indicate that
1330 * abbreviation is not in play by setting abbrev_converter to NULL
1331 */
1332 state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1333 state->base.sortKeys[0].abbrev_converter = NULL;
1334 /* Not strictly necessary, but be tidy */
1335 state->base.sortKeys[0].abbrev_abort = NULL;
1336 state->base.sortKeys[0].abbrev_full_comparator = NULL;
1337
1338 /* Give up - expect original pass-by-value representation */
1339 return true;
1340 }
1341
1342 return false;
1343}
1344
1345/*
1346 * All tuples have been provided; finish the sort.
1347 */
1348void
1349tuplesort_performsort(Tuplesortstate *state)
1350{
1351 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1352
1353 if (trace_sort)
1354 elog(LOG, "performsort of worker %d starting: %s",
1355 state->worker, pg_rusage_show(&state->ru_start));
1356
1357 switch (state->status)
1358 {
1359 case TSS_INITIAL:
1360
1361 /*
1362 * We were able to accumulate all the tuples within the allowed
1363 * amount of memory, or we are the leader about to take over the worker tapes.
1364 */
1365 if (SERIAL(state))
1366 {
1367 /* Just qsort 'em and we're done */
1368 tuplesort_sort_memtuples(state);
1369 state->status = TSS_SORTEDINMEM;
1370 }
1371 else if (WORKER(state))
1372 {
1373 /*
1374 * Parallel workers must still dump out tuples to tape. No
1375 * merge is required to produce single output run, though.
1376 */
1377 inittapes(state, false);
1378 dumptuples(state, true);
1379 worker_nomergeruns(state);
1380 state->status = TSS_SORTEDONTAPE;
1381 }
1382 else
1383 {
1384 /*
1385 * Leader will take over worker tapes and merge worker runs.
1386 * Note that mergeruns sets the correct state->status.
1387 */
1388 leader_takeover_tapes(state);
1389 mergeruns(state);
1390 }
1391 state->current = 0;
1392 state->eof_reached = false;
1393 state->markpos_block = 0L;
1394 state->markpos_offset = 0;
1395 state->markpos_eof = false;
1396 break;
1397
1398 case TSS_BOUNDED:
1399
1400 /*
1401 * We were able to accumulate all the tuples required for output
1402 * in memory, using a heap to eliminate excess tuples. Now we
1403 * have to transform the heap to a properly-sorted array. Note
1404 * that sort_bounded_heap sets the correct state->status.
1405 */
1406 sort_bounded_heap(state);
1407 state->current = 0;
1408 state->eof_reached = false;
1409 state->markpos_offset = 0;
1410 state->markpos_eof = false;
1411 break;
1412
1413 case TSS_BUILDRUNS:
1414
1415 /*
1416 * Finish tape-based sort. First, flush all tuples remaining in
1417 * memory out to tape; then merge until we have a single remaining
1418 * run (or, if !randomAccess and !WORKER(), one run per tape).
1419 * Note that mergeruns sets the correct state->status.
1420 */
1421 dumptuples(state, true);
1422 mergeruns(state);
1423 state->eof_reached = false;
1424 state->markpos_block = 0L;
1425 state->markpos_offset = 0;
1426 state->markpos_eof = false;
1427 break;
1428
1429 default:
1430 elog(ERROR, "invalid tuplesort state");
1431 break;
1432 }
1433
1434 if (trace_sort)
1435 {
1436 if (state->status == TSS_FINALMERGE)
1437 elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1438 state->worker, state->nInputTapes,
1439 pg_rusage_show(&state->ru_start));
1440 else
1441 elog(LOG, "performsort of worker %d done: %s",
1442 state->worker, pg_rusage_show(&state->ru_start));
1443 }
1444
1445 MemoryContextSwitchTo(oldcontext);
1446}
1447
1448/*
1449 * Internal routine to fetch the next tuple in either forward or back
1450 * direction into *stup. Returns false if no more tuples.
1451 * Returned tuple belongs to tuplesort memory context, and must not be freed
1452 * by caller. Note that fetched tuple is stored in memory that may be
1453 * recycled by any future fetch.
1454 */
1455bool
1456tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1457 SortTuple *stup)
1458{
1459 unsigned int tuplen;
1460 size_t nmoved;
1461
1462 Assert(!WORKER(state));
1463
1464 switch (state->status)
1465 {
1466 case TSS_SORTEDINMEM:
1467 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1468 Assert(!state->slabAllocatorUsed);
1469 if (forward)
1470 {
1471 if (state->current < state->memtupcount)
1472 {
1473 *stup = state->memtuples[state->current++];
1474 return true;
1475 }
1476 state->eof_reached = true;
1477
1478 /*
1479 * Complain if caller tries to retrieve more tuples than
1480 * originally asked for in a bounded sort. This is because
1481 * returning EOF here might be the wrong thing.
1482 */
1483 if (state->bounded && state->current >= state->bound)
1484 elog(ERROR, "retrieved too many tuples in a bounded sort");
1485
1486 return false;
1487 }
1488 else
1489 {
1490 if (state->current <= 0)
1491 return false;
1492
1493 /*
1494 * If all tuples were already fetched, return the last tuple;
1495 * otherwise return the tuple before the last one returned.
1496 */
1497 if (state->eof_reached)
1498 state->eof_reached = false;
1499 else
1500 {
1501 state->current--; /* last returned tuple */
1502 if (state->current <= 0)
1503 return false;
1504 }
1505 *stup = state->memtuples[state->current - 1];
1506 return true;
1507 }
1508 break;
1509
1510 case TSS_SORTEDONTAPE:
1511 Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1512 Assert(state->slabAllocatorUsed);
1513
1514 /*
1515 * The slot that held the tuple that we returned in previous
1516 * gettuple call can now be reused.
1517 */
1518 if (state->lastReturnedTuple)
1519 {
1520 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1521 state->lastReturnedTuple = NULL;
1522 }
1523
1524 if (forward)
1525 {
1526 if (state->eof_reached)
1527 return false;
1528
1529 if ((tuplen = getlen(state->result_tape, true)) != 0)
1530 {
1531 READTUP(state, stup, state->result_tape, tuplen);
1532
1533 /*
1534 * Remember the tuple we return, so that we can recycle
1535 * its memory on next call. (This can be NULL, in the
1536 * !state->tuples case).
1537 */
1538 state->lastReturnedTuple = stup->tuple;
1539
1540 return true;
1541 }
1542 else
1543 {
1544 state->eof_reached = true;
1545 return false;
1546 }
1547 }
1548
1549 /*
1550 * Backward.
1551 *
1552 * If all tuples were already fetched, return the last tuple; otherwise
1553 * return the tuple before the last one returned.
1554 */
1555 if (state->eof_reached)
1556 {
1557 /*
1558 * Seek position is pointing just past the zero tuplen at the
1559 * end of file; back up to fetch last tuple's ending length
1560 * word. If seek fails we must have a completely empty file.
1561 */
1562 nmoved = LogicalTapeBackspace(state->result_tape,
1563 2 * sizeof(unsigned int));
1564 if (nmoved == 0)
1565 return false;
1566 else if (nmoved != 2 * sizeof(unsigned int))
1567 elog(ERROR, "unexpected tape position");
1568 state->eof_reached = false;
1569 }
1570 else
1571 {
1572 /*
1573 * Back up and fetch previously-returned tuple's ending length
1574 * word. If seek fails, assume we are at start of file.
1575 */
1576 nmoved = LogicalTapeBackspace(state->result_tape,
1577 sizeof(unsigned int));
1578 if (nmoved == 0)
1579 return false;
1580 else if (nmoved != sizeof(unsigned int))
1581 elog(ERROR, "unexpected tape position");
1582 tuplen = getlen(state->result_tape, false);
1583
1584 /*
1585 * Back up to get ending length word of tuple before it.
1586 */
1587 nmoved = LogicalTapeBackspace(state->result_tape,
1588 tuplen + 2 * sizeof(unsigned int));
1589 if (nmoved == tuplen + sizeof(unsigned int))
1590 {
1591 /*
1592 * We backed up over the previous tuple, but there was no
1593 * ending length word before it. That means that the prev
1594 * tuple is the first tuple in the file. It is now the
1595 * next to read in forward direction (not obviously right,
1596 * but that is what the in-memory case does).
1597 */
1598 return false;
1599 }
1600 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1601 elog(ERROR, "bogus tuple length in backward scan");
1602 }
1603
1604 tuplen = getlen(state->result_tape, false);
1605
1606 /*
1607 * Now we have the length of the prior tuple, back up and read it.
1608 * Note: READTUP expects we are positioned after the initial
1609 * length word of the tuple, so back up to that point.
1610 */
1611 nmoved = LogicalTapeBackspace(state->result_tape,
1612 tuplen);
1613 if (nmoved != tuplen)
1614 elog(ERROR, "bogus tuple length in backward scan");
1615 READTUP(state, stup, state->result_tape, tuplen);
1616
1617 /*
1618 * Remember the tuple we return, so that we can recycle its memory
1619 * on next call. (This can be NULL, in the Datum case).
1620 */
1621 state->lastReturnedTuple = stup->tuple;
1622
1623 return true;
1624
1625 case TSS_FINALMERGE:
1626 Assert(forward);
1627 /* We are managing memory ourselves, with the slab allocator. */
1628 Assert(state->slabAllocatorUsed);
1629
1630 /*
1631 * The slab slot holding the tuple that we returned in previous
1632 * gettuple call can now be reused.
1633 */
1634 if (state->lastReturnedTuple)
1635 {
1636 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1637 state->lastReturnedTuple = NULL;
1638 }
1639
1640 /*
1641 * This code should match the inner loop of mergeonerun().
1642 */
1643 if (state->memtupcount > 0)
1644 {
1645 int srcTapeIndex = state->memtuples[0].srctape;
1646 LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1647 SortTuple newtup;
1648
1649 *stup = state->memtuples[0];
1650
1651 /*
1652 * Remember the tuple we return, so that we can recycle its
1653 * memory on next call. (This can be NULL, in the Datum case).
1654 */
1655 state->lastReturnedTuple = stup->tuple;
1656
1657 /*
1658 * Pull next tuple from tape, and replace the returned tuple
1659 * at top of the heap with it.
1660 */
1661 if (!mergereadnext(state, srcTape, &newtup))
1662 {
1663 /*
1664 * If no more data, we've reached end of run on this tape.
1665 * Remove the top node from the heap.
1666 */
1667 tuplesort_heap_delete_top(state);
1668 state->nInputRuns--;
1669
1670 /*
1671 * Close the tape. It'd go away at the end of the sort
1672 * anyway, but better to release the memory early.
1673 */
1674 LogicalTapeClose(srcTape);
1675 return true;
1676 }
1677 newtup.srctape = srcTapeIndex;
1678 tuplesort_heap_replace_top(state, &newtup);
1679 return true;
1680 }
1681 return false;
1682
1683 default:
1684 elog(ERROR, "invalid tuplesort state");
1685 return false; /* keep compiler quiet */
1686 }
1687}
1688
1689
1690/*
1691 * Advance over N tuples in either forward or back direction,
1692 * without returning any data. N==0 is a no-op.
1693 * Returns true if successful, false if ran out of tuples.
1694 */
1695bool
1696tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1697{
1698 MemoryContext oldcontext;
1699
1700 /*
1701 * We don't actually support backwards skip yet, because no callers need
1702 * it. The API is designed to allow for that later, though.
1703 */
1704 Assert(forward);
1705 Assert(ntuples >= 0);
1706 Assert(!WORKER(state));
1707
1708 switch (state->status)
1709 {
1710 case TSS_SORTEDINMEM:
1711 if (state->memtupcount - state->current >= ntuples)
1712 {
1713 state->current += ntuples;
1714 return true;
1715 }
1716 state->current = state->memtupcount;
1717 state->eof_reached = true;
1718
1719 /*
1720 * Complain if caller tries to retrieve more tuples than
1721 * originally asked for in a bounded sort. This is because
1722 * returning EOF here might be the wrong thing.
1723 */
1724 if (state->bounded && state->current >= state->bound)
1725 elog(ERROR, "retrieved too many tuples in a bounded sort");
1726
1727 return false;
1728
1729 case TSS_SORTEDONTAPE:
1730 case TSS_FINALMERGE:
1731
1732 /*
1733 * We could probably optimize these cases better, but for now it's
1734 * not worth the trouble.
1735 */
1736 oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1737 while (ntuples-- > 0)
1738 {
1739 SortTuple stup;
1740
1741 if (!tuplesort_gettuple_common(state, forward, &stup))
1742 {
1743 MemoryContextSwitchTo(oldcontext);
1744 return false;
1745 }
1746 CHECK_FOR_INTERRUPTS();
1747 }
1748 MemoryContextSwitchTo(oldcontext);
1749 return true;
1750
1751 default:
1752 elog(ERROR, "invalid tuplesort state");
1753 return false; /* keep compiler quiet */
1754 }
1755}
1756
1757/*
1758 * tuplesort_merge_order - report merge order we'll use for given memory
1759 * (note: "merge order" just means the number of input tapes in the merge).
1760 *
1761 * This is exported for use by the planner. allowedMem is in bytes.
1762 */
1763int
1764tuplesort_merge_order(int64 allowedMem)
1765{
1766 int mOrder;
1767
1768 /*----------
1769 * In the merge phase, we need buffer space for each input and output tape.
1770 * Each pass in the balanced merge algorithm reads from M input tapes, and
1771 * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1772 * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1773 * input tape.
1774 *
1775 * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1776 * N * TAPE_BUFFER_OVERHEAD
1777 *
1778 * Except for the last and next-to-last merge passes, where there can be
1779 * fewer tapes left to process, M = N. We choose M so that we have the
1780 * desired amount of memory available for the input buffers
1781 * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1782 * available for the tape buffers (allowedMem).
1783 *
1784 * Note: you might be thinking we need to account for the memtuples[]
1785 * array in this calculation, but we effectively treat that as part of the
1786 * MERGE_BUFFER_SIZE workspace.
1787 *----------
1788 */
1789 mOrder = allowedMem /
1790 (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1791
1792 /*
1793 * Even in minimum memory, use at least a MINORDER merge. On the other
1794 * hand, even when we have lots of memory, do not use more than a MAXORDER
1795 * merge. Tapes are pretty cheap, but they're not entirely free. Each
1796 * additional tape reduces the amount of memory available to build runs,
1797 * which in turn can cause the same sort to need more runs, which makes
1798 * merging slower even if it can still be done in a single pass. Also,
1799 * high order merges are quite slow due to CPU cache effects; it can be
1800 * faster to pay the I/O cost of a multi-pass merge than to perform a
1801 * single merge pass across many hundreds of tapes.
1802 */
1803 mOrder = Max(mOrder, MINORDER);
1804 mOrder = Min(mOrder, MAXORDER);
1805
1806 return mOrder;
1807}
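/*----------
 * Illustrative worked example (not part of the upstream file; assumes the
 * default 8 kB BLCKSZ and the usual definitions TAPE_BUFFER_OVERHEAD = BLCKSZ,
 * MERGE_BUFFER_SIZE = 32 * BLCKSZ, MINORDER = 6, MAXORDER = 500 from earlier
 * in this file).  For allowedMem = 4 MB:
 *
 *	mOrder = 4194304 / (2 * 8192 + 262144) = 4194304 / 278528 = 15
 *
 * which already lies inside [MINORDER, MAXORDER] = [6, 500], so a 15-way
 * merge would be used.
 *----------
 */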
1808
1809/*
1810 * Helper function to calculate how much memory to allocate for the read buffer
1811 * of each input tape in a merge pass.
1812 *
1813 * 'avail_mem' is the amount of memory available for the buffers of all the
1814 * tapes, both input and output.
1815 * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1816 * 'maxOutputTapes' is the max. number of output tapes we should produce.
1817 */
1818static int64
1819merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1820 int maxOutputTapes)
1821{
1822 int nOutputRuns;
1823 int nOutputTapes;
1824
1825 /*
1826 * How many output tapes will we produce in this pass?
1827 *
1828 * This is nInputRuns / nInputTapes, rounded up.
1829 */
1830 nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1831
1832 nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1833
1834 /*
1835 * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1836 * remaining memory is divided evenly between the input tapes.
1837 *
1838 * This also follows from the formula in tuplesort_merge_order, but here
1839 * we derive the input buffer size from the amount of memory available,
1840 * and M and N.
1841 */
1842 return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1843}
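/*----------
 * Illustrative worked example (not part of the upstream file; again assumes
 * TAPE_BUFFER_OVERHEAD is the default 8 kB BLCKSZ).  With avail_mem = 4 MB,
 * nInputTapes = 15, nInputRuns = 60 and maxOutputTapes = 15:
 *
 *	nOutputRuns  = ceil(60 / 15)           = 4
 *	nOutputTapes = Min(4, 15)              = 4
 *	read buffer per input tape = (4194304 - 4 * 8192) / 15 = 277435 bytes
 *
 * i.e. roughly 271 kB of the memory goes to each input tape's read buffer.
 *----------
 */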
1844
1845/*
1846 * inittapes - initialize for tape sorting.
1847 *
1848 * This is called only if we have found we won't sort in memory.
1849 */
1850static void
1851inittapes(Tuplesortstate *state, bool mergeruns)
1852{
1853 Assert(!LEADER(state));
1854
1855 if (mergeruns)
1856 {
1857 /* Compute number of input tapes to use when merging */
1858 state->maxTapes = tuplesort_merge_order(state->allowedMem);
1859 }
1860 else
1861 {
1862 /* Workers can sometimes produce single run, output without merge */
1863 Assert(WORKER(state));
1864 state->maxTapes = MINORDER;
1865 }
1866
1867 if (trace_sort)
1868 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1869 state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1870
1871 /* Create the tape set */
1872 inittapestate(state, state->maxTapes);
1873 state->tapeset =
1874 LogicalTapeSetCreate(false,
1875 state->shared ? &state->shared->fileset : NULL,
1876 state->worker);
1877
1878 state->currentRun = 0;
1879
1880 /*
1881 * Initialize logical tape arrays.
1882 */
1883 state->inputTapes = NULL;
1884 state->nInputTapes = 0;
1885 state->nInputRuns = 0;
1886
1887 state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1888 state->nOutputTapes = 0;
1889 state->nOutputRuns = 0;
1890
1891 state->status = TSS_BUILDRUNS;
1892
1892
1893 selectnewtape(state);
1894}
1895
1896/*
1897 * inittapestate - initialize generic tape management state
1898 */
1899static void
1900inittapestate(Tuplesortstate *state, int maxTapes)
1901{
1902 int64 tapeSpace;
1903
1904 /*
1905 * Decrease availMem to reflect the space needed for tape buffers; but
1906 * don't decrease it to the point that we have no room for tuples. (That
1907 * case is only likely to occur if sorting pass-by-value Datums; in all
1908 * other scenarios the memtuples[] array is unlikely to occupy more than
1909 * half of allowedMem. In the pass-by-value case it's not important to
1910 * account for tuple space, so we don't care if LACKMEM becomes
1911 * inaccurate.)
1912 */
1913 tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1914
1915 if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1916 USEMEM(state, tapeSpace);
1917
1918 /*
1919 * Make sure that the temp file(s) underlying the tape set are created in
1920 * suitable temp tablespaces. For parallel sorts, this should have been
1921 * called already, but it doesn't matter if it is called a second time.
1922 */
1923 PrepareTempTablespaces();
1924}
1925
1926/*
1927 * selectnewtape -- select next tape to output to.
1928 *
1929 * This is called after finishing a run when we know another run
1930 * must be started. This is used both when building the initial
1931 * runs, and during merge passes.
1932 */
1933static void
1934selectnewtape(Tuplesortstate *state)
1935{
1936 /*
1937 * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1938 * both zero. On each call, we create a new output tape to hold the next
1939 * run, until maxTapes is reached. After that, we assign new runs to the
1940 * existing tapes in a round robin fashion.
1941 */
1942 if (state->nOutputTapes < state->maxTapes)
1943 {
1944 /* Create a new tape to hold the next run */
1945 Assert(state->outputTapes[state->nOutputRuns] == NULL);
1946 Assert(state->nOutputRuns == state->nOutputTapes);
1947 state->destTape = LogicalTapeCreate(state->tapeset);
1948 state->outputTapes[state->nOutputTapes] = state->destTape;
1949 state->nOutputTapes++;
1950 state->nOutputRuns++;
1951 }
1952 else
1953 {
1954 /*
1955 * We have reached the max number of tapes. Append to an existing
1956 * tape.
1957 */
1958 state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1959 state->nOutputRuns++;
1960 }
1961}
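/*
 * For illustration (not part of the upstream file): with maxTapes = 6, the
 * first six runs each get a freshly created tape (outputTapes[0..5]); the
 * seventh run is appended to outputTapes[6 % 6] = outputTapes[0], the eighth
 * to outputTapes[1], and so on, round robin.
 */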
1962
1963/*
1964 * Initialize the slab allocation arena, for the given number of slots.
1965 */
1966static void
1967init_slab_allocator(Tuplesortstate *state, int numSlots)
1968{
1969 if (numSlots > 0)
1970 {
1971 char *p;
1972 int i;
1973
1974 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
1975 state->slabMemoryEnd = state->slabMemoryBegin +
1976 numSlots * SLAB_SLOT_SIZE;
1977 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
1978 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
1979
1980 p = state->slabMemoryBegin;
1981 for (i = 0; i < numSlots - 1; i++)
1982 {
1983 ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
1984 p += SLAB_SLOT_SIZE;
1985 }
1986 ((SlabSlot *) p)->nextfree = NULL;
1987 }
1988 else
1989 {
1990 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
1991 state->slabFreeHead = NULL;
1992 }
1993 state->slabAllocatorUsed = true;
1994}
1995
1996/*
1997 * mergeruns -- merge all the completed initial runs.
1998 *
1999 * This implements the Balanced k-Way Merge Algorithm. All input data has
2000 * already been written to initial runs on tape (see dumptuples).
2001 */
2002static void
2003mergeruns(Tuplesortstate *state)
2004{
2005 int tapenum;
2006
2007 Assert(state->status == TSS_BUILDRUNS);
2008 Assert(state->memtupcount == 0);
2009
2010 if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2011 {
2012 /*
2013 * If there are multiple runs to be merged, when we go to read back
2014 * tuples from disk, abbreviated keys will not have been stored, and
2015 * we don't care to regenerate them. Disable abbreviation from this
2016 * point on.
2017 */
2018 state->base.sortKeys->abbrev_converter = NULL;
2019 state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2020
2021 /* Not strictly necessary, but be tidy */
2022 state->base.sortKeys->abbrev_abort = NULL;
2023 state->base.sortKeys->abbrev_full_comparator = NULL;
2024 }
2025
2026 /*
2027 * Reset tuple memory. We've freed all the tuples that we previously
2028 * allocated. We will use the slab allocator from now on.
2029 */
2030 MemoryContextResetOnly(state->base.tuplecontext);
2031
2032 /*
2033 * We no longer need a large memtuples array. (We will allocate a smaller
2034 * one for the heap later.)
2035 */
2036 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2037 pfree(state->memtuples);
2038 state->memtuples = NULL;
2039
2040 /*
2041 * Initialize the slab allocator. We need one slab slot per input tape,
2042 * for the tuples in the heap, plus one to hold the tuple last returned
2043 * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2044 * however, we don't need to allocate anything.)
2045 *
2046 * In a multi-pass merge, we could shrink this allocation for the last
2047 * merge pass, if it has fewer tapes than previous passes, but we don't
2048 * bother.
2049 *
2050 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2051 * to track memory usage of individual tuples.
2052 */
2053 if (state->base.tuples)
2054 init_slab_allocator(state, state->nOutputTapes + 1);
2055 else
2056 init_slab_allocator(state, 0);
2057
2058 /*
2059 * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2060 * from each input tape.
2061 *
2062 * We could shrink this, too, between passes in a multi-pass merge, but we
2063 * don't bother. (The initial input tapes are still in outputTapes. The
2064 * number of input tapes will not increase between passes.)
2065 */
2066 state->memtupsize = state->nOutputTapes;
2067 state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
2068 state->nOutputTapes * sizeof(SortTuple));
2069 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2070
2071 /*
2072 * Use all the remaining memory we have available for tape buffers among
2073 * all the input tapes. At the beginning of each merge pass, we will
2074 * divide this memory between the input and output tapes in the pass.
2075 */
2076 state->tape_buffer_mem = state->availMem;
2077 USEMEM(state, state->tape_buffer_mem);
2078 if (trace_sort)
2079 elog(LOG, "worker %d using %zu KB of memory for tape buffers",
2080 state->worker, state->tape_buffer_mem / 1024);
2081
2082 for (;;)
2083 {
2084 /*
2085 * On the first iteration, or if we have read all the runs from the
2086 * input tapes in a multi-pass merge, it's time to start a new pass.
2087 * Rewind all the output tapes, and make them inputs for the next
2088 * pass.
2089 */
2090 if (state->nInputRuns == 0)
2091 {
2092 int64 input_buffer_size;
2093
2094 /* Close the old, emptied, input tapes */
2095 if (state->nInputTapes > 0)
2096 {
2097 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2098 LogicalTapeClose(state->inputTapes[tapenum]);
2099 pfree(state->inputTapes);
2100 }
2101
2102 /* Previous pass's outputs become next pass's inputs. */
2103 state->inputTapes = state->outputTapes;
2104 state->nInputTapes = state->nOutputTapes;
2105 state->nInputRuns = state->nOutputRuns;
2106
2107 /*
2108 * Reset output tape variables. The actual LogicalTapes will be
2109 * created as needed, here we only allocate the array to hold
2110 * them.
2111 */
2112 state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2113 state->nOutputTapes = 0;
2114 state->nOutputRuns = 0;
2115
2116 /*
2117 * Redistribute the memory allocated for tape buffers, among the
2118 * new input and output tapes.
2119 */
2120 input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2121 state->nInputTapes,
2122 state->nInputRuns,
2123 state->maxTapes);
2124
2125 if (trace_sort)
2126 elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2127 state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2128 pg_rusage_show(&state->ru_start));
2129
2130 /* Prepare the new input tapes for merge pass. */
2131 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2132 LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2133
2134 /*
2135 * If there's just one run left on each input tape, then only one
2136 * merge pass remains. If we don't have to produce a materialized
2137 * sorted tape, we can stop at this point and do the final merge
2138 * on-the-fly.
2139 */
2140 if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2141 && state->nInputRuns <= state->nInputTapes
2142 && !WORKER(state))
2143 {
2144 /* Tell logtape.c we won't be writing anymore */
2145 LogicalTapeSetForgetFreeSpace(state->tapeset);
2146 /* Initialize for the final merge pass */
2147 beginmerge(state);
2148 state->status = TSS_FINALMERGE;
2149 return;
2150 }
2151 }
2152
2153 /* Select an output tape */
2154 selectnewtape(state);
2155
2156 /* Merge one run from each input tape. */
2157 mergeonerun(state);
2158
2159 /*
2160 * If the input tapes are empty, and we output only one output run,
2161 * we're done. The current output tape contains the final result.
2162 */
2163 if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2164 break;
2165 }
2166
2167 /*
2168 * Done. The result is on a single run on a single tape.
2169 */
2170 state->result_tape = state->outputTapes[0];
2171 if (!WORKER(state))
2172 LogicalTapeFreeze(state->result_tape, NULL);
2173 else
2174 worker_freeze_result_tape(state);
2175 state->status = TSS_SORTEDONTAPE;
2176
2177 /* Close all the now-empty input tapes, to release their read buffers. */
2178 for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2179 LogicalTapeClose(state->inputTapes[tapenum]);
2180}
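/*----------
 * Illustrative pass structure (not part of the upstream file): suppose
 * dumptuples() left 40 initial runs and maxTapes = 6.
 *
 *	pass 1: 40 runs on 6 input tapes -> ceil(40/6) = 7 runs on 6 tapes
 *	pass 2:  7 runs on 6 input tapes -> 2 runs on 2 tapes
 *	pass 3:  2 runs on 2 input tapes -> 1 run (the result tape)
 *
 * Without TUPLESORT_RANDOMACCESS the loop instead stops at the start of
 * pass 3, because 2 runs <= 2 tapes, and performs that last merge
 * on-the-fly in TSS_FINALMERGE.
 *----------
 */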
2181
2182/*
2183 * Merge one run from each input tape.
2184 */
2185static void
2186mergeonerun(Tuplesortstate *state)
2187{
2188 int srcTapeIndex;
2189 LogicalTape *srcTape;
2190
2191 /*
2192 * Start the merge by loading one tuple from each active source tape into
2193 * the heap.
2194 */
2195 beginmerge(state);
2196
2197 Assert(state->slabAllocatorUsed);
2198
2199 /*
2200 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2201 * out, and replacing it with next tuple from same tape (if there is
2202 * another one).
2203 */
2204 while (state->memtupcount > 0)
2205 {
2206 SortTuple stup;
2207
2208 /* write the tuple to destTape */
2209 srcTapeIndex = state->memtuples[0].srctape;
2210 srcTape = state->inputTapes[srcTapeIndex];
2211 WRITETUP(state, state->destTape, &state->memtuples[0]);
2212
2213 /* recycle the slot of the tuple we just wrote out, for the next read */
2214 if (state->memtuples[0].tuple)
2215 RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2216
2217 /*
2218 * pull next tuple from the tape, and replace the written-out tuple in
2219 * the heap with it.
2220 */
2221 if (mergereadnext(state, srcTape, &stup))
2222 {
2223 stup.srctape = srcTapeIndex;
2224 tuplesort_heap_replace_top(state, &stup);
2225 }
2226 else
2227 {
2228 tuplesort_heap_delete_top(state);
2229 state->nInputRuns--;
2230 }
2231 }
2232
2233 /*
2234 * When the heap empties, we're done. Write an end-of-run marker on the
2235 * output tape.
2236 */
2237 markrunend(state->destTape);
2238}
2239
2240/*
2241 * beginmerge - initialize for a merge pass
2242 *
2243 * Fill the merge heap with the first tuple from each input tape.
2244 */
2245static void
2246beginmerge(Tuplesortstate *state)
2247{
2248 int activeTapes;
2249 int srcTapeIndex;
2250
2251 /* Heap should be empty here */
2252 Assert(state->memtupcount == 0);
2253
2254 activeTapes = Min(state->nInputTapes, state->nInputRuns);
2255
2256 for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2257 {
2258 SortTuple tup;
2259
2260 if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2261 {
2262 tup.srctape = srcTapeIndex;
2263 tuplesort_heap_insert(state, &tup);
2264 }
2265 }
2266}
2267
2268/*
2269 * mergereadnext - read next tuple from one merge input tape
2270 *
2271 * Returns false on EOF.
2272 */
2273static bool
2274mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2275{
2276 unsigned int tuplen;
2277
2278 /* read next tuple, if any */
2279 if ((tuplen = getlen(srcTape, true)) == 0)
2280 return false;
2281 READTUP(state, stup, srcTape, tuplen);
2282
2283 return true;
2284}
2285
2286/*
2287 * dumptuples - remove tuples from memtuples and write initial run to tape
2288 *
2289 * When alltuples = true, dump everything currently in memory. (This case is
2290 * only used at end of input data.)
2291 */
2292static void
2293dumptuples(Tuplesortstate *state, bool alltuples)
2294{
2295 int memtupwrite;
2296 int i;
2297
2298 /*
2299 * Nothing to do if we still fit in available memory and have array slots,
2300 * unless this is the final call during initial run generation.
2301 */
2302 if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2303 !alltuples)
2304 return;
2305
2306 /*
2307 * Final call might require no sorting, in rare cases where we just so
2308 * happen to have previously LACKMEM()'d at the point where exactly all
2309 * remaining tuples are loaded into memory, just before input was
2310 * exhausted. In general, short final runs are quite possible, but avoid
2311 * creating a completely empty run. In a worker, though, we must produce
2312 * at least one tape, even if it's empty.
2313 */
2314 if (state->memtupcount == 0 && state->currentRun > 0)
2315 return;
2316
2317 Assert(state->status == TSS_BUILDRUNS);
2318
2319 /*
2320 * It seems unlikely that this limit will ever be exceeded, but take no
2321 * chances
2322 */
2323 if (state->currentRun == INT_MAX)
2324 ereport(ERROR,
2325 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2326 errmsg("cannot have more than %d runs for an external sort",
2327 INT_MAX)));
2328
2329 if (state->currentRun > 0)
2330 selectnewtape(state);
2331
2332 state->currentRun++;
2333
2334 if (trace_sort)
2335 elog(LOG, "worker %d starting quicksort of run %d: %s",
2336 state->worker, state->currentRun,
2337 pg_rusage_show(&state->ru_start));
2338
2339 /*
2340 * Sort all tuples accumulated within the allowed amount of memory for
2341 * this run using quicksort
2342 */
2343 tuplesort_sort_memtuples(state);
2344
2345 if (trace_sort)
2346 elog(LOG, "worker %d finished quicksort of run %d: %s",
2347 state->worker, state->currentRun,
2348 pg_rusage_show(&state->ru_start));
2349
2350 memtupwrite = state->memtupcount;
2351 for (i = 0; i < memtupwrite; i++)
2352 {
2353 SortTuple *stup = &state->memtuples[i];
2354
2355 WRITETUP(state, state->destTape, stup);
2356 }
2357
2358 state->memtupcount = 0;
2359
2360 /*
2361 * Reset tuple memory. We've freed all of the tuples that we previously
2362 * allocated. It's important to avoid fragmentation when there is a stark
2363 * change in the sizes of incoming tuples. In bounded sorts,
2364 * fragmentation due to AllocSetFree's bucketing by size class might be
2365 * particularly bad if this step wasn't taken.
2366 */
2367 MemoryContextReset(state->base.tuplecontext);
2368
2369 /*
2370 * Now update the memory accounting to subtract the memory used by the
2371 * tuple.
2372 */
2373 FREEMEM(state, state->tupleMem);
2374 state->tupleMem = 0;
2375
2376 markrunend(state->destTape);
2377
2378 if (trace_sort)
2379 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2380 state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2381 pg_rusage_show(&state->ru_start));
2382}
2383
2384/*
2385 * tuplesort_rescan - rewind and replay the scan
2386 */
2387void
2388tuplesort_rescan(Tuplesortstate *state)
2389{
2390 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2391
2392 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2393
2394 switch (state->status)
2395 {
2396 case TSS_SORTEDINMEM:
2397 state->current = 0;
2398 state->eof_reached = false;
2399 state->markpos_offset = 0;
2400 state->markpos_eof = false;
2401 break;
2402 case TSS_SORTEDONTAPE:
2403 LogicalTapeRewindForRead(state->result_tape, 0);
2404 state->eof_reached = false;
2405 state->markpos_block = 0L;
2406 state->markpos_offset = 0;
2407 state->markpos_eof = false;
2408 break;
2409 default:
2410 elog(ERROR, "invalid tuplesort state");
2411 break;
2412 }
2413
2414 MemoryContextSwitchTo(oldcontext);
2415}
2416
2417/*
2418 * tuplesort_markpos - saves current position in the merged sort file
2419 */
2420void
2421tuplesort_markpos(Tuplesortstate *state)
2422{
2423 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2424
2425 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2426
2427 switch (state->status)
2428 {
2429 case TSS_SORTEDINMEM:
2430 state->markpos_offset = state->current;
2431 state->markpos_eof = state->eof_reached;
2432 break;
2433 case TSS_SORTEDONTAPE:
2434 LogicalTapeTell(state->result_tape,
2435 &state->markpos_block,
2436 &state->markpos_offset);
2437 state->markpos_eof = state->eof_reached;
2438 break;
2439 default:
2440 elog(ERROR, "invalid tuplesort state");
2441 break;
2442 }
2443
2444 MemoryContextSwitchTo(oldcontext);
2445}
2446
2447/*
2448 * tuplesort_restorepos - restores current position in merged sort file to
2449 * last saved position
2450 */
2451void
2452tuplesort_restorepos(Tuplesortstate *state)
2453{
2454 MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2455
2456 Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2457
2458 switch (state->status)
2459 {
2460 case TSS_SORTEDINMEM:
2461 state->current = state->markpos_offset;
2462 state->eof_reached = state->markpos_eof;
2463 break;
2464 case TSS_SORTEDONTAPE:
2465 LogicalTapeSeek(state->result_tape,
2466 state->markpos_block,
2467 state->markpos_offset);
2468 state->eof_reached = state->markpos_eof;
2469 break;
2470 default:
2471 elog(ERROR, "invalid tuplesort state");
2472 break;
2473 }
2474
2475 MemoryContextSwitchTo(oldcontext);
2476}
2477
2478/*
2479 * tuplesort_get_stats - extract summary statistics
2480 *
2481 * This can be called after tuplesort_performsort() finishes to obtain
2482 * printable summary information about how the sort was performed.
2483 */
2484void
2485tuplesort_get_stats(Tuplesortstate *state,
2486 TuplesortInstrumentation *stats)
2487{
2488 /*
2489 * Note: it might seem we should provide both memory and disk usage for a
2490 * disk-based sort. However, the current code doesn't track memory space
2491 * accurately once we have begun to return tuples to the caller (since we
2492 * don't account for pfree's the caller is expected to do), so we cannot
2493 * rely on availMem in a disk sort. This does not seem worth the overhead
2494 * to fix. Is it worth creating an API for the memory context code to
2495 * tell us how much is actually used in sortcontext?
2496 */
2497 tuplesort_updatemax(state);
2498
2499 if (state->isMaxSpaceDisk)
2500 stats->spaceType = SORT_SPACE_TYPE_DISK;
2501 else
2502 stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2503 stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2504
2505 switch (state->maxSpaceStatus)
2506 {
2507 case TSS_SORTEDINMEM:
2508 if (state->boundUsed)
2509 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2510 else
2511 stats->sortMethod = SORT_TYPE_QUICKSORT;
2512 break;
2513 case TSS_SORTEDONTAPE:
2514 stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2515 break;
2516 case TSS_FINALMERGE:
2517 stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2518 break;
2519 default:
2520 stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2521 break;
2522 }
2523}
2524
2525/*
2526 * Convert TuplesortMethod to a string.
2527 */
2528const char *
2529tuplesort_method_name(TuplesortMethod m)
2530{
2531 switch (m)
2532 {
2533 case SORT_TYPE_STILL_IN_PROGRESS:
2534 return "still in progress";
2535 case SORT_TYPE_TOP_N_HEAPSORT:
2536 return "top-N heapsort";
2537 case SORT_TYPE_QUICKSORT:
2538 return "quicksort";
2539 case SORT_TYPE_EXTERNAL_SORT:
2540 return "external sort";
2541 case SORT_TYPE_EXTERNAL_MERGE:
2542 return "external merge";
2543 }
2544
2545 return "unknown";
2546}
2547
2548/*
2549 * Convert TuplesortSpaceType to a string.
2550 */
2551const char *
2552tuplesort_space_type_name(TuplesortSpaceType t)
2553{
2554 Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2555 return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2556}
2557
2558
2559/*
2560 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2561 */
2562
2563/*
2564 * Convert the existing unordered array of SortTuples to a bounded heap,
2565 * discarding all but the smallest "state->bound" tuples.
2566 *
2567 * When working with a bounded heap, we want to keep the largest entry
2568 * at the root (array entry zero), instead of the smallest as in the normal
2569 * sort case. This allows us to discard the largest entry cheaply.
2570 * Therefore, we temporarily reverse the sort direction.
2571 */
2572static void
2573make_bounded_heap(Tuplesortstate *state)
2574{
2575 int tupcount = state->memtupcount;
2576 int i;
2577
2578 Assert(state->status == TSS_INITIAL);
2579 Assert(state->bounded);
2580 Assert(tupcount >= state->bound);
2581 Assert(SERIAL(state));
2582
2583 /* Reverse sort direction so largest entry will be at root */
2584 reversedirection(state);
2585
2586 state->memtupcount = 0; /* make the heap empty */
2587 for (i = 0; i < tupcount; i++)
2588 {
2589 if (state->memtupcount < state->bound)
2590 {
2591 /* Insert next tuple into heap */
2592 /* Must copy source tuple to avoid possible overwrite */
2593 SortTuple stup = state->memtuples[i];
2594
2595 tuplesort_heap_insert(state, &stup);
2596 }
2597 else
2598 {
2599 /*
2600 * The heap is full. Replace the largest entry with the new
2601 * tuple, or just discard it, if it's larger than anything already
2602 * in the heap.
2603 */
2604 if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2605 {
2606 free_sort_tuple(state, &state->memtuples[i]);
2607 CHECK_FOR_INTERRUPTS();
2608 }
2609 else
2610 tuplesort_heap_replace_top(state, &state->memtuples[i]);
2611 }
2612 }
2613
2614 Assert(state->memtupcount == state->bound);
2615 state->status = TSS_BOUNDED;
2616}
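/*----------
 * Worked example (illustrative, not in the upstream source): bound = 3,
 * ascending sort of the values 5, 9, 1, 7, 3.  With the direction reversed,
 * the first three values form a max-heap {5, 9, 1} with 9 at the root.
 * Value 7 is smaller than the root, so it replaces it and the 9 is freed,
 * leaving {5, 7, 1}; value 3 then replaces the new root 7, leaving
 * {5, 3, 1}.  The heap thus retains the three smallest values, and
 * sort_bounded_heap() later turns them into the ordered prefix 1, 3, 5.
 *----------
 */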
2617
2618/*
2619 * Convert the bounded heap to a properly-sorted array
2620 */
2621static void
2622sort_bounded_heap(Tuplesortstate *state)
2623{
2624 int tupcount = state->memtupcount;
2625
2626 Assert(state->status == TSS_BOUNDED);
2627 Assert(state->bounded);
2628 Assert(tupcount == state->bound);
2629 Assert(SERIAL(state));
2630
2631 /*
2632 * We can unheapify in place because each delete-top call will remove the
2633 * largest entry, which we can promptly store in the newly freed slot at
2634 * the end. Once we're down to a single-entry heap, we're done.
2635 */
2636 while (state->memtupcount > 1)
2637 {
2638 SortTuple stup = state->memtuples[0];
2639
2640 /* this sifts-up the next-largest entry and decreases memtupcount */
2641 tuplesort_heap_delete_top(state);
2642 state->memtuples[state->memtupcount] = stup;
2643 }
2644 state->memtupcount = tupcount;
2645
2646 /*
2647 * Reverse sort direction back to the original state. This is not
2648 * actually necessary but seems like a good idea for tidiness.
2649 */
2650 reversedirection(state);
2651
2652 state->status = TSS_SORTEDINMEM;
2653 state->boundUsed = true;
2654}
2655
2656/*
2657 * Sort all memtuples using specialized qsort() routines.
2658 *
2659 * Quicksort is used for small in-memory sorts, and external sort runs.
2660 */
2661static void
2662tuplesort_sort_memtuples(Tuplesortstate *state)
2663{
2664 Assert(!LEADER(state));
2665
2666 if (state->memtupcount > 1)
2667 {
2668 /*
2669 * Do we have the leading column's value or abbreviation in datum1,
2670 * and is there a specialization for its comparator?
2671 */
2672 if (state->base.haveDatum1 && state->base.sortKeys)
2673 {
2674 if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2675 {
2676 qsort_tuple_unsigned(state->memtuples,
2677 state->memtupcount,
2678 state);
2679 return;
2680 }
2681 else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2682 {
2683 qsort_tuple_signed(state->memtuples,
2684 state->memtupcount,
2685 state);
2686 return;
2687 }
2688 else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2689 {
2690 qsort_tuple_int32(state->memtuples,
2691 state->memtupcount,
2692 state);
2693 return;
2694 }
2695 }
2696
2697 /* Can we use the single-key sort function? */
2698 if (state->base.onlyKey != NULL)
2699 {
2700 qsort_ssup(state->memtuples, state->memtupcount,
2701 state->base.onlyKey);
2702 }
2703 else
2704 {
2705 qsort_tuple(state->memtuples,
2706 state->memtupcount,
2707 state->base.comparetup,
2708 state);
2709 }
2710 }
2711}
2712
2713/*
2714 * Insert a new tuple into an empty or existing heap, maintaining the
2715 * heap invariant. Caller is responsible for ensuring there's room.
2716 *
2717 * Note: For some callers, tuple points to a memtuples[] entry above the
2718 * end of the heap. This is safe as long as it's not immediately adjacent
2719 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2720 * is, it might get overwritten before being moved into the heap!
2721 */
2722static void
2723tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
2724{
2725 SortTuple *memtuples;
2726 int j;
2727
2728 memtuples = state->memtuples;
2729 Assert(state->memtupcount < state->memtupsize);
2730
2731 CHECK_FOR_INTERRUPTS();
2732
2733 /*
2734 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2735 * using 1-based array indexes, not 0-based.
2736 */
2737 j = state->memtupcount++;
2738 while (j > 0)
2739 {
2740 int i = (j - 1) >> 1;
2741
2742 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
2743 break;
2744 memtuples[j] = memtuples[i];
2745 j = i;
2746 }
2747 memtuples[j] = *tuple;
2748}
2749
2750/*
2751 * Remove the tuple at state->memtuples[0] from the heap. Decrement
2752 * memtupcount, and sift up to maintain the heap invariant.
2753 *
2754 * The caller has already free'd the tuple the top node points to,
2755 * if necessary.
2756 */
2757static void
2758tuplesort_heap_delete_top(Tuplesortstate *state)
2759{
2760 SortTuple *memtuples = state->memtuples;
2761 SortTuple *tuple;
2762
2763 if (--state->memtupcount <= 0)
2764 return;
2765
2766 /*
2767 * Remove the last tuple in the heap, and re-insert it, by replacing the
2768 * current top node with it.
2769 */
2770 tuple = &memtuples[state->memtupcount];
2771 tuplesort_heap_replace_top(state, tuple);
2772}
2773
2774/*
2775 * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
2776 * maintain the heap invariant.
2777 *
2778 * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
2779 * Heapsort, steps H3-H8).
2780 */
2781static void
2782tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
2783{
2784 SortTuple *memtuples = state->memtuples;
2785 unsigned int i,
2786 n;
2787
2788 Assert(state->memtupcount >= 1);
2789
2790 CHECK_FOR_INTERRUPTS();
2791
2792 /*
2793 * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2794 * This prevents overflow in the "2 * i + 1" calculation, since at the top
2795 * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2796 */
2797 n = state->memtupcount;
2798 i = 0; /* i is where the "hole" is */
2799 for (;;)
2800 {
2801 unsigned int j = 2 * i + 1;
2802
2803 if (j >= n)
2804 break;
2805 if (j + 1 < n &&
2806 COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
2807 j++;
2808 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
2809 break;
2810 memtuples[i] = memtuples[j];
2811 i = j;
2812 }
2813 memtuples[i] = *tuple;
2814}
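/*----------
 * Sift-down trace (illustrative, not in the upstream source): with the
 * min-heap memtuples[] = {2, 5, 4, 8, 6}, replacing the top with 9 proceeds
 * as follows: the smaller child of slot 0 is 4 (slot 2); since 9 > 4, the 4
 * moves up to slot 0 and the hole moves to slot 2, which has no children, so
 * 9 is stored there.  The result {4, 5, 9, 8, 6} is again a valid heap.
 *----------
 */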
2815
2816/*
2817 * Function to reverse the sort direction from its current state
2818 *
2819 * It is not safe to call this when performing hash tuplesorts
2820 */
2821static void
2822reversedirection(Tuplesortstate *state)
2823{
2824 SortSupport sortKey = state->base.sortKeys;
2825 int nkey;
2826
2827 for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2828 {
2829 sortKey->ssup_reverse = !sortKey->ssup_reverse;
2830 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2831 }
2832}
2833
2834
2835/*
2836 * Tape interface routines
2837 */
2838
2839static unsigned int
2840getlen(LogicalTape *tape, bool eofOK)
2841{
2842 unsigned int len;
2843
2844 if (LogicalTapeRead(tape,
2845 &len, sizeof(len)) != sizeof(len))
2846 elog(ERROR, "unexpected end of tape");
2847 if (len == 0 && !eofOK)
2848 elog(ERROR, "unexpected end of data");
2849 return len;
2850}
2851
2852static void
2853markrunend(LogicalTape *tape)
2854{
2855 unsigned int len = 0;
2856
2857 LogicalTapeWrite(tape, &len, sizeof(len));
2858}
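/*----------
 * On-tape format implied by getlen()/markrunend() and the backward scan in
 * tuplesort_gettuple_common() (illustrative summary, not upstream text):
 * each tuple is preceded by an unsigned int length word whose value counts
 * the word itself plus the tuple body; when TUPLESORT_RANDOMACCESS was
 * requested, the same length word is repeated after the body so the tuple
 * can also be read backwards.  A run of two tuples therefore looks like
 *
 *	[len1][tuple1][len1] [len2][tuple2][len2] [0]
 *
 * with the zero length word written by markrunend() terminating the run.
 *----------
 */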
2859
2860/*
2861 * Get memory for tuple from within READTUP() routine.
2862 *
2863 * We use next free slot from the slab allocator, or palloc() if the tuple
2864 * is too large for that.
2865 */
2866void *
2867tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
2868{
2869 SlabSlot *buf;
2870
2871 /*
2872 * We pre-allocate enough slots in the slab arena that we should never run
2873 * out.
2874 */
2875 Assert(state->slabFreeHead);
2876
2877 if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
2878 return MemoryContextAlloc(state->base.sortcontext, tuplen);
2879 else
2880 {
2881 buf = state->slabFreeHead;
2882 /* Reuse this slot */
2883 state->slabFreeHead = buf->nextfree;
2884
2885 return buf;
2886 }
2887}
2888
2889
2890/*
2891 * Parallel sort routines
2892 */
2893
2894/*
2895 * tuplesort_estimate_shared - estimate required shared memory allocation
2896 *
2897 * nWorkers is an estimate of the number of workers (it's the number that
2898 * will be requested).
2899 */
2900Size
2901tuplesort_estimate_shared(int nWorkers)
2902{
2903 Size tapesSize;
2904
2905 Assert(nWorkers > 0);
2906
2907 /* Make sure that BufFile shared state is MAXALIGN'd */
2908 tapesSize = mul_size(sizeof(TapeShare), nWorkers);
2909 tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2910
2911 return tapesSize;
2912}
2913
2914/*
2915 * tuplesort_initialize_shared - initialize shared tuplesort state
2916 *
2917 * Must be called from leader process before workers are launched, to
2918 * establish state needed up-front for worker tuplesortstates. nWorkers
2919 * should match the argument passed to tuplesort_estimate_shared().
2920 */
2921void
2922tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
2923{
2924 int i;
2925
2926 Assert(nWorkers > 0);
2927
2928 SpinLockInit(&shared->mutex);
2929 shared->currentWorker = 0;
2930 shared->workersFinished = 0;
2931 SharedFileSetInit(&shared->fileset, seg);
2932 shared->nTapes = nWorkers;
2933 for (i = 0; i < nWorkers; i++)
2934 {
2935 shared->tapes[i].firstblocknumber = 0L;
2936 }
2937}
2938
2939/*
2940 * tuplesort_attach_shared - attach to shared tuplesort state
2941 *
2942 * Must be called by all worker processes.
2943 */
2944void
2945tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
2946{
2947 /* Attach to SharedFileSet */
2948 SharedFileSetAttach(&shared->fileset, seg);
2949}
2950
2951/*
2952 * worker_get_identifier - Assign and return ordinal identifier for worker
2953 *
2954 * The order in which these are assigned is not well defined, and should not
2955 * matter; worker numbers across parallel sort participants need only be
2956 * distinct and gapless. logtape.c requires this.
2957 *
2958 * Note that the identifiers assigned from here have no relation to
2959 * ParallelWorkerNumber number, to avoid making any assumption about
2960 * caller's requirements. However, we do follow the ParallelWorkerNumber
2961 * convention of representing a non-worker with worker number -1. This
2962 * includes the leader, as well as serial Tuplesort processes.
2963 */
2964static int
2965worker_get_identifier(Tuplesortstate *state)
2966{
2967 Sharedsort *shared = state->shared;
2968 int worker;
2969
2970 Assert(WORKER(state));
2971
2972 SpinLockAcquire(&shared->mutex);
2973 worker = shared->currentWorker++;
2974 SpinLockRelease(&shared->mutex);
2975
2976 return worker;
2977}
2978
2979/*
2980 * worker_freeze_result_tape - freeze worker's result tape for leader
2981 *
2982 * This is called by workers just after the result tape has been determined,
2983 * instead of calling LogicalTapeFreeze() directly. They do so because
2984 * workers require a few additional steps over similar serial
2985 * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
2986 * steps are around freeing now unneeded resources, and representing to
2987 * leader that worker's input run is available for its merge.
2988 *
2989 * There should only be one final output run for each worker, which consists
2990 * of all tuples that were originally input into worker.
2991 */
2992static void
2993worker_freeze_result_tape(Tuplesortstate *state)
2994{
2995 Sharedsort *shared = state->shared;
2996 TapeShare output;
2997
2998 Assert(WORKER(state));
2999 Assert(state->result_tape != NULL);
3000 Assert(state->memtupcount == 0);
3001
3002 /*
3003 * Free most remaining memory, in case caller is sensitive to our holding
3004 * on to it. memtuples may not be a tiny merge heap at this point.
3005 */
3006 pfree(state->memtuples);
3007 /* Be tidy */
3008 state->memtuples = NULL;
3009 state->memtupsize = 0;
3010
3011 /*
3012 * Parallel worker requires result tape metadata, which is to be stored in
3013 * shared memory for leader
3014 */
3015 LogicalTapeFreeze(state->result_tape, &output);
3016
3017 /* Store properties of output tape, and update finished worker count */
3018 SpinLockAcquire(&shared->mutex);
3019 shared->tapes[state->worker] = output;
3020 shared->workersFinished++;
3021 SpinLockRelease(&shared->mutex);
3022}
3023
3024/*
3025 * worker_nomergeruns - dump memtuples in worker, without merging
3026 *
3027 * This is called as an alternative to mergeruns() with a worker when no
3028 * merging is required.
3029 */
3030static void
3031worker_nomergeruns(Tuplesortstate *state)
3032{
3033 Assert(WORKER(state));
3034 Assert(state->result_tape == NULL);
3035 Assert(state->nOutputRuns == 1);
3036
3037 state->result_tape = state->destTape;
3038 worker_freeze_result_tape(state);
3039}
3040
3041/*
3042 * leader_takeover_tapes - create tapeset for leader from worker tapes
3043 *
3044 * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3045 * sorting has occurred in workers, all of which must have already returned
3046 * from tuplesort_performsort().
3047 *
3048 * When this returns, leader process is left in a state that is virtually
3049 * indistinguishable from it having generated runs as a serial external sort
3050 * might have.
3051 */
3052static void
3053leader_takeover_tapes(Tuplesortstate *state)
3054{
3055 Sharedsort *shared = state->shared;
3056 int nParticipants = state->nParticipants;
3057 int workersFinished;
3058 int j;
3059
3060 Assert(LEADER(state));
3061 Assert(nParticipants >= 1);
3062
3063 SpinLockAcquire(&shared->mutex);
3064 workersFinished = shared->workersFinished;
3065 SpinLockRelease(&shared->mutex);
3066
3067 if (nParticipants != workersFinished)
3068 elog(ERROR, "cannot take over tapes before all workers finish");
3069
3070 /*
3071 * Create the tapeset from worker tapes, including a leader-owned tape at
3072 * the end. Parallel workers are far more expensive than logical tapes,
3073 * so the number of tapes allocated here should never be excessive.
3074 */
3075 inittapestate(state, nParticipants);
3076 state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3077
3078 /*
3079 * Set currentRun to reflect the number of runs we will merge (it's not
3080 * used for anything, this is just pro forma)
3081 */
3082 state->currentRun = nParticipants;
3083
3084 /*
3085 * Initialize the state to look the same as after building the initial
3086 * runs.
3087 *
3088 * There will always be exactly 1 run per worker, and exactly one input
3089 * tape per run, because workers always output exactly 1 run, even when
3090 * there were no input tuples for workers to sort.
3091 */
3092 state->inputTapes = NULL;
3093 state->nInputTapes = 0;
3094 state->nInputRuns = 0;
3095
3096 state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3097 state->nOutputTapes = nParticipants;
3098 state->nOutputRuns = nParticipants;
3099
3100 for (j = 0; j < nParticipants; j++)
3101 {
3102 state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3103 }
3104
3105 state->status = TSS_BUILDRUNS;
3106}
3107
3108/*
3109 * Convenience routine to free a tuple previously loaded into sort memory
3110 */
3111static void
3112free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3113{
3114 if (stup->tuple)
3115 {
3116 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3117 pfree(stup->tuple);
3118 stup->tuple = NULL;
3119 }
3120}
3121
3122int
3123ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3124{
3125 if (x < y)
3126 return -1;
3127 else if (x > y)
3128 return 1;
3129 else
3130 return 0;
3131}
3132
3133int
3134ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3135{
3136 int64 xx = DatumGetInt64(x);
3137 int64 yy = DatumGetInt64(y);
3138
3139 if (xx < yy)
3140 return -1;
3141 else if (xx > yy)
3142 return 1;
3143 else
3144 return 0;
3145}
3146
3147int
3148ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3149{
3150 int32 xx = DatumGetInt32(x);
3151 int32 yy = DatumGetInt32(y);
3152
3153 if (xx < yy)
3154 return -1;
3155 else if (xx > yy)
3156 return 1;
3157 else
3158 return 0;
3159}
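/*
 * Illustrative usage (a sketch; the real registration lives in a datatype's
 * btree sortsupport code, not in this file): an opclass sortsupport function
 * can install one of the comparators above so that tuplesort_sort_memtuples()
 * takes the corresponding specialized qsort path.  For int4, for example:
 *
 *	Datum
 *	btint4sortsupport(PG_FUNCTION_ARGS)
 *	{
 *		SortSupport ssup = (SortSupport) PG_GETARG_POINTER(0);
 *
 *		ssup->comparator = ssup_datum_int32_cmp;
 *		PG_RETURN_VOID();
 *	}
 */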