/*
 * NOTE: This is a recovered listing of src/backend/utils/sort/tuplesort.c
 * (PostgreSQL, git master) extracted from a generated documentation page.
 * Embedded listing line numbers and navigation text are extraction artifacts.
 */
1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  * Generalized tuple sorting routines.
5  *
6  * This module provides a generalized facility for tuple sorting, which can be
7  * applied to different kinds of sortable objects. Implementation of
8  * the particular sorting variants is given in tuplesortvariants.c.
9  * This module works efficiently for both small and large amounts
10  * of data. Small amounts are sorted in-memory using qsort(). Large
11  * amounts are sorted using temporary files and a standard external sort
12  * algorithm.
13  *
14  * See Knuth, volume 3, for more than you want to know about external
15  * sorting algorithms. The algorithm we use is a balanced k-way merge.
16  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18  * merge is better. Knuth is assuming that tape drives are expensive
19  * beasts, and in particular that there will always be many more runs than
20  * tape drives. The polyphase merge algorithm was good at keeping all the
21  * tape drives busy, but in our implementation a "tape drive" doesn't cost
22  * much more than a few Kb of memory buffers, so we can afford to have
23  * lots of them. In particular, if we can have as many tape drives as
24  * sorted runs, we can eliminate any repeated I/O at all.
25  *
26  * Historically, we divided the input into sorted runs using replacement
27  * selection, in the form of a priority tree implemented as a heap
28  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29  * for run generation.
30  *
31  * The approximate amount of memory allowed for any one sort operation
32  * is specified in kilobytes by the caller (most pass work_mem). Initially,
33  * we absorb tuples and simply store them in an unsorted array as long as
34  * we haven't exceeded workMem. If we reach the end of the input without
35  * exceeding workMem, we sort the array using qsort() and subsequently return
36  * tuples just by scanning the tuple array sequentially. If we do exceed
37  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38  * When tuples are dumped in batch after quicksorting, we begin a new run
39  * with a new output tape. If we reach the max number of tapes, we write
40  * subsequent runs on the existing tapes in a round-robin fashion. We will
41  * need multiple merge passes to finish the merge in that case. After the
42  * end of the input is reached, we dump out remaining tuples in memory into
43  * a final run, then merge the runs.
44  *
45  * When merging runs, we use a heap containing just the frontmost tuple from
46  * each source run; we repeatedly output the smallest tuple and replace it
47  * with the next tuple from its source tape (if any). When the heap empties,
48  * the merge is complete. The basic merge algorithm thus needs very little
49  * memory --- only M tuples for an M-way merge, and M is constrained to a
50  * small number. However, we can still make good use of our full workMem
51  * allocation by pre-reading additional blocks from each source tape. Without
52  * prereading, our access pattern to the temporary file would be very erratic;
53  * on average we'd read one block from each of M source tapes during the same
54  * time that we're writing M blocks to the output tape, so there is no
55  * sequentiality of access at all, defeating the read-ahead methods used by
56  * most Unix kernels. Worse, the output tape gets written into a very random
57  * sequence of blocks of the temp file, ensuring that things will be even
58  * worse when it comes time to read that tape. A straightforward merge pass
59  * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60  * by prereading from each source tape sequentially, loading about workMem/M
61  * bytes from each tape in turn, and making the sequential blocks immediately
62  * available for reuse. This approach helps to localize both read and write
63  * accesses. The pre-reading is handled by logtape.c, we just tell it how
64  * much memory to use for the buffers.
65  *
66  * In the current code we determine the number of input tapes M on the basis
67  * of workMem: we want workMem/M to be large enough that we read a fair
68  * amount of data each time we read from a tape, so as to maintain the
69  * locality of access described above. Nonetheless, with large workMem we
70  * can have many tapes. The logical "tapes" are implemented by logtape.c,
71  * which avoids space wastage by recycling disk space as soon as each block
72  * is read from its "tape".
73  *
74  * When the caller requests random access to the sort result, we form
75  * the final sorted run on a logical tape which is then "frozen", so
76  * that we can access it randomly. When the caller does not need random
77  * access, we return from tuplesort_performsort() as soon as we are down
78  * to one run per logical tape. The final merge is then performed
79  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80  * saves one cycle of writing all the data out to disk and reading it in.
81  *
82  * This module supports parallel sorting. Parallel sorts involve coordination
83  * among one or more worker processes, and a leader process, each with its own
84  * tuplesort state. The leader process (or, more accurately, the
85  * Tuplesortstate associated with a leader process) creates a full tapeset
86  * consisting of worker tapes with one run to merge; a run for every
87  * worker process. This is then merged. Worker processes are guaranteed to
88  * produce exactly one output run from their partial input.
89  *
90  *
91  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
92  * Portions Copyright (c) 1994, Regents of the University of California
93  *
94  * IDENTIFICATION
95  * src/backend/utils/sort/tuplesort.c
96  *
97  *-------------------------------------------------------------------------
98  */
99 
100 #include "postgres.h"
101 
102 #include <limits.h>
103 
104 #include "catalog/pg_am.h"
105 #include "commands/tablespace.h"
106 #include "executor/executor.h"
107 #include "miscadmin.h"
108 #include "pg_trace.h"
109 #include "storage/shmem.h"
110 #include "utils/memutils.h"
111 #include "utils/pg_rusage.h"
112 #include "utils/rel.h"
113 #include "utils/tuplesort.h"
114 
/*
 * Initial size of memtuples array.  We're trying to select this size so that
 * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
 * overhead of allocation might possibly be lowered.  However, we don't
 * consider array sizes less than 1024 elements.
 */
#define INITIAL_MEMTUPSIZE Max(1024, \
	ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)

/* GUC variables */
#ifdef TRACE_SORT
/* When true, emit resource-usage tracing for each sort (see TRACE_SORT). */
bool		trace_sort = false;
#endif

#ifdef DEBUG_BOUNDED_SORT
/* Test-only switch: allows disabling the bounded-sort optimization. */
bool		optimize_bounded_sort = true;
#endif
134 
/*
 * During merge, we use a pre-allocated set of fixed-size slots to hold
 * tuples, to avoid palloc/pfree overhead.
 *
 * Merge doesn't require a lot of memory, so we can afford to waste some,
 * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
 * palloc() overhead is not significant anymore.
 *
 * 'nextfree' is valid when this chunk is in the free list.  When in use, the
 * slot holds a tuple.
 */
#define SLAB_SLOT_SIZE 1024

typedef union SlabSlot
{
	/*
	 * NOTE(review): the union members were lost in extraction and are
	 * restored here from the RELEASE_SLAB_SLOT free-list usage below
	 * ('buf->nextfree') and the slot-size comment above -- verify against
	 * upstream.
	 */
	union SlabSlot *nextfree;	/* next slot in free list, when free */
	char		buffer[SLAB_SLOT_SIZE]; /* tuple storage, when in use */
} SlabSlot;
/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
	TSS_INITIAL,				/* Loading tuples; still within memory limit */
	TSS_BOUNDED,				/* Loading tuples into bounded-size heap */
	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
	TSS_FINALMERGE,				/* Performing final merge on-the-fly */
} TupSortStatus;
167 
/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about 1 block's
 * worth of buffer space.  This ignores the overhead of all the other data
 * structures needed for each tape, but it's probably close enough.
 *
 * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
 * input tape, for pre-reading (see discussion at top of file).  This is *in
 * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
 */
#define MINORDER		6		/* minimum merge order */
#define MAXORDER		500		/* maximum merge order */
#define TAPE_BUFFER_OVERHEAD		BLCKSZ
#define MERGE_BUFFER_SIZE			(BLCKSZ * 32)
184 
185 
186 /*
187  * Private state of a Tuplesort operation.
188  */
190 {
192  TupSortStatus status; /* enumerated value as shown above */
193  bool bounded; /* did caller specify a maximum number of
194  * tuples to return? */
195  bool boundUsed; /* true if we made use of a bounded heap */
196  int bound; /* if bounded, the maximum number of tuples */
197  int64 availMem; /* remaining memory available, in bytes */
198  int64 allowedMem; /* total memory allowed, in bytes */
199  int maxTapes; /* max number of input tapes to merge in each
200  * pass */
201  int64 maxSpace; /* maximum amount of space occupied among sort
202  * of groups, either in-memory or on-disk */
203  bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
204  * space, false when it's value for in-memory
205  * space */
206  TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
207  LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
208 
209  /*
210  * This array holds the tuples now in sort memory. If we are in state
211  * INITIAL, the tuples are in no particular order; if we are in state
212  * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
213  * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
214  * H. In state SORTEDONTAPE, the array is not used.
215  */
216  SortTuple *memtuples; /* array of SortTuple structs */
217  int memtupcount; /* number of tuples currently present */
218  int memtupsize; /* allocated length of memtuples array */
219  bool growmemtuples; /* memtuples' growth still underway? */
220 
221  /*
222  * Memory for tuples is sometimes allocated using a simple slab allocator,
223  * rather than with palloc(). Currently, we switch to slab allocation
224  * when we start merging. Merging only needs to keep a small, fixed
225  * number of tuples in memory at any time, so we can avoid the
226  * palloc/pfree overhead by recycling a fixed number of fixed-size slots
227  * to hold the tuples.
228  *
229  * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
230  * slots. The allocation is sized to have one slot per tape, plus one
231  * additional slot. We need that many slots to hold all the tuples kept
232  * in the heap during merge, plus the one we have last returned from the
233  * sort, with tuplesort_gettuple.
234  *
235  * Initially, all the slots are kept in a linked list of free slots. When
236  * a tuple is read from a tape, it is put to the next available slot, if
237  * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
238  * instead.
239  *
240  * When we're done processing a tuple, we return the slot back to the free
241  * list, or pfree() if it was palloc'd. We know that a tuple was
242  * allocated from the slab, if its pointer value is between
243  * slabMemoryBegin and -End.
244  *
245  * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
246  * tracking memory usage is not used.
247  */
249 
250  char *slabMemoryBegin; /* beginning of slab memory arena */
251  char *slabMemoryEnd; /* end of slab memory arena */
252  SlabSlot *slabFreeHead; /* head of free list */
253 
254  /* Memory used for input and output tape buffers. */
256 
257  /*
258  * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
259  * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
260  * modes), we remember the tuple in 'lastReturnedTuple', so that we can
261  * recycle the memory on next gettuple call.
262  */
264 
265  /*
266  * While building initial runs, this is the current output run number.
267  * Afterwards, it is the number of initial runs we made.
268  */
270 
271  /*
272  * Logical tapes, for merging.
273  *
274  * The initial runs are written in the output tapes. In each merge pass,
275  * the output tapes of the previous pass become the input tapes, and new
276  * output tapes are created as needed. When nInputTapes equals
277  * nInputRuns, there is only one merge pass left.
278  */
282 
286 
287  LogicalTape *destTape; /* current output tape */
288 
289  /*
290  * These variables are used after completion of sorting to keep track of
291  * the next tuple to return. (In the tape case, the tape's current read
292  * position is also critical state.)
293  */
294  LogicalTape *result_tape; /* actual tape of finished output */
295  int current; /* array index (only used if SORTEDINMEM) */
296  bool eof_reached; /* reached EOF (needed for cursors) */
297 
298  /* markpos_xxx holds marked position for mark and restore */
299  int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
300  int markpos_offset; /* saved "current", or offset in tape block */
301  bool markpos_eof; /* saved "eof_reached" */
302 
303  /*
304  * These variables are used during parallel sorting.
305  *
306  * worker is our worker identifier. Follows the general convention that
307  * -1 value relates to a leader tuplesort, and values >= 0 worker
308  * tuplesorts. (-1 can also be a serial tuplesort.)
309  *
310  * shared is mutable shared memory state, which is used to coordinate
311  * parallel sorts.
312  *
313  * nParticipants is the number of worker Tuplesortstates known by the
314  * leader to have actually been launched, which implies that they must
315  * finish a run that the leader needs to merge. Typically includes a
316  * worker state held by the leader process itself. Set in the leader
317  * Tuplesortstate only.
318  */
319  int worker;
322 
323  /*
324  * Additional state for managing "abbreviated key" sortsupport routines
325  * (which currently may be used by all cases except the hash index case).
326  * Tracks the intervals at which the optimization's effectiveness is
327  * tested.
328  */
329  int64 abbrevNext; /* Tuple # at which to next check
330  * applicability */
331 
332  /*
333  * Resource snapshot for time of sort start.
334  */
335 #ifdef TRACE_SORT
337 #endif
338 };
339 
340 /*
341  * Private mutable state of tuplesort-parallel-operation. This is allocated
342  * in shared memory.
343  */
345 {
346  /* mutex protects all fields prior to tapes */
348 
349  /*
350  * currentWorker generates ordinal identifier numbers for parallel sort
351  * workers. These start from 0, and are always gapless.
352  *
353  * Workers increment workersFinished to indicate having finished. If this
354  * is equal to state.nParticipants within the leader, leader is ready to
355  * merge worker runs.
356  */
359 
360  /* Temporary file space */
362 
363  /* Size of tapes flexible array */
364  int nTapes;
365 
366  /*
367  * Tapes array used by workers to report back information needed by the
368  * leader to concatenate all worker tapes into one for merging
369  */
371 };
372 
/*
 * Is the given tuple allocated from the slab memory arena?
 */
#define IS_SLAB_SLOT(state, tuple) \
	((char *) (tuple) >= (state)->slabMemoryBegin && \
	 (char *) (tuple) < (state)->slabMemoryEnd)

/*
 * Return the given tuple to the slab memory free list, or free it
 * if it was palloc'd.  (Macro argument 'tuple' is parenthesized in the
 * cast for macro hygiene.)
 */
#define RELEASE_SLAB_SLOT(state, tuple) \
	do { \
		SlabSlot *buf = (SlabSlot *) (tuple); \
		\
		if (IS_SLAB_SLOT((state), buf)) \
		{ \
			buf->nextfree = (state)->slabFreeHead; \
			(state)->slabFreeHead = buf; \
		} else \
			pfree(buf); \
	} while(0)

/* Convenience wrappers around the per-variant callbacks in 'base'. */
#define REMOVEABBREV(state,stup,count)	((*(state)->base.removeabbrev) (state, stup, count))
#define COMPARETUP(state,a,b)	((*(state)->base.comparetup) (a, b, state))
#define WRITETUP(state,tape,stup)	((*(state)->base.writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
#define FREESTATE(state)	((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
/* Memory accounting; disabled once the slab allocator takes over. */
#define LACKMEM(state)		((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
/* Role tests for parallel sort: serial, worker, or leader. */
#define SERIAL(state)		((state)->shared == NULL)
#define WORKER(state)		((state)->shared && (state)->worker != -1)
#define LEADER(state)		((state)->shared && (state)->worker == -1)
407 
408 /*
409  * NOTES about on-tape representation of tuples:
410  *
411  * We require the first "unsigned int" of a stored tuple to be the total size
412  * on-tape of the tuple, including itself (so it is never zero; an all-zero
413  * unsigned int is used to delimit runs). The remainder of the stored tuple
414  * may or may not match the in-memory representation of the tuple ---
415  * any conversion needed is the job of the writetup and readtup routines.
416  *
417  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
418  * representation of the tuple must be followed by another "unsigned int" that
419  * is a copy of the length --- so the total tape space used is actually
420  * sizeof(unsigned int) more than the stored length value. This allows
421  * read-backwards. When the random access flag was not specified, the
422  * write/read routines may omit the extra length word.
423  *
424  * writetup is expected to write both length words as well as the tuple
425  * data. When readtup is called, the tape is positioned just after the
426  * front length word; readtup must read the tuple data and advance past
427  * the back length word (if present).
428  *
429  * The write/read routines can make use of the tuple description data
430  * stored in the Tuplesortstate record, if needed. They are also expected
431  * to adjust state->availMem by the amount of memory space (not tape space!)
432  * released or consumed. There is no error return from either writetup
433  * or readtup; they should ereport() on failure.
434  *
435  *
436  * NOTES about memory consumption calculations:
437  *
438  * We count space allocated for tuples against the workMem limit, plus
439  * the space used by the variable-size memtuples array. Fixed-size space
440  * is not counted; it's small enough to not be interesting.
441  *
442  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
443  * rather than the originally-requested size. This is important since
444  * palloc can add substantial overhead. It's not a complete answer since
445  * we won't count any wasted space in palloc allocation blocks, but it's
446  * a lot better than what we were doing before 7.3. As of 9.6, a
447  * separate memory context is used for caller passed tuples. Resetting
448  * it at certain key increments significantly ameliorates fragmentation.
449  * readtup routines use the slab allocator (they cannot use
450  * the reset context because it gets deleted at the point that merging
451  * begins).
452  */
453 
454 
457 static void inittapes(Tuplesortstate *state, bool mergeruns);
458 static void inittapestate(Tuplesortstate *state, int maxTapes);
459 static void selectnewtape(Tuplesortstate *state);
460 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
461 static void mergeruns(Tuplesortstate *state);
462 static void mergeonerun(Tuplesortstate *state);
463 static void beginmerge(Tuplesortstate *state);
464 static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
465 static void dumptuples(Tuplesortstate *state, bool alltuples);
473 static unsigned int getlen(LogicalTape *tape, bool eofOK);
474 static void markrunend(LogicalTape *tape);
479 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
480 static void tuplesort_free(Tuplesortstate *state);
482 
483 /*
484  * Specialized comparators that we can inline into specialized sorts. The goal
485  * is to try to sort two tuples without having to follow the pointers to the
486  * comparator or the tuple.
487  *
488  * XXX: For now, there is no specialization for cases where datum1 is
489  * authoritative and we don't even need to fall back to a callback at all (that
490  * would be true for types like int4/int8/timestamp/date, but not true for
491  * abbreviations of text or multi-key sorts. There could be! Is it worth it?
492  */
493 
494 /* Used if first key's comparator is ssup_datum_unsigned_cmp */
497 {
498  int compare;
499 
500  compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
501  b->datum1, b->isnull1,
502  &state->base.sortKeys[0]);
503  if (compare != 0)
504  return compare;
505 
506  /*
507  * No need to waste effort calling the tiebreak function when there are no
508  * other keys to sort on.
509  */
510  if (state->base.onlyKey != NULL)
511  return 0;
512 
513  return state->base.comparetup_tiebreak(a, b, state);
514 }
515 
#if SIZEOF_DATUM >= 8
/*
 * Used if first key's comparator is ssup_datum_signed_cmp.
 * Only built when Datum is wide enough to carry a signed 64-bit value.
 */
static pg_attribute_always_inline int
qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
{
	int			compare;

	compare = ApplySignedSortComparator(a->datum1, a->isnull1,
										b->datum1, b->isnull1,
										&state->base.sortKeys[0]);

	if (compare != 0)
		return compare;

	/*
	 * No need to waste effort calling the tiebreak function when there are no
	 * other keys to sort on.
	 */
	if (state->base.onlyKey != NULL)
		return 0;

	return state->base.comparetup_tiebreak(a, b, state);
}
#endif
540 
541 /* Used if first key's comparator is ssup_datum_int32_cmp */
544 {
545  int compare;
546 
547  compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
548  b->datum1, b->isnull1,
549  &state->base.sortKeys[0]);
550 
551  if (compare != 0)
552  return compare;
553 
554  /*
555  * No need to waste effort calling the tiebreak function when there are no
556  * other keys to sort on.
557  */
558  if (state->base.onlyKey != NULL)
559  return 0;
560 
561  return state->base.comparetup_tiebreak(a, b, state);
562 }
563 
564 /*
565  * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
566  * any variant of SortTuples, using the appropriate comparetup function.
567  * qsort_ssup() is specialized for the case where the comparetup function
568  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
569  * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for
570  * common comparison functions on pass-by-value leading datums.
571  */
572 
573 #define ST_SORT qsort_tuple_unsigned
574 #define ST_ELEMENT_TYPE SortTuple
575 #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
576 #define ST_COMPARE_ARG_TYPE Tuplesortstate
577 #define ST_CHECK_FOR_INTERRUPTS
578 #define ST_SCOPE static
579 #define ST_DEFINE
580 #include "lib/sort_template.h"
581 
582 #if SIZEOF_DATUM >= 8
583 #define ST_SORT qsort_tuple_signed
584 #define ST_ELEMENT_TYPE SortTuple
585 #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
586 #define ST_COMPARE_ARG_TYPE Tuplesortstate
587 #define ST_CHECK_FOR_INTERRUPTS
588 #define ST_SCOPE static
589 #define ST_DEFINE
590 #include "lib/sort_template.h"
591 #endif
592 
593 #define ST_SORT qsort_tuple_int32
594 #define ST_ELEMENT_TYPE SortTuple
595 #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
596 #define ST_COMPARE_ARG_TYPE Tuplesortstate
597 #define ST_CHECK_FOR_INTERRUPTS
598 #define ST_SCOPE static
599 #define ST_DEFINE
600 #include "lib/sort_template.h"
601 
602 #define ST_SORT qsort_tuple
603 #define ST_ELEMENT_TYPE SortTuple
604 #define ST_COMPARE_RUNTIME_POINTER
605 #define ST_COMPARE_ARG_TYPE Tuplesortstate
606 #define ST_CHECK_FOR_INTERRUPTS
607 #define ST_SCOPE static
608 #define ST_DECLARE
609 #define ST_DEFINE
610 #include "lib/sort_template.h"
611 
612 #define ST_SORT qsort_ssup
613 #define ST_ELEMENT_TYPE SortTuple
614 #define ST_COMPARE(a, b, ssup) \
615  ApplySortComparator((a)->datum1, (a)->isnull1, \
616  (b)->datum1, (b)->isnull1, (ssup))
617 #define ST_COMPARE_ARG_TYPE SortSupportData
618 #define ST_CHECK_FOR_INTERRUPTS
619 #define ST_SCOPE static
620 #define ST_DEFINE
621 #include "lib/sort_template.h"
622 
623 /*
624  * tuplesort_begin_xxx
625  *
626  * Initialize for a tuple sort operation.
627  *
628  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
629  * zero or more times, then call tuplesort_performsort when all the tuples
630  * have been supplied. After performsort, retrieve the tuples in sorted
631  * order by calling tuplesort_getXXX until it returns false/NULL. (If random
632  * access was requested, rescan, markpos, and restorepos can also be called.)
633  * Call tuplesort_end to terminate the operation and release memory/disk space.
634  *
635  * Each variant of tuplesort_begin has a workMem parameter specifying the
636  * maximum number of kilobytes of RAM to use before spilling data to disk.
637  * (The normal value of this parameter is work_mem, but some callers use
638  * other values.) Each variant also has a sortopt which is a bitmask of
639  * sort options. See TUPLESORT_* definitions in tuplesort.h
640  */
641 
643 tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
644 {
646  MemoryContext maincontext;
647  MemoryContext sortcontext;
648  MemoryContext oldcontext;
649 
650  /* See leader_takeover_tapes() remarks on random access support */
651  if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
652  elog(ERROR, "random access disallowed under parallel sort");
653 
654  /*
655  * Memory context surviving tuplesort_reset. This memory context holds
656  * data which is useful to keep while sorting multiple similar batches.
657  */
659  "TupleSort main",
661 
662  /*
663  * Create a working memory context for one sort operation. The content of
664  * this context is deleted by tuplesort_reset.
665  */
666  sortcontext = AllocSetContextCreate(maincontext,
667  "TupleSort sort",
669 
670  /*
671  * Additionally a working memory context for tuples is setup in
672  * tuplesort_begin_batch.
673  */
674 
675  /*
676  * Make the Tuplesortstate within the per-sortstate context. This way, we
677  * don't need a separate pfree() operation for it at shutdown.
678  */
679  oldcontext = MemoryContextSwitchTo(maincontext);
680 
682 
683 #ifdef TRACE_SORT
684  if (trace_sort)
685  pg_rusage_init(&state->ru_start);
686 #endif
687 
688  state->base.sortopt = sortopt;
689  state->base.tuples = true;
690  state->abbrevNext = 10;
691 
692  /*
693  * workMem is forced to be at least 64KB, the current minimum valid value
694  * for the work_mem GUC. This is a defense against parallel sort callers
695  * that divide out memory among many workers in a way that leaves each
696  * with very little memory.
697  */
698  state->allowedMem = Max(workMem, 64) * (int64) 1024;
699  state->base.sortcontext = sortcontext;
700  state->base.maincontext = maincontext;
701 
702  /*
703  * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
704  * see comments in grow_memtuples().
705  */
706  state->memtupsize = INITIAL_MEMTUPSIZE;
707  state->memtuples = NULL;
708 
709  /*
710  * After all of the other non-parallel-related state, we setup all of the
711  * state needed for each batch.
712  */
714 
715  /*
716  * Initialize parallel-related state based on coordination information
717  * from caller
718  */
719  if (!coordinate)
720  {
721  /* Serial sort */
722  state->shared = NULL;
723  state->worker = -1;
724  state->nParticipants = -1;
725  }
726  else if (coordinate->isWorker)
727  {
728  /* Parallel worker produces exactly one final run from all input */
729  state->shared = coordinate->sharedsort;
730  state->worker = worker_get_identifier(state);
731  state->nParticipants = -1;
732  }
733  else
734  {
735  /* Parallel leader state only used for final merge */
736  state->shared = coordinate->sharedsort;
737  state->worker = -1;
738  state->nParticipants = coordinate->nParticipants;
739  Assert(state->nParticipants >= 1);
740  }
741 
742  MemoryContextSwitchTo(oldcontext);
743 
744  return state;
745 }
746 
747 /*
748  * tuplesort_begin_batch
749  *
750  * Setup, or reset, all state need for processing a new set of tuples with this
751  * sort state. Called both from tuplesort_begin_common (the first time sorting
752  * with this sort state) and tuplesort_reset (for subsequent usages).
753  */
754 static void
756 {
757  MemoryContext oldcontext;
758 
759  oldcontext = MemoryContextSwitchTo(state->base.maincontext);
760 
761  /*
762  * Caller tuple (e.g. IndexTuple) memory context.
763  *
764  * A dedicated child context used exclusively for caller passed tuples
765  * eases memory management. Resetting at key points reduces
766  * fragmentation. Note that the memtuples array of SortTuples is allocated
767  * in the parent context, not this context, because there is no need to
768  * free memtuples early. For bounded sorts, tuples may be pfreed in any
769  * order, so we use a regular aset.c context so that it can make use of
770  * free'd memory. When the sort is not bounded, we make use of a
771  * generation.c context as this keeps allocations more compact with less
772  * wastage. Allocations are also slightly more CPU efficient.
773  */
774  if (state->base.sortopt & TUPLESORT_ALLOWBOUNDED)
775  state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
776  "Caller tuples",
778  else
779  state->base.tuplecontext = GenerationContextCreate(state->base.sortcontext,
780  "Caller tuples",
782 
783 
784  state->status = TSS_INITIAL;
785  state->bounded = false;
786  state->boundUsed = false;
787 
788  state->availMem = state->allowedMem;
789 
790  state->tapeset = NULL;
791 
792  state->memtupcount = 0;
793 
794  /*
795  * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
796  * see comments in grow_memtuples().
797  */
798  state->growmemtuples = true;
799  state->slabAllocatorUsed = false;
800  if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
801  {
802  pfree(state->memtuples);
803  state->memtuples = NULL;
804  state->memtupsize = INITIAL_MEMTUPSIZE;
805  }
806  if (state->memtuples == NULL)
807  {
808  state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
809  USEMEM(state, GetMemoryChunkSpace(state->memtuples));
810  }
811 
812  /* workMem must be large enough for the minimal memtuples array */
813  if (LACKMEM(state))
814  elog(ERROR, "insufficient memory allowed for sort");
815 
816  state->currentRun = 0;
817 
818  /*
819  * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
820  * inittapes(), if needed.
821  */
822 
823  state->result_tape = NULL; /* flag that result tape has not been formed */
824 
825  MemoryContextSwitchTo(oldcontext);
826 }
827 
828 /*
829  * tuplesort_set_bound
830  *
831  * Advise tuplesort that at most the first N result tuples are required.
832  *
833  * Must be called before inserting any tuples. (Actually, we could allow it
834  * as long as the sort hasn't spilled to disk, but there seems no need for
835  * delayed calls at the moment.)
836  *
837  * This is a hint only. The tuplesort may still return more tuples than
838  * requested. Parallel leader tuplesorts will always ignore the hint.
839  */
840 void
842 {
843  /* Assert we're called before loading any tuples */
844  Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
845  /* Assert we allow bounded sorts */
846  Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
847  /* Can't set the bound twice, either */
848  Assert(!state->bounded);
849  /* Also, this shouldn't be called in a parallel worker */
850  Assert(!WORKER(state));
851 
852  /* Parallel leader allows but ignores hint */
853  if (LEADER(state))
854  return;
855 
856 #ifdef DEBUG_BOUNDED_SORT
857  /* Honor GUC setting that disables the feature (for easy testing) */
858  if (!optimize_bounded_sort)
859  return;
860 #endif
861 
862  /* We want to be able to compute bound * 2, so limit the setting */
863  if (bound > (int64) (INT_MAX / 2))
864  return;
865 
866  state->bounded = true;
867  state->bound = (int) bound;
868 
869  /*
870  * Bounded sorts are not an effective target for abbreviated key
871  * optimization. Disable by setting state to be consistent with no
872  * abbreviation support.
873  */
874  state->base.sortKeys->abbrev_converter = NULL;
875  if (state->base.sortKeys->abbrev_full_comparator)
876  state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
877 
878  /* Not strictly necessary, but be tidy */
879  state->base.sortKeys->abbrev_abort = NULL;
880  state->base.sortKeys->abbrev_full_comparator = NULL;
881 }
882 
883 /*
884  * tuplesort_used_bound
885  *
886  * Allow callers to find out if the sort state was able to use a bound.
887  */
888 bool
890 {
891  return state->boundUsed;
892 }
893 
894 /*
895  * tuplesort_free
896  *
897  * Internal routine for freeing resources of tuplesort.
898  */
899 static void
901 {
902  /* context swap probably not needed, but let's be safe */
903  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
904 
905 #ifdef TRACE_SORT
906  int64 spaceUsed;
907 
908  if (state->tapeset)
909  spaceUsed = LogicalTapeSetBlocks(state->tapeset);
910  else
911  spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
912 #endif
913 
914  /*
915  * Delete temporary "tape" files, if any.
916  *
917  * Note: want to include this in reported total cost of sort, hence need
918  * for two #ifdef TRACE_SORT sections.
919  *
920  * We don't bother to destroy the individual tapes here. They will go away
921  * with the sortcontext. (In TSS_FINALMERGE state, we have closed
922  * finished tapes already.)
923  */
924  if (state->tapeset)
925  LogicalTapeSetClose(state->tapeset);
926 
927 #ifdef TRACE_SORT
928  if (trace_sort)
929  {
930  if (state->tapeset)
931  elog(LOG, "%s of worker %d ended, %lld disk blocks used: %s",
932  SERIAL(state) ? "external sort" : "parallel external sort",
933  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
934  else
935  elog(LOG, "%s of worker %d ended, %lld KB used: %s",
936  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
937  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
938  }
939 
940  TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
941 #else
942 
943  /*
944  * If you disabled TRACE_SORT, you can still probe sort__done, but you
945  * ain't getting space-used stats.
946  */
947  TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
948 #endif
949 
950  FREESTATE(state);
951  MemoryContextSwitchTo(oldcontext);
952 
953  /*
954  * Free the per-sort memory context, thereby releasing all working memory.
955  */
956  MemoryContextReset(state->base.sortcontext);
957 }
958 
959 /*
960  * tuplesort_end
961  *
962  * Release resources and clean up.
963  *
964  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
965  * pointing to garbage. Be careful not to attempt to use or free such
966  * pointers afterwards!
967  */
968 void
970 {
972 
973  /*
974  * Free the main memory context, including the Tuplesortstate struct
975  * itself.
976  */
977  MemoryContextDelete(state->base.maincontext);
978 }
979 
980 /*
981  * tuplesort_updatemax
982  *
983  * Update maximum resource usage statistics.
984  */
985 static void
987 {
988  int64 spaceUsed;
989  bool isSpaceDisk;
990 
991  /*
992  * Note: it might seem we should provide both memory and disk usage for a
993  * disk-based sort. However, the current code doesn't track memory space
994  * accurately once we have begun to return tuples to the caller (since we
995  * don't account for pfree's the caller is expected to do), so we cannot
996  * rely on availMem in a disk sort. This does not seem worth the overhead
997  * to fix. Is it worth creating an API for the memory context code to
998  * tell us how much is actually used in sortcontext?
999  */
1000  if (state->tapeset)
1001  {
1002  isSpaceDisk = true;
1003  spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
1004  }
1005  else
1006  {
1007  isSpaceDisk = false;
1008  spaceUsed = state->allowedMem - state->availMem;
1009  }
1010 
1011  /*
1012  * Sort evicts data to the disk when it wasn't able to fit that data into
1013  * main memory. This is why we assume space used on the disk to be more
1014  * important for tracking resource usage than space used in memory. Note
1015  * that the amount of space occupied by some tupleset on the disk might be
1016  * less than amount of space occupied by the same tupleset in memory due
1017  * to more compact representation.
1018  */
1019  if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1020  (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1021  {
1022  state->maxSpace = spaceUsed;
1023  state->isMaxSpaceDisk = isSpaceDisk;
1024  state->maxSpaceStatus = state->status;
1025  }
1026 }
1027 
1028 /*
1029  * tuplesort_reset
1030  *
1031  * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
1032  * meta-information in. After tuplesort_reset, tuplesort is ready to start
1033  * a new sort. This allows avoiding recreation of tuple sort states (and
1034  * save resources) when sorting multiple small batches.
1035  */
1036 void
1038 {
1041 
1042  /*
1043  * After we've freed up per-batch memory, re-setup all of the state common
1044  * to both the first batch and any subsequent batch.
1045  */
1047 
1048  state->lastReturnedTuple = NULL;
1049  state->slabMemoryBegin = NULL;
1050  state->slabMemoryEnd = NULL;
1051  state->slabFreeHead = NULL;
1052 }
1053 
1054 /*
1055  * Grow the memtuples[] array, if possible within our memory constraint. We
1056  * must not exceed INT_MAX tuples in memory or the caller-provided memory
1057  * limit. Return true if we were able to enlarge the array, false if not.
1058  *
1059  * Normally, at each increment we double the size of the array. When doing
1060  * that would exceed a limit, we attempt one last, smaller increase (and then
1061  * clear the growmemtuples flag so we don't try any more). That allows us to
1062  * use memory as fully as permitted; sticking to the pure doubling rule could
1063  * result in almost half going unused. Because availMem moves around with
1064  * tuple addition/removal, we need some rule to prevent making repeated small
1065  * increases in memtupsize, which would just be useless thrashing. The
1066  * growmemtuples flag accomplishes that and also prevents useless
1067  * recalculations in this function.
1068  */
1069 static bool
1071 {
1072  int newmemtupsize;
1073  int memtupsize = state->memtupsize;
1074  int64 memNowUsed = state->allowedMem - state->availMem;
1075 
1076  /* Forget it if we've already maxed out memtuples, per comment above */
1077  if (!state->growmemtuples)
1078  return false;
1079 
1080  /* Select new value of memtupsize */
1081  if (memNowUsed <= state->availMem)
1082  {
1083  /*
1084  * We've used no more than half of allowedMem; double our usage,
1085  * clamping at INT_MAX tuples.
1086  */
1087  if (memtupsize < INT_MAX / 2)
1088  newmemtupsize = memtupsize * 2;
1089  else
1090  {
1091  newmemtupsize = INT_MAX;
1092  state->growmemtuples = false;
1093  }
1094  }
1095  else
1096  {
1097  /*
1098  * This will be the last increment of memtupsize. Abandon doubling
1099  * strategy and instead increase as much as we safely can.
1100  *
1101  * To stay within allowedMem, we can't increase memtupsize by more
1102  * than availMem / sizeof(SortTuple) elements. In practice, we want
1103  * to increase it by considerably less, because we need to leave some
1104  * space for the tuples to which the new array slots will refer. We
1105  * assume the new tuples will be about the same size as the tuples
1106  * we've already seen, and thus we can extrapolate from the space
1107  * consumption so far to estimate an appropriate new size for the
1108  * memtuples array. The optimal value might be higher or lower than
1109  * this estimate, but it's hard to know that in advance. We again
1110  * clamp at INT_MAX tuples.
1111  *
1112  * This calculation is safe against enlarging the array so much that
1113  * LACKMEM becomes true, because the memory currently used includes
1114  * the present array; thus, there would be enough allowedMem for the
1115  * new array elements even if no other memory were currently used.
1116  *
1117  * We do the arithmetic in float8, because otherwise the product of
1118  * memtupsize and allowedMem could overflow. Any inaccuracy in the
1119  * result should be insignificant; but even if we computed a
1120  * completely insane result, the checks below will prevent anything
1121  * really bad from happening.
1122  */
1123  double grow_ratio;
1124 
1125  grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1126  if (memtupsize * grow_ratio < INT_MAX)
1127  newmemtupsize = (int) (memtupsize * grow_ratio);
1128  else
1129  newmemtupsize = INT_MAX;
1130 
1131  /* We won't make any further enlargement attempts */
1132  state->growmemtuples = false;
1133  }
1134 
1135  /* Must enlarge array by at least one element, else report failure */
1136  if (newmemtupsize <= memtupsize)
1137  goto noalloc;
1138 
1139  /*
1140  * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1141  * to ensure our request won't be rejected. Note that we can easily
1142  * exhaust address space before facing this outcome. (This is presently
1143  * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1144  * don't rely on that at this distance.)
1145  */
1146  if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1147  {
1148  newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1149  state->growmemtuples = false; /* can't grow any more */
1150  }
1151 
1152  /*
1153  * We need to be sure that we do not cause LACKMEM to become true, else
1154  * the space management algorithm will go nuts. The code above should
1155  * never generate a dangerous request, but to be safe, check explicitly
1156  * that the array growth fits within availMem. (We could still cause
1157  * LACKMEM if the memory chunk overhead associated with the memtuples
1158  * array were to increase. That shouldn't happen because we chose the
1159  * initial array size large enough to ensure that palloc will be treating
1160  * both old and new arrays as separate chunks. But we'll check LACKMEM
1161  * explicitly below just in case.)
1162  */
1163  if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1164  goto noalloc;
1165 
1166  /* OK, do it */
1167  FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1168  state->memtupsize = newmemtupsize;
1169  state->memtuples = (SortTuple *)
1170  repalloc_huge(state->memtuples,
1171  state->memtupsize * sizeof(SortTuple));
1172  USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1173  if (LACKMEM(state))
1174  elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1175  return true;
1176 
1177 noalloc:
1178  /* If for any reason we didn't realloc, shut off future attempts */
1179  state->growmemtuples = false;
1180  return false;
1181 }
1182 
1183 /*
1184  * Shared code for tuple and datum cases.
1185  */
1186 void
1188 {
1189  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1190 
1191  Assert(!LEADER(state));
1192 
1193  /* Count the size of the out-of-line data */
1194  if (tuple->tuple != NULL)
1196 
1197  if (!useAbbrev)
1198  {
1199  /*
1200  * Leave ordinary Datum representation, or NULL value. If there is a
1201  * converter it won't expect NULL values, and cost model is not
1202  * required to account for NULL, so in that case we avoid calling
1203  * converter and just set datum1 to zeroed representation (to be
1204  * consistent, and to support cheap inequality tests for NULL
1205  * abbreviated keys).
1206  */
1207  }
1208  else if (!consider_abort_common(state))
1209  {
1210  /* Store abbreviated key representation */
1211  tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1212  state->base.sortKeys);
1213  }
1214  else
1215  {
1216  /*
1217  * Set state to be consistent with never trying abbreviation.
1218  *
1219  * Alter datum1 representation in already-copied tuples, so as to
1220  * ensure a consistent representation (current tuple was just
1221  * handled). It does not matter if some dumped tuples are already
1222  * sorted on tape, since serialized tuples lack abbreviated keys
1223  * (TSS_BUILDRUNS state prevents control reaching here in any case).
1224  */
1225  REMOVEABBREV(state, state->memtuples, state->memtupcount);
1226  }
1227 
1228  switch (state->status)
1229  {
1230  case TSS_INITIAL:
1231 
1232  /*
1233  * Save the tuple into the unsorted array. First, grow the array
1234  * as needed. Note that we try to grow the array when there is
1235  * still one free slot remaining --- if we fail, there'll still be
1236  * room to store the incoming tuple, and then we'll switch to
1237  * tape-based operation.
1238  */
1239  if (state->memtupcount >= state->memtupsize - 1)
1240  {
1241  (void) grow_memtuples(state);
1242  Assert(state->memtupcount < state->memtupsize);
1243  }
1244  state->memtuples[state->memtupcount++] = *tuple;
1245 
1246  /*
1247  * Check if it's time to switch over to a bounded heapsort. We do
1248  * so if the input tuple count exceeds twice the desired tuple
1249  * count (this is a heuristic for where heapsort becomes cheaper
1250  * than a quicksort), or if we've just filled workMem and have
1251  * enough tuples to meet the bound.
1252  *
1253  * Note that once we enter TSS_BOUNDED state we will always try to
1254  * complete the sort that way. In the worst case, if later input
1255  * tuples are larger than earlier ones, this might cause us to
1256  * exceed workMem significantly.
1257  */
1258  if (state->bounded &&
1259  (state->memtupcount > state->bound * 2 ||
1260  (state->memtupcount > state->bound && LACKMEM(state))))
1261  {
1262 #ifdef TRACE_SORT
1263  if (trace_sort)
1264  elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1265  state->memtupcount,
1266  pg_rusage_show(&state->ru_start));
1267 #endif
1269  MemoryContextSwitchTo(oldcontext);
1270  return;
1271  }
1272 
1273  /*
1274  * Done if we still fit in available memory and have array slots.
1275  */
1276  if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1277  {
1278  MemoryContextSwitchTo(oldcontext);
1279  return;
1280  }
1281 
1282  /*
1283  * Nope; time to switch to tape-based operation.
1284  */
1285  inittapes(state, true);
1286 
1287  /*
1288  * Dump all tuples.
1289  */
1290  dumptuples(state, false);
1291  break;
1292 
1293  case TSS_BOUNDED:
1294 
1295  /*
1296  * We don't want to grow the array here, so check whether the new
1297  * tuple can be discarded before putting it in. This should be a
1298  * good speed optimization, too, since when there are many more
1299  * input tuples than the bound, most input tuples can be discarded
1300  * with just this one comparison. Note that because we currently
1301  * have the sort direction reversed, we must check for <= not >=.
1302  */
1303  if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1304  {
1305  /* new tuple <= top of the heap, so we can discard it */
1306  free_sort_tuple(state, tuple);
1308  }
1309  else
1310  {
1311  /* discard top of heap, replacing it with the new tuple */
1312  free_sort_tuple(state, &state->memtuples[0]);
1314  }
1315  break;
1316 
1317  case TSS_BUILDRUNS:
1318 
1319  /*
1320  * Save the tuple into the unsorted array (there must be space)
1321  */
1322  state->memtuples[state->memtupcount++] = *tuple;
1323 
1324  /*
1325  * If we are over the memory limit, dump all tuples.
1326  */
1327  dumptuples(state, false);
1328  break;
1329 
1330  default:
1331  elog(ERROR, "invalid tuplesort state");
1332  break;
1333  }
1334  MemoryContextSwitchTo(oldcontext);
1335 }
1336 
1337 static bool
1339 {
1340  Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1341  Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1342  Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1343 
1344  /*
1345  * Check effectiveness of abbreviation optimization. Consider aborting
1346  * when still within memory limit.
1347  */
1348  if (state->status == TSS_INITIAL &&
1349  state->memtupcount >= state->abbrevNext)
1350  {
1351  state->abbrevNext *= 2;
1352 
1353  /*
1354  * Check opclass-supplied abbreviation abort routine. It may indicate
1355  * that abbreviation should not proceed.
1356  */
1357  if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1358  state->base.sortKeys))
1359  return false;
1360 
1361  /*
1362  * Finally, restore authoritative comparator, and indicate that
1363  * abbreviation is not in play by setting abbrev_converter to NULL
1364  */
1365  state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1366  state->base.sortKeys[0].abbrev_converter = NULL;
1367  /* Not strictly necessary, but be tidy */
1368  state->base.sortKeys[0].abbrev_abort = NULL;
1369  state->base.sortKeys[0].abbrev_full_comparator = NULL;
1370 
1371  /* Give up - expect original pass-by-value representation */
1372  return true;
1373  }
1374 
1375  return false;
1376 }
1377 
1378 /*
1379  * All tuples have been provided; finish the sort.
1380  */
1381 void
1383 {
1384  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1385 
1386 #ifdef TRACE_SORT
1387  if (trace_sort)
1388  elog(LOG, "performsort of worker %d starting: %s",
1389  state->worker, pg_rusage_show(&state->ru_start));
1390 #endif
1391 
1392  switch (state->status)
1393  {
1394  case TSS_INITIAL:
1395 
1396  /*
1397  * We were able to accumulate all the tuples within the allowed
1398  * amount of memory, or leader to take over worker tapes
1399  */
1400  if (SERIAL(state))
1401  {
1402  /* Just qsort 'em and we're done */
1404  state->status = TSS_SORTEDINMEM;
1405  }
1406  else if (WORKER(state))
1407  {
1408  /*
1409  * Parallel workers must still dump out tuples to tape. No
1410  * merge is required to produce single output run, though.
1411  */
1412  inittapes(state, false);
1413  dumptuples(state, true);
1415  state->status = TSS_SORTEDONTAPE;
1416  }
1417  else
1418  {
1419  /*
1420  * Leader will take over worker tapes and merge worker runs.
1421  * Note that mergeruns sets the correct state->status.
1422  */
1424  mergeruns(state);
1425  }
1426  state->current = 0;
1427  state->eof_reached = false;
1428  state->markpos_block = 0L;
1429  state->markpos_offset = 0;
1430  state->markpos_eof = false;
1431  break;
1432 
1433  case TSS_BOUNDED:
1434 
1435  /*
1436  * We were able to accumulate all the tuples required for output
1437  * in memory, using a heap to eliminate excess tuples. Now we
1438  * have to transform the heap to a properly-sorted array. Note
1439  * that sort_bounded_heap sets the correct state->status.
1440  */
1442  state->current = 0;
1443  state->eof_reached = false;
1444  state->markpos_offset = 0;
1445  state->markpos_eof = false;
1446  break;
1447 
1448  case TSS_BUILDRUNS:
1449 
1450  /*
1451  * Finish tape-based sort. First, flush all tuples remaining in
1452  * memory out to tape; then merge until we have a single remaining
1453  * run (or, if !randomAccess and !WORKER(), one run per tape).
1454  * Note that mergeruns sets the correct state->status.
1455  */
1456  dumptuples(state, true);
1457  mergeruns(state);
1458  state->eof_reached = false;
1459  state->markpos_block = 0L;
1460  state->markpos_offset = 0;
1461  state->markpos_eof = false;
1462  break;
1463 
1464  default:
1465  elog(ERROR, "invalid tuplesort state");
1466  break;
1467  }
1468 
1469 #ifdef TRACE_SORT
1470  if (trace_sort)
1471  {
1472  if (state->status == TSS_FINALMERGE)
1473  elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1474  state->worker, state->nInputTapes,
1475  pg_rusage_show(&state->ru_start));
1476  else
1477  elog(LOG, "performsort of worker %d done: %s",
1478  state->worker, pg_rusage_show(&state->ru_start));
1479  }
1480 #endif
1481 
1482  MemoryContextSwitchTo(oldcontext);
1483 }
1484 
1485 /*
1486  * Internal routine to fetch the next tuple in either forward or back
1487  * direction into *stup. Returns false if no more tuples.
1488  * Returned tuple belongs to tuplesort memory context, and must not be freed
1489  * by caller. Note that fetched tuple is stored in memory that may be
1490  * recycled by any future fetch.
1491  */
1492 bool
1494  SortTuple *stup)
1495 {
1496  unsigned int tuplen;
1497  size_t nmoved;
1498 
1499  Assert(!WORKER(state));
1500 
1501  switch (state->status)
1502  {
1503  case TSS_SORTEDINMEM:
1504  Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1505  Assert(!state->slabAllocatorUsed);
1506  if (forward)
1507  {
1508  if (state->current < state->memtupcount)
1509  {
1510  *stup = state->memtuples[state->current++];
1511  return true;
1512  }
1513  state->eof_reached = true;
1514 
1515  /*
1516  * Complain if caller tries to retrieve more tuples than
1517  * originally asked for in a bounded sort. This is because
1518  * returning EOF here might be the wrong thing.
1519  */
1520  if (state->bounded && state->current >= state->bound)
1521  elog(ERROR, "retrieved too many tuples in a bounded sort");
1522 
1523  return false;
1524  }
1525  else
1526  {
1527  if (state->current <= 0)
1528  return false;
1529 
1530  /*
1531  * if all tuples are fetched already then we return last
1532  * tuple, else - tuple before last returned.
1533  */
1534  if (state->eof_reached)
1535  state->eof_reached = false;
1536  else
1537  {
1538  state->current--; /* last returned tuple */
1539  if (state->current <= 0)
1540  return false;
1541  }
1542  *stup = state->memtuples[state->current - 1];
1543  return true;
1544  }
1545  break;
1546 
1547  case TSS_SORTEDONTAPE:
1548  Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1549  Assert(state->slabAllocatorUsed);
1550 
1551  /*
1552  * The slot that held the tuple that we returned in previous
1553  * gettuple call can now be reused.
1554  */
1555  if (state->lastReturnedTuple)
1556  {
1557  RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1558  state->lastReturnedTuple = NULL;
1559  }
1560 
1561  if (forward)
1562  {
1563  if (state->eof_reached)
1564  return false;
1565 
1566  if ((tuplen = getlen(state->result_tape, true)) != 0)
1567  {
1568  READTUP(state, stup, state->result_tape, tuplen);
1569 
1570  /*
1571  * Remember the tuple we return, so that we can recycle
1572  * its memory on next call. (This can be NULL, in the
1573  * !state->tuples case).
1574  */
1575  state->lastReturnedTuple = stup->tuple;
1576 
1577  return true;
1578  }
1579  else
1580  {
1581  state->eof_reached = true;
1582  return false;
1583  }
1584  }
1585 
1586  /*
1587  * Backward.
1588  *
1589  * if all tuples are fetched already then we return last tuple,
1590  * else - tuple before last returned.
1591  */
1592  if (state->eof_reached)
1593  {
1594  /*
1595  * Seek position is pointing just past the zero tuplen at the
1596  * end of file; back up to fetch last tuple's ending length
1597  * word. If seek fails we must have a completely empty file.
1598  */
1599  nmoved = LogicalTapeBackspace(state->result_tape,
1600  2 * sizeof(unsigned int));
1601  if (nmoved == 0)
1602  return false;
1603  else if (nmoved != 2 * sizeof(unsigned int))
1604  elog(ERROR, "unexpected tape position");
1605  state->eof_reached = false;
1606  }
1607  else
1608  {
1609  /*
1610  * Back up and fetch previously-returned tuple's ending length
1611  * word. If seek fails, assume we are at start of file.
1612  */
1613  nmoved = LogicalTapeBackspace(state->result_tape,
1614  sizeof(unsigned int));
1615  if (nmoved == 0)
1616  return false;
1617  else if (nmoved != sizeof(unsigned int))
1618  elog(ERROR, "unexpected tape position");
1619  tuplen = getlen(state->result_tape, false);
1620 
1621  /*
1622  * Back up to get ending length word of tuple before it.
1623  */
1624  nmoved = LogicalTapeBackspace(state->result_tape,
1625  tuplen + 2 * sizeof(unsigned int));
1626  if (nmoved == tuplen + sizeof(unsigned int))
1627  {
1628  /*
1629  * We backed up over the previous tuple, but there was no
1630  * ending length word before it. That means that the prev
1631  * tuple is the first tuple in the file. It is now the
1632  * next to read in forward direction (not obviously right,
1633  * but that is what in-memory case does).
1634  */
1635  return false;
1636  }
1637  else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1638  elog(ERROR, "bogus tuple length in backward scan");
1639  }
1640 
1641  tuplen = getlen(state->result_tape, false);
1642 
1643  /*
1644  * Now we have the length of the prior tuple, back up and read it.
1645  * Note: READTUP expects we are positioned after the initial
1646  * length word of the tuple, so back up to that point.
1647  */
1648  nmoved = LogicalTapeBackspace(state->result_tape,
1649  tuplen);
1650  if (nmoved != tuplen)
1651  elog(ERROR, "bogus tuple length in backward scan");
1652  READTUP(state, stup, state->result_tape, tuplen);
1653 
1654  /*
1655  * Remember the tuple we return, so that we can recycle its memory
1656  * on next call. (This can be NULL, in the Datum case).
1657  */
1658  state->lastReturnedTuple = stup->tuple;
1659 
1660  return true;
1661 
1662  case TSS_FINALMERGE:
1663  Assert(forward);
1664  /* We are managing memory ourselves, with the slab allocator. */
1665  Assert(state->slabAllocatorUsed);
1666 
1667  /*
1668  * The slab slot holding the tuple that we returned in previous
1669  * gettuple call can now be reused.
1670  */
1671  if (state->lastReturnedTuple)
1672  {
1673  RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1674  state->lastReturnedTuple = NULL;
1675  }
1676 
1677  /*
1678  * This code should match the inner loop of mergeonerun().
1679  */
1680  if (state->memtupcount > 0)
1681  {
1682  int srcTapeIndex = state->memtuples[0].srctape;
1683  LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1684  SortTuple newtup;
1685 
1686  *stup = state->memtuples[0];
1687 
1688  /*
1689  * Remember the tuple we return, so that we can recycle its
1690  * memory on next call. (This can be NULL, in the Datum case).
1691  */
1692  state->lastReturnedTuple = stup->tuple;
1693 
1694  /*
1695  * Pull next tuple from tape, and replace the returned tuple
1696  * at top of the heap with it.
1697  */
1698  if (!mergereadnext(state, srcTape, &newtup))
1699  {
1700  /*
1701  * If no more data, we've reached end of run on this tape.
1702  * Remove the top node from the heap.
1703  */
1705  state->nInputRuns--;
1706 
1707  /*
1708  * Close the tape. It'd go away at the end of the sort
1709  * anyway, but better to release the memory early.
1710  */
1711  LogicalTapeClose(srcTape);
1712  return true;
1713  }
1714  newtup.srctape = srcTapeIndex;
1716  return true;
1717  }
1718  return false;
1719 
1720  default:
1721  elog(ERROR, "invalid tuplesort state");
1722  return false; /* keep compiler quiet */
1723  }
1724 }
1725 
1726 
1727 /*
1728  * Advance over N tuples in either forward or back direction,
1729  * without returning any data. N==0 is a no-op.
1730  * Returns true if successful, false if ran out of tuples.
1731  */
1732 bool
1733 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1734 {
1735  MemoryContext oldcontext;
1736 
1737  /*
1738  * We don't actually support backwards skip yet, because no callers need
1739  * it. The API is designed to allow for that later, though.
1740  */
1741  Assert(forward);
1742  Assert(ntuples >= 0);
1743  Assert(!WORKER(state));
1744 
1745  switch (state->status)
1746  {
1747  case TSS_SORTEDINMEM:
1748  if (state->memtupcount - state->current >= ntuples)
1749  {
1750  state->current += ntuples;
1751  return true;
1752  }
1753  state->current = state->memtupcount;
1754  state->eof_reached = true;
1755 
1756  /*
1757  * Complain if caller tries to retrieve more tuples than
1758  * originally asked for in a bounded sort. This is because
1759  * returning EOF here might be the wrong thing.
1760  */
1761  if (state->bounded && state->current >= state->bound)
1762  elog(ERROR, "retrieved too many tuples in a bounded sort");
1763 
1764  return false;
1765 
1766  case TSS_SORTEDONTAPE:
1767  case TSS_FINALMERGE:
1768 
1769  /*
1770  * We could probably optimize these cases better, but for now it's
1771  * not worth the trouble.
1772  */
1773  oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1774  while (ntuples-- > 0)
1775  {
1776  SortTuple stup;
1777 
1778  if (!tuplesort_gettuple_common(state, forward, &stup))
1779  {
1780  MemoryContextSwitchTo(oldcontext);
1781  return false;
1782  }
1784  }
1785  MemoryContextSwitchTo(oldcontext);
1786  return true;
1787 
1788  default:
1789  elog(ERROR, "invalid tuplesort state");
1790  return false; /* keep compiler quiet */
1791  }
1792 }
1793 
1794 /*
1795  * tuplesort_merge_order - report merge order we'll use for given memory
1796  * (note: "merge order" just means the number of input tapes in the merge).
1797  *
1798  * This is exported for use by the planner. allowedMem is in bytes.
1799  */
1800 int
1801 tuplesort_merge_order(int64 allowedMem)
1802 {
1803  int mOrder;
1804 
1805  /*----------
1806  * In the merge phase, we need buffer space for each input and output tape.
1807  * Each pass in the balanced merge algorithm reads from M input tapes, and
1808  * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1809  * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1810  * input tape.
1811  *
1812  * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1813  * N * TAPE_BUFFER_OVERHEAD
1814  *
1815  * Except for the last and next-to-last merge passes, where there can be
1816  * fewer tapes left to process, M = N. We choose M so that we have the
1817  * desired amount of memory available for the input buffers
1818  * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1819  * available for the tape buffers (allowedMem).
1820  *
1821  * Note: you might be thinking we need to account for the memtuples[]
1822  * array in this calculation, but we effectively treat that as part of the
1823  * MERGE_BUFFER_SIZE workspace.
1824  *----------
1825  */
1826  mOrder = allowedMem /
1828 
1829  /*
1830  * Even in minimum memory, use at least a MINORDER merge. On the other
1831  * hand, even when we have lots of memory, do not use more than a MAXORDER
1832  * merge. Tapes are pretty cheap, but they're not entirely free. Each
1833  * additional tape reduces the amount of memory available to build runs,
1834  * which in turn can cause the same sort to need more runs, which makes
1835  * merging slower even if it can still be done in a single pass. Also,
1836  * high order merges are quite slow due to CPU cache effects; it can be
1837  * faster to pay the I/O cost of a multi-pass merge than to perform a
1838  * single merge pass across many hundreds of tapes.
1839  */
1840  mOrder = Max(mOrder, MINORDER);
1841  mOrder = Min(mOrder, MAXORDER);
1842 
1843  return mOrder;
1844 }
1845 
1846 /*
1847  * Helper function to calculate how much memory to allocate for the read buffer
1848  * of each input tape in a merge pass.
1849  *
1850  * 'avail_mem' is the amount of memory available for the buffers of all the
1851  * tapes, both input and output.
1852  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1853  * 'maxOutputTapes' is the max. number of output tapes we should produce.
1854  */
1855 static int64
1856 merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1857  int maxOutputTapes)
1858 {
1859  int nOutputRuns;
1860  int nOutputTapes;
1861 
1862  /*
1863  * How many output tapes will we produce in this pass?
1864  *
1865  * This is nInputRuns / nInputTapes, rounded up.
1866  */
1867  nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1868 
1869  nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1870 
1871  /*
1872  * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1873  * remaining memory is divided evenly between the input tapes.
1874  *
1875  * This also follows from the formula in tuplesort_merge_order, but here
1876  * we derive the input buffer size from the amount of memory available,
1877  * and M and N.
1878  */
1879  return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1880 }
1881 
1882 /*
1883  * inittapes - initialize for tape sorting.
1884  *
1885  * This is called only if we have found we won't sort in memory.
1886  */
1887 static void
1889 {
1890  Assert(!LEADER(state));
1891 
1892  if (mergeruns)
1893  {
1894  /* Compute number of input tapes to use when merging */
1895  state->maxTapes = tuplesort_merge_order(state->allowedMem);
1896  }
1897  else
1898  {
1899  /* Workers can sometimes produce single run, output without merge */
1900  Assert(WORKER(state));
1901  state->maxTapes = MINORDER;
1902  }
1903 
1904 #ifdef TRACE_SORT
1905  if (trace_sort)
1906  elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1907  state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1908 #endif
1909 
1910  /* Create the tape set */
1911  inittapestate(state, state->maxTapes);
1912  state->tapeset =
1913  LogicalTapeSetCreate(false,
1914  state->shared ? &state->shared->fileset : NULL,
1915  state->worker);
1916 
1917  state->currentRun = 0;
1918 
1919  /*
1920  * Initialize logical tape arrays.
1921  */
1922  state->inputTapes = NULL;
1923  state->nInputTapes = 0;
1924  state->nInputRuns = 0;
1925 
1926  state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1927  state->nOutputTapes = 0;
1928  state->nOutputRuns = 0;
1929 
1930  state->status = TSS_BUILDRUNS;
1931 
1933 }
1934 
1935 /*
1936  * inittapestate - initialize generic tape management state
1937  */
1938 static void
1940 {
1941  int64 tapeSpace;
1942 
1943  /*
1944  * Decrease availMem to reflect the space needed for tape buffers; but
1945  * don't decrease it to the point that we have no room for tuples. (That
1946  * case is only likely to occur if sorting pass-by-value Datums; in all
1947  * other scenarios the memtuples[] array is unlikely to occupy more than
1948  * half of allowedMem. In the pass-by-value case it's not important to
1949  * account for tuple space, so we don't care if LACKMEM becomes
1950  * inaccurate.)
1951  */
1952  tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1953 
1954  if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1955  USEMEM(state, tapeSpace);
1956 
1957  /*
1958  * Make sure that the temp file(s) underlying the tape set are created in
1959  * suitable temp tablespaces. For parallel sorts, this should have been
1960  * called already, but it doesn't matter if it is called a second time.
1961  */
1963 }
1964 
1965 /*
1966  * selectnewtape -- select next tape to output to.
1967  *
1968  * This is called after finishing a run when we know another run
1969  * must be started. This is used both when building the initial
1970  * runs, and during merge passes.
1971  */
1972 static void
1974 {
1975  /*
1976  * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1977  * both zero. On each call, we create a new output tape to hold the next
1978  * run, until maxTapes is reached. After that, we assign new runs to the
1979  * existing tapes in a round robin fashion.
1980  */
1981  if (state->nOutputTapes < state->maxTapes)
1982  {
1983  /* Create a new tape to hold the next run */
1984  Assert(state->outputTapes[state->nOutputRuns] == NULL);
1985  Assert(state->nOutputRuns == state->nOutputTapes);
1986  state->destTape = LogicalTapeCreate(state->tapeset);
1987  state->outputTapes[state->nOutputTapes] = state->destTape;
1988  state->nOutputTapes++;
1989  state->nOutputRuns++;
1990  }
1991  else
1992  {
1993  /*
1994  * We have reached the max number of tapes. Append to an existing
1995  * tape.
1996  */
1997  state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
1998  state->nOutputRuns++;
1999  }
2000 }
2001 
2002 /*
2003  * Initialize the slab allocation arena, for the given number of slots.
2004  */
2005 static void
2007 {
2008  if (numSlots > 0)
2009  {
2010  char *p;
2011  int i;
2012 
2013  state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2014  state->slabMemoryEnd = state->slabMemoryBegin +
2015  numSlots * SLAB_SLOT_SIZE;
2016  state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2017  USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2018 
2019  p = state->slabMemoryBegin;
2020  for (i = 0; i < numSlots - 1; i++)
2021  {
2022  ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2023  p += SLAB_SLOT_SIZE;
2024  }
2025  ((SlabSlot *) p)->nextfree = NULL;
2026  }
2027  else
2028  {
2029  state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2030  state->slabFreeHead = NULL;
2031  }
2032  state->slabAllocatorUsed = true;
2033 }
2034 
2035 /*
2036  * mergeruns -- merge all the completed initial runs.
2037  *
2038  * This implements the Balanced k-Way Merge Algorithm. All input data has
2039  * already been written to initial runs on tape (see dumptuples).
2040  */
2041 static void
2043 {
2044  int tapenum;
2045 
2046  Assert(state->status == TSS_BUILDRUNS);
2047  Assert(state->memtupcount == 0);
2048 
2049  if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2050  {
2051  /*
2052  * If there are multiple runs to be merged, when we go to read back
2053  * tuples from disk, abbreviated keys will not have been stored, and
2054  * we don't care to regenerate them. Disable abbreviation from this
2055  * point on.
2056  */
2057  state->base.sortKeys->abbrev_converter = NULL;
2058  state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2059 
2060  /* Not strictly necessary, but be tidy */
2061  state->base.sortKeys->abbrev_abort = NULL;
2062  state->base.sortKeys->abbrev_full_comparator = NULL;
2063  }
2064 
2065  /*
2066  * Reset tuple memory. We've freed all the tuples that we previously
2067  * allocated. We will use the slab allocator from now on.
2068  */
2069  MemoryContextResetOnly(state->base.tuplecontext);
2070 
2071  /*
2072  * We no longer need a large memtuples array. (We will allocate a smaller
2073  * one for the heap later.)
2074  */
2075  FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2076  pfree(state->memtuples);
2077  state->memtuples = NULL;
2078 
2079  /*
2080  * Initialize the slab allocator. We need one slab slot per input tape,
2081  * for the tuples in the heap, plus one to hold the tuple last returned
2082  * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2083  * however, we don't need to do allocate anything.)
2084  *
2085  * In a multi-pass merge, we could shrink this allocation for the last
2086  * merge pass, if it has fewer tapes than previous passes, but we don't
2087  * bother.
2088  *
2089  * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2090  * to track memory usage of individual tuples.
2091  */
2092  if (state->base.tuples)
2093  init_slab_allocator(state, state->nOutputTapes + 1);
2094  else
2096 
2097  /*
2098  * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2099  * from each input tape.
2100  *
2101  * We could shrink this, too, between passes in a multi-pass merge, but we
2102  * don't bother. (The initial input tapes are still in outputTapes. The
2103  * number of input tapes will not increase between passes.)
2104  */
2105  state->memtupsize = state->nOutputTapes;
2106  state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
2107  state->nOutputTapes * sizeof(SortTuple));
2108  USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2109 
2110  /*
2111  * Use all the remaining memory we have available for tape buffers among
2112  * all the input tapes. At the beginning of each merge pass, we will
2113  * divide this memory between the input and output tapes in the pass.
2114  */
2115  state->tape_buffer_mem = state->availMem;
2116  USEMEM(state, state->tape_buffer_mem);
2117 #ifdef TRACE_SORT
2118  if (trace_sort)
2119  elog(LOG, "worker %d using %zu KB of memory for tape buffers",
2120  state->worker, state->tape_buffer_mem / 1024);
2121 #endif
2122 
2123  for (;;)
2124  {
2125  /*
2126  * On the first iteration, or if we have read all the runs from the
2127  * input tapes in a multi-pass merge, it's time to start a new pass.
2128  * Rewind all the output tapes, and make them inputs for the next
2129  * pass.
2130  */
2131  if (state->nInputRuns == 0)
2132  {
2133  int64 input_buffer_size;
2134 
2135  /* Close the old, emptied, input tapes */
2136  if (state->nInputTapes > 0)
2137  {
2138  for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2139  LogicalTapeClose(state->inputTapes[tapenum]);
2140  pfree(state->inputTapes);
2141  }
2142 
2143  /* Previous pass's outputs become next pass's inputs. */
2144  state->inputTapes = state->outputTapes;
2145  state->nInputTapes = state->nOutputTapes;
2146  state->nInputRuns = state->nOutputRuns;
2147 
2148  /*
2149  * Reset output tape variables. The actual LogicalTapes will be
2150  * created as needed, here we only allocate the array to hold
2151  * them.
2152  */
2153  state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2154  state->nOutputTapes = 0;
2155  state->nOutputRuns = 0;
2156 
2157  /*
2158  * Redistribute the memory allocated for tape buffers, among the
2159  * new input and output tapes.
2160  */
2161  input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2162  state->nInputTapes,
2163  state->nInputRuns,
2164  state->maxTapes);
2165 
2166 #ifdef TRACE_SORT
2167  if (trace_sort)
2168  elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2169  state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2170  pg_rusage_show(&state->ru_start));
2171 #endif
2172 
2173  /* Prepare the new input tapes for merge pass. */
2174  for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2175  LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2176 
2177  /*
2178  * If there's just one run left on each input tape, then only one
2179  * merge pass remains. If we don't have to produce a materialized
2180  * sorted tape, we can stop at this point and do the final merge
2181  * on-the-fly.
2182  */
2183  if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2184  && state->nInputRuns <= state->nInputTapes
2185  && !WORKER(state))
2186  {
2187  /* Tell logtape.c we won't be writing anymore */
2189  /* Initialize for the final merge pass */
2190  beginmerge(state);
2191  state->status = TSS_FINALMERGE;
2192  return;
2193  }
2194  }
2195 
2196  /* Select an output tape */
2198 
2199  /* Merge one run from each input tape. */
2200  mergeonerun(state);
2201 
2202  /*
2203  * If the input tapes are empty, and we output only one output run,
2204  * we're done. The current output tape contains the final result.
2205  */
2206  if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2207  break;
2208  }
2209 
2210  /*
2211  * Done. The result is on a single run on a single tape.
2212  */
2213  state->result_tape = state->outputTapes[0];
2214  if (!WORKER(state))
2215  LogicalTapeFreeze(state->result_tape, NULL);
2216  else
2218  state->status = TSS_SORTEDONTAPE;
2219 
2220  /* Close all the now-empty input tapes, to release their read buffers. */
2221  for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2222  LogicalTapeClose(state->inputTapes[tapenum]);
2223 }
2224 
2225 /*
2226  * Merge one run from each input tape.
2227  */
2228 static void
2230 {
2231  int srcTapeIndex;
2232  LogicalTape *srcTape;
2233 
2234  /*
2235  * Start the merge by loading one tuple from each active source tape into
2236  * the heap.
2237  */
2238  beginmerge(state);
2239 
2240  Assert(state->slabAllocatorUsed);
2241 
2242  /*
2243  * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2244  * out, and replacing it with next tuple from same tape (if there is
2245  * another one).
2246  */
2247  while (state->memtupcount > 0)
2248  {
2249  SortTuple stup;
2250 
2251  /* write the tuple to destTape */
2252  srcTapeIndex = state->memtuples[0].srctape;
2253  srcTape = state->inputTapes[srcTapeIndex];
2254  WRITETUP(state, state->destTape, &state->memtuples[0]);
2255 
2256  /* recycle the slot of the tuple we just wrote out, for the next read */
2257  if (state->memtuples[0].tuple)
2258  RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2259 
2260  /*
2261  * pull next tuple from the tape, and replace the written-out tuple in
2262  * the heap with it.
2263  */
2264  if (mergereadnext(state, srcTape, &stup))
2265  {
2266  stup.srctape = srcTapeIndex;
2268  }
2269  else
2270  {
2272  state->nInputRuns--;
2273  }
2274  }
2275 
2276  /*
2277  * When the heap empties, we're done. Write an end-of-run marker on the
2278  * output tape.
2279  */
2280  markrunend(state->destTape);
2281 }
2282 
2283 /*
2284  * beginmerge - initialize for a merge pass
2285  *
2286  * Fill the merge heap with the first tuple from each input tape.
2287  */
2288 static void
2290 {
2291  int activeTapes;
2292  int srcTapeIndex;
2293 
2294  /* Heap should be empty here */
2295  Assert(state->memtupcount == 0);
2296 
2297  activeTapes = Min(state->nInputTapes, state->nInputRuns);
2298 
2299  for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2300  {
2301  SortTuple tup;
2302 
2303  if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2304  {
2305  tup.srctape = srcTapeIndex;
2307  }
2308  }
2309 }
2310 
2311 /*
2312  * mergereadnext - read next tuple from one merge input tape
2313  *
2314  * Returns false on EOF.
2315  */
2316 static bool
2318 {
2319  unsigned int tuplen;
2320 
2321  /* read next tuple, if any */
2322  if ((tuplen = getlen(srcTape, true)) == 0)
2323  return false;
2324  READTUP(state, stup, srcTape, tuplen);
2325 
2326  return true;
2327 }
2328 
2329 /*
2330  * dumptuples - remove tuples from memtuples and write initial run to tape
2331  *
2332  * When alltuples = true, dump everything currently in memory. (This case is
2333  * only used at end of input data.)
2334  */
2335 static void
2337 {
2338  int memtupwrite;
2339  int i;
2340 
2341  /*
2342  * Nothing to do if we still fit in available memory and have array slots,
2343  * unless this is the final call during initial run generation.
2344  */
2345  if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2346  !alltuples)
2347  return;
2348 
2349  /*
2350  * Final call might require no sorting, in rare cases where we just so
2351  * happen to have previously LACKMEM()'d at the point where exactly all
2352  * remaining tuples are loaded into memory, just before input was
2353  * exhausted. In general, short final runs are quite possible, but avoid
2354  * creating a completely empty run. In a worker, though, we must produce
2355  * at least one tape, even if it's empty.
2356  */
2357  if (state->memtupcount == 0 && state->currentRun > 0)
2358  return;
2359 
2360  Assert(state->status == TSS_BUILDRUNS);
2361 
2362  /*
2363  * It seems unlikely that this limit will ever be exceeded, but take no
2364  * chances
2365  */
2366  if (state->currentRun == INT_MAX)
2367  ereport(ERROR,
2368  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2369  errmsg("cannot have more than %d runs for an external sort",
2370  INT_MAX)));
2371 
2372  if (state->currentRun > 0)
2374 
2375  state->currentRun++;
2376 
2377 #ifdef TRACE_SORT
2378  if (trace_sort)
2379  elog(LOG, "worker %d starting quicksort of run %d: %s",
2380  state->worker, state->currentRun,
2381  pg_rusage_show(&state->ru_start));
2382 #endif
2383 
2384  /*
2385  * Sort all tuples accumulated within the allowed amount of memory for
2386  * this run using quicksort
2387  */
2389 
2390 #ifdef TRACE_SORT
2391  if (trace_sort)
2392  elog(LOG, "worker %d finished quicksort of run %d: %s",
2393  state->worker, state->currentRun,
2394  pg_rusage_show(&state->ru_start));
2395 #endif
2396 
2397  memtupwrite = state->memtupcount;
2398  for (i = 0; i < memtupwrite; i++)
2399  {
2400  SortTuple *stup = &state->memtuples[i];
2401 
2402  WRITETUP(state, state->destTape, stup);
2403 
2404  /*
2405  * Account for freeing the tuple, but no need to do the actual pfree
2406  * since the tuplecontext is being reset after the loop.
2407  */
2408  if (stup->tuple != NULL)
2410  }
2411 
2412  state->memtupcount = 0;
2413 
2414  /*
2415  * Reset tuple memory. We've freed all of the tuples that we previously
2416  * allocated. It's important to avoid fragmentation when there is a stark
2417  * change in the sizes of incoming tuples. Fragmentation due to
2418  * AllocSetFree's bucketing by size class might be particularly bad if
2419  * this step wasn't taken.
2420  */
2421  MemoryContextReset(state->base.tuplecontext);
2422 
2423  markrunend(state->destTape);
2424 
2425 #ifdef TRACE_SORT
2426  if (trace_sort)
2427  elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2428  state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2429  pg_rusage_show(&state->ru_start));
2430 #endif
2431 }
2432 
2433 /*
2434  * tuplesort_rescan - rewind and replay the scan
2435  */
2436 void
2438 {
2439  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2440 
2441  Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2442 
2443  switch (state->status)
2444  {
2445  case TSS_SORTEDINMEM:
2446  state->current = 0;
2447  state->eof_reached = false;
2448  state->markpos_offset = 0;
2449  state->markpos_eof = false;
2450  break;
2451  case TSS_SORTEDONTAPE:
2452  LogicalTapeRewindForRead(state->result_tape, 0);
2453  state->eof_reached = false;
2454  state->markpos_block = 0L;
2455  state->markpos_offset = 0;
2456  state->markpos_eof = false;
2457  break;
2458  default:
2459  elog(ERROR, "invalid tuplesort state");
2460  break;
2461  }
2462 
2463  MemoryContextSwitchTo(oldcontext);
2464 }
2465 
2466 /*
2467  * tuplesort_markpos - saves current position in the merged sort file
2468  */
2469 void
2471 {
2472  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2473 
2474  Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2475 
2476  switch (state->status)
2477  {
2478  case TSS_SORTEDINMEM:
2479  state->markpos_offset = state->current;
2480  state->markpos_eof = state->eof_reached;
2481  break;
2482  case TSS_SORTEDONTAPE:
2483  LogicalTapeTell(state->result_tape,
2484  &state->markpos_block,
2485  &state->markpos_offset);
2486  state->markpos_eof = state->eof_reached;
2487  break;
2488  default:
2489  elog(ERROR, "invalid tuplesort state");
2490  break;
2491  }
2492 
2493  MemoryContextSwitchTo(oldcontext);
2494 }
2495 
2496 /*
2497  * tuplesort_restorepos - restores current position in merged sort file to
2498  * last saved position
2499  */
2500 void
2502 {
2503  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2504 
2505  Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2506 
2507  switch (state->status)
2508  {
2509  case TSS_SORTEDINMEM:
2510  state->current = state->markpos_offset;
2511  state->eof_reached = state->markpos_eof;
2512  break;
2513  case TSS_SORTEDONTAPE:
2514  LogicalTapeSeek(state->result_tape,
2515  state->markpos_block,
2516  state->markpos_offset);
2517  state->eof_reached = state->markpos_eof;
2518  break;
2519  default:
2520  elog(ERROR, "invalid tuplesort state");
2521  break;
2522  }
2523 
2524  MemoryContextSwitchTo(oldcontext);
2525 }
2526 
2527 /*
2528  * tuplesort_get_stats - extract summary statistics
2529  *
2530  * This can be called after tuplesort_performsort() finishes to obtain
2531  * printable summary information about how the sort was performed.
2532  */
2533 void
2535  TuplesortInstrumentation *stats)
2536 {
2537  /*
2538  * Note: it might seem we should provide both memory and disk usage for a
2539  * disk-based sort. However, the current code doesn't track memory space
2540  * accurately once we have begun to return tuples to the caller (since we
2541  * don't account for pfree's the caller is expected to do), so we cannot
2542  * rely on availMem in a disk sort. This does not seem worth the overhead
2543  * to fix. Is it worth creating an API for the memory context code to
2544  * tell us how much is actually used in sortcontext?
2545  */
2547 
2548  if (state->isMaxSpaceDisk)
2550  else
2552  stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2553 
2554  switch (state->maxSpaceStatus)
2555  {
2556  case TSS_SORTEDINMEM:
2557  if (state->boundUsed)
2559  else
2561  break;
2562  case TSS_SORTEDONTAPE:
2564  break;
2565  case TSS_FINALMERGE:
2567  break;
2568  default:
2570  break;
2571  }
2572 }
2573 
2574 /*
2575  * Convert TuplesortMethod to a string.
2576  */
2577 const char *
2579 {
2580  switch (m)
2581  {
2583  return "still in progress";
2585  return "top-N heapsort";
2586  case SORT_TYPE_QUICKSORT:
2587  return "quicksort";
2589  return "external sort";
2591  return "external merge";
2592  }
2593 
2594  return "unknown";
2595 }
2596 
2597 /*
2598  * Convert TuplesortSpaceType to a string.
2599  */
2600 const char *
2602 {
2604  return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2605 }
2606 
2607 
2608 /*
2609  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2610  */
2611 
2612 /*
2613  * Convert the existing unordered array of SortTuples to a bounded heap,
2614  * discarding all but the smallest "state->bound" tuples.
2615  *
2616  * When working with a bounded heap, we want to keep the largest entry
2617  * at the root (array entry zero), instead of the smallest as in the normal
2618  * sort case. This allows us to discard the largest entry cheaply.
2619  * Therefore, we temporarily reverse the sort direction.
2620  */
2621 static void
2623 {
2624  int tupcount = state->memtupcount;
2625  int i;
2626 
2627  Assert(state->status == TSS_INITIAL);
2628  Assert(state->bounded);
2629  Assert(tupcount >= state->bound);
2630  Assert(SERIAL(state));
2631 
2632  /* Reverse sort direction so largest entry will be at root */
2634 
2635  state->memtupcount = 0; /* make the heap empty */
2636  for (i = 0; i < tupcount; i++)
2637  {
2638  if (state->memtupcount < state->bound)
2639  {
2640  /* Insert next tuple into heap */
2641  /* Must copy source tuple to avoid possible overwrite */
2642  SortTuple stup = state->memtuples[i];
2643 
2644  tuplesort_heap_insert(state, &stup);
2645  }
2646  else
2647  {
2648  /*
2649  * The heap is full. Replace the largest entry with the new
2650  * tuple, or just discard it, if it's larger than anything already
2651  * in the heap.
2652  */
2653  if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2654  {
2655  free_sort_tuple(state, &state->memtuples[i]);
2657  }
2658  else
2659  tuplesort_heap_replace_top(state, &state->memtuples[i]);
2660  }
2661  }
2662 
2663  Assert(state->memtupcount == state->bound);
2664  state->status = TSS_BOUNDED;
2665 }
2666 
2667 /*
2668  * Convert the bounded heap to a properly-sorted array
2669  */
2670 static void
2672 {
2673  int tupcount = state->memtupcount;
2674 
2675  Assert(state->status == TSS_BOUNDED);
2676  Assert(state->bounded);
2677  Assert(tupcount == state->bound);
2678  Assert(SERIAL(state));
2679 
2680  /*
2681  * We can unheapify in place because each delete-top call will remove the
2682  * largest entry, which we can promptly store in the newly freed slot at
2683  * the end. Once we're down to a single-entry heap, we're done.
2684  */
2685  while (state->memtupcount > 1)
2686  {
2687  SortTuple stup = state->memtuples[0];
2688 
2689  /* this sifts-up the next-largest entry and decreases memtupcount */
2691  state->memtuples[state->memtupcount] = stup;
2692  }
2693  state->memtupcount = tupcount;
2694 
2695  /*
2696  * Reverse sort direction back to the original state. This is not
2697  * actually necessary but seems like a good idea for tidiness.
2698  */
2700 
2701  state->status = TSS_SORTEDINMEM;
2702  state->boundUsed = true;
2703 }
2704 
2705 /*
2706  * Sort all memtuples using specialized qsort() routines.
2707  *
2708  * Quicksort is used for small in-memory sorts, and external sort runs.
2709  */
2710 static void
2712 {
2713  Assert(!LEADER(state));
2714 
2715  if (state->memtupcount > 1)
2716  {
2717  /*
2718  * Do we have the leading column's value or abbreviation in datum1,
2719  * and is there a specialization for its comparator?
2720  */
2721  if (state->base.haveDatum1 && state->base.sortKeys)
2722  {
2723  if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2724  {
2725  qsort_tuple_unsigned(state->memtuples,
2726  state->memtupcount,
2727  state);
2728  return;
2729  }
2730 #if SIZEOF_DATUM >= 8
2731  else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2732  {
2733  qsort_tuple_signed(state->memtuples,
2734  state->memtupcount,
2735  state);
2736  return;
2737  }
2738 #endif
2739  else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2740  {
2741  qsort_tuple_int32(state->memtuples,
2742  state->memtupcount,
2743  state);
2744  return;
2745  }
2746  }
2747 
2748  /* Can we use the single-key sort function? */
2749  if (state->base.onlyKey != NULL)
2750  {
2751  qsort_ssup(state->memtuples, state->memtupcount,
2752  state->base.onlyKey);
2753  }
2754  else
2755  {
2756  qsort_tuple(state->memtuples,
2757  state->memtupcount,
2758  state->base.comparetup,
2759  state);
2760  }
2761  }
2762 }
2763 
2764 /*
2765  * Insert a new tuple into an empty or existing heap, maintaining the
2766  * heap invariant. Caller is responsible for ensuring there's room.
2767  *
2768  * Note: For some callers, tuple points to a memtuples[] entry above the
2769  * end of the heap. This is safe as long as it's not immediately adjacent
2770  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2771  * is, it might get overwritten before being moved into the heap!
2772  */
2773 static void
2775 {
2776  SortTuple *memtuples;
2777  int j;
2778 
2779  memtuples = state->memtuples;
2780  Assert(state->memtupcount < state->memtupsize);
2781 
2783 
2784  /*
2785  * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2786  * using 1-based array indexes, not 0-based.
2787  */
2788  j = state->memtupcount++;
2789  while (j > 0)
2790  {
2791  int i = (j - 1) >> 1;
2792 
2793  if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
2794  break;
2795  memtuples[j] = memtuples[i];
2796  j = i;
2797  }
2798  memtuples[j] = *tuple;
2799 }
2800 
2801 /*
2802  * Remove the tuple at state->memtuples[0] from the heap. Decrement
2803  * memtupcount, and sift up to maintain the heap invariant.
2804  *
2805  * The caller has already free'd the tuple the top node points to,
2806  * if necessary.
2807  */
2808 static void
2810 {
2811  SortTuple *memtuples = state->memtuples;
2812  SortTuple *tuple;
2813 
2814  if (--state->memtupcount <= 0)
2815  return;
2816 
2817  /*
2818  * Remove the last tuple in the heap, and re-insert it, by replacing the
2819  * current top node with it.
2820  */
2821  tuple = &memtuples[state->memtupcount];
2823 }
2824 
2825 /*
2826  * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
2827  * maintain the heap invariant.
2828  *
2829  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
2830  * Heapsort, steps H3-H8).
2831  */
2832 static void
2834 {
2835  SortTuple *memtuples = state->memtuples;
2836  unsigned int i,
2837  n;
2838 
2839  Assert(state->memtupcount >= 1);
2840 
2842 
2843  /*
2844  * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2845  * This prevents overflow in the "2 * i + 1" calculation, since at the top
2846  * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2847  */
2848  n = state->memtupcount;
2849  i = 0; /* i is where the "hole" is */
2850  for (;;)
2851  {
2852  unsigned int j = 2 * i + 1;
2853 
2854  if (j >= n)
2855  break;
2856  if (j + 1 < n &&
2857  COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
2858  j++;
2859  if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
2860  break;
2861  memtuples[i] = memtuples[j];
2862  i = j;
2863  }
2864  memtuples[i] = *tuple;
2865 }
2866 
2867 /*
2868  * Function to reverse the sort direction from its current state
2869  *
2870  * It is not safe to call this when performing hash tuplesorts
2871  */
2872 static void
2874 {
2875  SortSupport sortKey = state->base.sortKeys;
2876  int nkey;
2877 
2878  for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2879  {
2880  sortKey->ssup_reverse = !sortKey->ssup_reverse;
2881  sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2882  }
2883 }
2884 
2885 
2886 /*
2887  * Tape interface routines
2888  */
2889 
2890 static unsigned int
2891 getlen(LogicalTape *tape, bool eofOK)
2892 {
2893  unsigned int len;
2894 
2895  if (LogicalTapeRead(tape,
2896  &len, sizeof(len)) != sizeof(len))
2897  elog(ERROR, "unexpected end of tape");
2898  if (len == 0 && !eofOK)
2899  elog(ERROR, "unexpected end of data");
2900  return len;
2901 }
2902 
2903 static void
2905 {
2906  unsigned int len = 0;
2907 
2908  LogicalTapeWrite(tape, &len, sizeof(len));
2909 }
2910 
2911 /*
2912  * Get memory for tuple from within READTUP() routine.
2913  *
2914  * We use next free slot from the slab allocator, or palloc() if the tuple
2915  * is too large for that.
2916  */
2917 void *
2919 {
2920  SlabSlot *buf;
2921 
2922  /*
2923  * We pre-allocate enough slots in the slab arena that we should never run
2924  * out.
2925  */
2926  Assert(state->slabFreeHead);
2927 
2928  if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
2929  return MemoryContextAlloc(state->base.sortcontext, tuplen);
2930  else
2931  {
2932  buf = state->slabFreeHead;
2933  /* Reuse this slot */
2934  state->slabFreeHead = buf->nextfree;
2935 
2936  return buf;
2937  }
2938 }
2939 
2940 
2941 /*
2942  * Parallel sort routines
2943  */
2944 
2945 /*
2946  * tuplesort_estimate_shared - estimate required shared memory allocation
2947  *
2948  * nWorkers is an estimate of the number of workers (it's the number that
2949  * will be requested).
2950  */
2951 Size
2953 {
2954  Size tapesSize;
2955 
2956  Assert(nWorkers > 0);
2957 
2958  /* Make sure that BufFile shared state is MAXALIGN'd */
2959  tapesSize = mul_size(sizeof(TapeShare), nWorkers);
2960  tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2961 
2962  return tapesSize;
2963 }
2964 
2965 /*
2966  * tuplesort_initialize_shared - initialize shared tuplesort state
2967  *
2968  * Must be called from leader process before workers are launched, to
2969  * establish state needed up-front for worker tuplesortstates. nWorkers
2970  * should match the argument passed to tuplesort_estimate_shared().
2971  */
2972 void
2974 {
2975  int i;
2976 
2977  Assert(nWorkers > 0);
2978 
2979  SpinLockInit(&shared->mutex);
2980  shared->currentWorker = 0;
2981  shared->workersFinished = 0;
2982  SharedFileSetInit(&shared->fileset, seg);
2983  shared->nTapes = nWorkers;
2984  for (i = 0; i < nWorkers; i++)
2985  {
2986  shared->tapes[i].firstblocknumber = 0L;
2987  }
2988 }
2989 
2990 /*
2991  * tuplesort_attach_shared - attach to shared tuplesort state
2992  *
2993  * Must be called by all worker processes.
2994  */
2995 void
2997 {
2998  /* Attach to SharedFileSet */
2999  SharedFileSetAttach(&shared->fileset, seg);
3000 }
3001 
3002 /*
3003  * worker_get_identifier - Assign and return ordinal identifier for worker
3004  *
3005  * The order in which these are assigned is not well defined, and should not
3006  * matter; worker numbers across parallel sort participants need only be
3007  * distinct and gapless. logtape.c requires this.
3008  *
3009  * Note that the identifiers assigned from here have no relation to
3010  * ParallelWorkerNumber number, to avoid making any assumption about
3011  * caller's requirements. However, we do follow the ParallelWorkerNumber
3012  * convention of representing a non-worker with worker number -1. This
3013  * includes the leader, as well as serial Tuplesort processes.
3014  */
3015 static int
3017 {
3018  Sharedsort *shared = state->shared;
3019  int worker;
3020 
3021  Assert(WORKER(state));
3022 
3023  SpinLockAcquire(&shared->mutex);
3024  worker = shared->currentWorker++;
3025  SpinLockRelease(&shared->mutex);
3026 
3027  return worker;
3028 }
3029 
3030 /*
3031  * worker_freeze_result_tape - freeze worker's result tape for leader
3032  *
3033  * This is called by workers just after the result tape has been determined,
3034  * instead of calling LogicalTapeFreeze() directly. They do so because
3035  * workers require a few additional steps over similar serial
3036  * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
3037  * steps are around freeing now unneeded resources, and representing to
3038  * leader that worker's input run is available for its merge.
3039  *
3040  * There should only be one final output run for each worker, which consists
3041  * of all tuples that were originally input into worker.
3042  */
3043 static void
3045 {
3046  Sharedsort *shared = state->shared;
3047  TapeShare output;
3048 
3049  Assert(WORKER(state));
3050  Assert(state->result_tape != NULL);
3051  Assert(state->memtupcount == 0);
3052 
3053  /*
3054  * Free most remaining memory, in case caller is sensitive to our holding
3055  * on to it. memtuples may not be a tiny merge heap at this point.
3056  */
3057  pfree(state->memtuples);
3058  /* Be tidy */
3059  state->memtuples = NULL;
3060  state->memtupsize = 0;
3061 
3062  /*
3063  * Parallel worker requires result tape metadata, which is to be stored in
3064  * shared memory for leader
3065  */
3066  LogicalTapeFreeze(state->result_tape, &output);
3067 
3068  /* Store properties of output tape, and update finished worker count */
3069  SpinLockAcquire(&shared->mutex);
3070  shared->tapes[state->worker] = output;
3071  shared->workersFinished++;
3072  SpinLockRelease(&shared->mutex);
3073 }
3074 
3075 /*
3076  * worker_nomergeruns - dump memtuples in worker, without merging
3077  *
3078  * This called as an alternative to mergeruns() with a worker when no
3079  * merging is required.
3080  */
3081 static void
3083 {
3084  Assert(WORKER(state));
3085  Assert(state->result_tape == NULL);
3086  Assert(state->nOutputRuns == 1);
3087 
3088  state->result_tape = state->destTape;
3090 }
3091 
3092 /*
3093  * leader_takeover_tapes - create tapeset for leader from worker tapes
3094  *
3095  * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3096  * sorting has occurred in workers, all of which must have already returned
3097  * from tuplesort_performsort().
3098  *
3099  * When this returns, leader process is left in a state that is virtually
3100  * indistinguishable from it having generated runs as a serial external sort
3101  * might have.
3102  */
3103 static void
3105 {
3106  Sharedsort *shared = state->shared;
3107  int nParticipants = state->nParticipants;
3108  int workersFinished;
3109  int j;
3110 
3111  Assert(LEADER(state));
3112  Assert(nParticipants >= 1);
3113 
3114  SpinLockAcquire(&shared->mutex);
3115  workersFinished = shared->workersFinished;
3116  SpinLockRelease(&shared->mutex);
3117 
3118  if (nParticipants != workersFinished)
3119  elog(ERROR, "cannot take over tapes before all workers finish");
3120 
3121  /*
3122  * Create the tapeset from worker tapes, including a leader-owned tape at
3123  * the end. Parallel workers are far more expensive than logical tapes,
3124  * so the number of tapes allocated here should never be excessive.
3125  */
3126  inittapestate(state, nParticipants);
3127  state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3128 
3129  /*
3130  * Set currentRun to reflect the number of runs we will merge (it's not
3131  * used for anything, this is just pro forma)
3132  */
3133  state->currentRun = nParticipants;
3134 
3135  /*
3136  * Initialize the state to look the same as after building the initial
3137  * runs.
3138  *
3139  * There will always be exactly 1 run per worker, and exactly one input
3140  * tape per run, because workers always output exactly 1 run, even when
3141  * there were no input tuples for workers to sort.
3142  */
3143  state->inputTapes = NULL;
3144  state->nInputTapes = 0;
3145  state->nInputRuns = 0;
3146 
3147  state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3148  state->nOutputTapes = nParticipants;
3149  state->nOutputRuns = nParticipants;
3150 
3151  for (j = 0; j < nParticipants; j++)
3152  {
3153  state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3154  }
3155 
3156  state->status = TSS_BUILDRUNS;
3157 }
3158 
3159 /*
3160  * Convenience routine to free a tuple previously loaded into sort memory
3161  */
3162 static void
3164 {
3165  if (stup->tuple)
3166  {
3168  pfree(stup->tuple);
3169  stup->tuple = NULL;
3170  }
3171 }
3172 
3173 int
3175 {
3176  if (x < y)
3177  return -1;
3178  else if (x > y)
3179  return 1;
3180  else
3181  return 0;
3182 }
3183 
#if SIZEOF_DATUM >= 8
/*
 * Comparator for Datums holding int64 values by-value (only possible when
 * Datum is at least 8 bytes wide).  Returns -1/0/1 per the SortSupport
 * comparator contract.
 */
int
ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
{
	int64		xx = DatumGetInt64(x);
	int64		yy = DatumGetInt64(y);

	if (xx < yy)
		return -1;
	else if (xx > yy)
		return 1;
	else
		return 0;
}
#endif
3199 
3200 int
3202 {
3203  int32 xx = DatumGetInt32(x);
3204  int32 yy = DatumGetInt32(y);
3205 
3206  if (xx < yy)
3207  return -1;
3208  else if (xx > yy)
3209  return 1;
3210  else
3211  return 0;
3212 }
void PrepareTempTablespaces(void)
Definition: tablespace.c:1337
#define Min(x, y)
Definition: c.h:993
#define MAXALIGN(LEN)
Definition: c.h:800
signed int int32
Definition: c.h:483
#define Max(x, y)
Definition: c.h:987
#define INT64_FORMAT
Definition: c.h:537
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:387
#define pg_attribute_always_inline
Definition: c.h:223
size_t Size
Definition: c.h:594
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define LOG
Definition: elog.h:31
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:157
static int compare(const void *arg1, const void *arg2)
Definition: geqo_pool.c:145
FILE * output
int y
Definition: isn.c:72
int b
Definition: isn.c:70
int x
Definition: isn.c:71
int a
Definition: isn.c:69
int j
Definition: isn.c:74
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
LogicalTape * LogicalTapeCreate(LogicalTapeSet *lts)
Definition: logtape.c:680
void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
Definition: logtape.c:846
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition: logtape.c:750
size_t LogicalTapeBackspace(LogicalTape *lt, size_t size)
Definition: logtape.c:1062
size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
Definition: logtape.c:928
int64 LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1183
void LogicalTapeClose(LogicalTape *lt)
Definition: logtape.c:733
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:667
void LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
Definition: logtape.c:1133
void LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
Definition: logtape.c:1162
void LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
Definition: logtape.c:761
LogicalTapeSet * LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
Definition: logtape.c:556
void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
Definition: logtape.c:981
LogicalTape * LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
Definition: logtape.c:609
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330
void pfree(void *pointer)
Definition: mcxt.c:1431
Size GetMemoryChunkSpace(void *pointer)
Definition: mcxt.c:630
void * palloc0(Size size)
Definition: mcxt.c:1232
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1034
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
void * palloc(Size size)
Definition: mcxt.c:1201
void MemoryContextResetOnly(MemoryContext context)
Definition: mcxt.c:349
void * repalloc_huge(void *pointer, Size size)
Definition: mcxt.c:1595
#define AllocSetContextCreate
Definition: memutils.h:128
#define MaxAllocHugeSize
Definition: memutils.h:45
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:152
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const void size_t len
const char * pg_rusage_show(const PGRUsage *ru0)
Definition: pg_rusage.c:40
void pg_rusage_init(PGRUsage *ru0)
Definition: pg_rusage.c:27
static char * buf
Definition: pg_test_fsync.c:73
static int64 DatumGetInt64(Datum X)
Definition: postgres.h:385
uintptr_t Datum
Definition: postgres.h:64
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
int slock_t
Definition: s_lock.h:754
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:62
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:44
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511
static int ApplyUnsignedSortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
Definition: sortsupport.h:233
static int ApplyInt32SortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
Definition: sortsupport.h:302
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedFileSet fileset
Definition: tuplesort.c:361
TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]
Definition: tuplesort.c:370
int workersFinished
Definition: tuplesort.c:358
int nTapes
Definition: tuplesort.c:364
slock_t mutex
Definition: tuplesort.c:347
int currentWorker
Definition: tuplesort.c:357
Sharedsort * sharedsort
Definition: tuplesort.h:58
bool ssup_nulls_first
Definition: sortsupport.h:75
void * tuple
Definition: tuplesort.h:139
int srctape
Definition: tuplesort.h:142
Datum datum1
Definition: tuplesort.h:140
int64 firstblocknumber
Definition: logtape.h:54
TuplesortMethod sortMethod
Definition: tuplesort.h:103
TuplesortSpaceType spaceType
Definition: tuplesort.h:104
void * lastReturnedTuple
Definition: tuplesort.c:263
LogicalTapeSet * tapeset
Definition: tuplesort.c:207
bool isMaxSpaceDisk
Definition: tuplesort.c:203
bool growmemtuples
Definition: tuplesort.c:219
SortTuple * memtuples
Definition: tuplesort.c:216
int64 maxSpace
Definition: tuplesort.c:201
LogicalTape ** inputTapes
Definition: tuplesort.c:279
bool slabAllocatorUsed
Definition: tuplesort.c:248
TuplesortPublic base
Definition: tuplesort.c:191
char * slabMemoryEnd
Definition: tuplesort.c:251
PGRUsage ru_start
Definition: tuplesort.c:336
char * slabMemoryBegin
Definition: tuplesort.c:250
LogicalTape ** outputTapes
Definition: tuplesort.c:283
bool eof_reached
Definition: tuplesort.c:296
size_t tape_buffer_mem
Definition: tuplesort.c:255
TupSortStatus status
Definition: tuplesort.c:192
int64 availMem
Definition: tuplesort.c:197
LogicalTape * destTape
Definition: tuplesort.c:287
TupSortStatus maxSpaceStatus
Definition: tuplesort.c:206
bool markpos_eof
Definition: tuplesort.c:301
int64 abbrevNext
Definition: tuplesort.c:329
int64 markpos_block
Definition: tuplesort.c:299
Sharedsort * shared
Definition: tuplesort.c:320
LogicalTape * result_tape
Definition: tuplesort.c:294
SlabSlot * slabFreeHead
Definition: tuplesort.c:252
int markpos_offset
Definition: tuplesort.c:300
int64 allowedMem
Definition: tuplesort.c:198
Definition: regguts.h:323
void tuplesort_rescan(Tuplesortstate *state)
Definition: tuplesort.c:2437
void tuplesort_performsort(Tuplesortstate *state)
Definition: tuplesort.c:1382
int tuplesort_merge_order(int64 allowedMem)
Definition: tuplesort.c:1801
#define TAPE_BUFFER_OVERHEAD
Definition: tuplesort.c:182
static void tuplesort_heap_delete_top(Tuplesortstate *state)
Definition: tuplesort.c:2809
#define INITIAL_MEMTUPSIZE
Definition: tuplesort.c:122
static unsigned int getlen(LogicalTape *tape, bool eofOK)
Definition: tuplesort.c:2891
void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
Definition: tuplesort.c:2973
#define COMPARETUP(state, a, b)
Definition: tuplesort.c:397
Tuplesortstate * tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
Definition: tuplesort.c:643
static void selectnewtape(Tuplesortstate *state)
Definition: tuplesort.c:1973
void tuplesort_reset(Tuplesortstate *state)
Definition: tuplesort.c:1037
#define SERIAL(state)
Definition: tuplesort.c:404
#define FREESTATE(state)
Definition: tuplesort.c:400
static void markrunend(LogicalTape *tape)
Definition: tuplesort.c:2904
bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
Definition: tuplesort.c:1733
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
Definition: tuplesort.c:3163
#define REMOVEABBREV(state, stup, count)
Definition: tuplesort.c:396
#define LACKMEM(state)
Definition: tuplesort.c:401
static void reversedirection(Tuplesortstate *state)
Definition: tuplesort.c:2873
#define USEMEM(state, amt)
Definition: tuplesort.c:402
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
Definition: tuplesort.c:2774
static bool grow_memtuples(Tuplesortstate *state)
Definition: tuplesort.c:1070
int ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
Definition: tuplesort.c:3174
static void beginmerge(Tuplesortstate *state)
Definition: tuplesort.c:2289
static void make_bounded_heap(Tuplesortstate *state)
Definition: tuplesort.c:2622
bool tuplesort_used_bound(Tuplesortstate *state)
Definition: tuplesort.c:889
#define WRITETUP(state, tape, stup)
Definition: tuplesort.c:398
static void sort_bounded_heap(Tuplesortstate *state)
Definition: tuplesort.c:2671
TupSortStatus
Definition: tuplesort.c:159
@ TSS_SORTEDONTAPE
Definition: tuplesort.c:164
@ TSS_SORTEDINMEM
Definition: tuplesort.c:163
@ TSS_INITIAL
Definition: tuplesort.c:160
@ TSS_FINALMERGE
Definition: tuplesort.c:165
@ TSS_BUILDRUNS
Definition: tuplesort.c:162
@ TSS_BOUNDED
Definition: tuplesort.c:161
static int worker_get_identifier(Tuplesortstate *state)
Definition: tuplesort.c:3016
static void mergeonerun(Tuplesortstate *state)
Definition: tuplesort.c:2229
#define FREEMEM(state, amt)
Definition: tuplesort.c:403
const char * tuplesort_space_type_name(TuplesortSpaceType t)
Definition: tuplesort.c:2601
#define MAXORDER
Definition: tuplesort.c:181
static void inittapestate(Tuplesortstate *state, int maxTapes)
Definition: tuplesort.c:1939
#define SLAB_SLOT_SIZE
Definition: tuplesort.c:146
static void leader_takeover_tapes(Tuplesortstate *state)
Definition: tuplesort.c:3104
Size tuplesort_estimate_shared(int nWorkers)
Definition: tuplesort.c:2952
void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats)
Definition: tuplesort.c:2534
static void tuplesort_sort_memtuples(Tuplesortstate *state)
Definition: tuplesort.c:2711
void tuplesort_end(Tuplesortstate *state)
Definition: tuplesort.c:969
static void inittapes(Tuplesortstate *state, bool mergeruns)
Definition: tuplesort.c:1888
void tuplesort_markpos(Tuplesortstate *state)
Definition: tuplesort.c:2470
void tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbrev)
Definition: tuplesort.c:1187
#define MERGE_BUFFER_SIZE
Definition: tuplesort.c:183
#define READTUP(state, stup, tape, len)
Definition: tuplesort.c:399
int ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
Definition: tuplesort.c:3201
#define LEADER(state)
Definition: tuplesort.c:406
#define WORKER(state)
Definition: tuplesort.c:405
bool tuplesort_gettuple_common(Tuplesortstate *state, bool forward, SortTuple *stup)
Definition: tuplesort.c:1493
static int64 merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns, int maxOutputTapes)
Definition: tuplesort.c:1856
static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
Definition: tuplesort.c:2317
union SlabSlot SlabSlot
static void tuplesort_updatemax(Tuplesortstate *state)
Definition: tuplesort.c:986
static void worker_freeze_result_tape(Tuplesortstate *state)
Definition: tuplesort.c:3044
bool trace_sort
Definition: tuplesort.c:127
#define RELEASE_SLAB_SLOT(state, tuple)
Definition: tuplesort.c:384
void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
Definition: tuplesort.c:2996
static void worker_nomergeruns(Tuplesortstate *state)
Definition: tuplesort.c:3082
static pg_attribute_always_inline int qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
Definition: tuplesort.c:496
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
Definition: tuplesort.c:2833
void tuplesort_restorepos(Tuplesortstate *state)
Definition: tuplesort.c:2501
static pg_attribute_always_inline int qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
Definition: tuplesort.c:543
static void mergeruns(Tuplesortstate *state)
Definition: tuplesort.c:2042
void * tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
Definition: tuplesort.c:2918
#define MINORDER
Definition: tuplesort.c:180
static void tuplesort_begin_batch(Tuplesortstate *state)
Definition: tuplesort.c:755
void tuplesort_set_bound(Tuplesortstate *state, int64 bound)
Definition: tuplesort.c:841
static void init_slab_allocator(Tuplesortstate *state, int numSlots)
Definition: tuplesort.c:2006
const char * tuplesort_method_name(TuplesortMethod m)
Definition: tuplesort.c:2578
static bool consider_abort_common(Tuplesortstate *state)
Definition: tuplesort.c:1338
static void tuplesort_free(Tuplesortstate *state)
Definition: tuplesort.c:900
static void dumptuples(Tuplesortstate *state, bool alltuples)
Definition: tuplesort.c:2336
#define TUPLESORT_RANDOMACCESS
Definition: tuplesort.h:96
#define TUPLESORT_ALLOWBOUNDED
Definition: tuplesort.h:99
TuplesortSpaceType
Definition: tuplesort.h:87
@ SORT_SPACE_TYPE_DISK
Definition: tuplesort.h:88
@ SORT_SPACE_TYPE_MEMORY
Definition: tuplesort.h:89
TuplesortMethod
Definition: tuplesort.h:76
@ SORT_TYPE_EXTERNAL_SORT
Definition: tuplesort.h:80
@ SORT_TYPE_TOP_N_HEAPSORT
Definition: tuplesort.h:78
@ SORT_TYPE_QUICKSORT
Definition: tuplesort.h:79
@ SORT_TYPE_STILL_IN_PROGRESS
Definition: tuplesort.h:77
@ SORT_TYPE_EXTERNAL_MERGE
Definition: tuplesort.h:81
char buffer[SLAB_SLOT_SIZE]
Definition: tuplesort.c:151
union SlabSlot * nextfree
Definition: tuplesort.c:150