tuplesort.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  * Generalized tuple sorting routines.
5  *
6  * This module provides a generalized facility for tuple sorting, which can be
7  * applied to different kinds of sortable objects. Implementation of
8  * the particular sorting variants is given in tuplesortvariants.c.
9  * This module works efficiently for both small and large amounts
10  * of data. Small amounts are sorted in-memory using qsort(). Large
11  * amounts are sorted using temporary files and a standard external sort
12  * algorithm.
13  *
14  * See Knuth, volume 3, for more than you want to know about external
15  * sorting algorithms. The algorithm we use is a balanced k-way merge.
16  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18  * merge is better. Knuth is assuming that tape drives are expensive
19  * beasts, and in particular that there will always be many more runs than
20  * tape drives. The polyphase merge algorithm was good at keeping all the
21  * tape drives busy, but in our implementation a "tape drive" doesn't cost
22  * much more than a few Kb of memory buffers, so we can afford to have
23  * lots of them. In particular, if we can have as many tape drives as
24  * sorted runs, we can eliminate any repeated I/O at all.
25  *
26  * Historically, we divided the input into sorted runs using replacement
27  * selection, in the form of a priority tree implemented as a heap
28  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29  * for run generation.
30  *
31  * The approximate amount of memory allowed for any one sort operation
32  * is specified in kilobytes by the caller (most pass work_mem). Initially,
33  * we absorb tuples and simply store them in an unsorted array as long as
34  * we haven't exceeded workMem. If we reach the end of the input without
35  * exceeding workMem, we sort the array using qsort() and subsequently return
36  * tuples just by scanning the tuple array sequentially. If we do exceed
37  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38  * When tuples are dumped in batch after quicksorting, we begin a new run
39  * with a new output tape. If we reach the max number of tapes, we write
40  * subsequent runs on the existing tapes in a round-robin fashion. We will
41  * need multiple merge passes to finish the merge in that case. After the
42  * end of the input is reached, we dump out remaining tuples in memory into
43  * a final run, then merge the runs.
44  *
45  * When merging runs, we use a heap containing just the frontmost tuple from
46  * each source run; we repeatedly output the smallest tuple and replace it
47  * with the next tuple from its source tape (if any). When the heap empties,
48  * the merge is complete. The basic merge algorithm thus needs very little
49  * memory --- only M tuples for an M-way merge, and M is constrained to a
50  * small number. However, we can still make good use of our full workMem
51  * allocation by pre-reading additional blocks from each source tape. Without
52  * prereading, our access pattern to the temporary file would be very erratic;
53  * on average we'd read one block from each of M source tapes during the same
54  * time that we're writing M blocks to the output tape, so there is no
55  * sequentiality of access at all, defeating the read-ahead methods used by
56  * most Unix kernels. Worse, the output tape gets written into a very random
57  * sequence of blocks of the temp file, ensuring that things will be even
58  * worse when it comes time to read that tape. A straightforward merge pass
59  * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60  * by prereading from each source tape sequentially, loading about workMem/M
61  * bytes from each tape in turn, and making the sequential blocks immediately
62  * available for reuse. This approach helps to localize both read and write
63  * accesses. The pre-reading is handled by logtape.c; we just tell it how
64  * much memory to use for the buffers.
65  *
66  * In the current code we determine the number of input tapes M on the basis
67  * of workMem: we want workMem/M to be large enough that we read a fair
68  * amount of data each time we read from a tape, so as to maintain the
69  * locality of access described above. Nonetheless, with large workMem we
70  * can have many tapes. The logical "tapes" are implemented by logtape.c,
71  * which avoids space wastage by recycling disk space as soon as each block
72  * is read from its "tape".
73  *
74  * When the caller requests random access to the sort result, we form
75  * the final sorted run on a logical tape which is then "frozen", so
76  * that we can access it randomly. When the caller does not need random
77  * access, we return from tuplesort_performsort() as soon as we are down
78  * to one run per logical tape. The final merge is then performed
79  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80  * saves one cycle of writing all the data out to disk and reading it in.
81  *
82  * This module supports parallel sorting. Parallel sorts involve coordination
83  * among one or more worker processes, and a leader process, each with its own
84  * tuplesort state. The leader process (or, more accurately, the
85  * Tuplesortstate associated with a leader process) creates a full tapeset
86  * consisting of worker tapes with one run to merge; a run for every
87  * worker process. This is then merged. Worker processes are guaranteed to
88  * produce exactly one output run from their partial input.
89  *
90  *
91  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
92  * Portions Copyright (c) 1994, Regents of the University of California
93  *
94  * IDENTIFICATION
95  * src/backend/utils/sort/tuplesort.c
96  *
97  *-------------------------------------------------------------------------
98  */
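/*
 * Illustrative sketch (not from the PostgreSQL sources): a minimal,
 * self-contained version of the balanced k-way merge described in the header
 * comment above.  The frontmost element of each sorted run sits in a binary
 * min-heap; we repeatedly emit the smallest element and replace it with the
 * next element from the same run, shrinking the heap when a run is exhausted.
 * Real tuplesort works on SortTuples read from logical tapes; this toy uses
 * plain ints and in-memory arrays purely for exposition.
 */
#ifdef TUPLESORT_MERGE_ILLUSTRATION
typedef struct
{
	const int  *vals;			/* the sorted run */
	int			nvals;			/* number of elements in the run */
	int			pos;			/* next unread element */
} ToyRun;

typedef struct
{
	int			value;			/* frontmost value of some run */
	int			srcrun;			/* which run it came from */
} ToyHeapItem;

/* Restore the min-heap property by sifting element i downwards. */
static void
toy_sift_down(ToyHeapItem *heap, int n, int i)
{
	for (;;)
	{
		int			smallest = i;
		int			left = 2 * i + 1;
		int			right = 2 * i + 2;
		ToyHeapItem swap;

		if (left < n && heap[left].value < heap[smallest].value)
			smallest = left;
		if (right < n && heap[right].value < heap[smallest].value)
			smallest = right;
		if (smallest == i)
			break;
		swap = heap[i];
		heap[i] = heap[smallest];
		heap[smallest] = swap;
		i = smallest;
	}
}

/* Merge nruns sorted runs into out[]; returns the number of values written. */
static int
toy_kway_merge(ToyRun *runs, int nruns, ToyHeapItem *heap, int *out)
{
	int			heapsize = 0;
	int			nout = 0;

	/* Prime the heap with the first element of each non-empty run. */
	for (int i = 0; i < nruns; i++)
	{
		if (runs[i].pos < runs[i].nvals)
		{
			heap[heapsize].value = runs[i].vals[runs[i].pos++];
			heap[heapsize].srcrun = i;
			heapsize++;
		}
	}
	for (int i = heapsize / 2 - 1; i >= 0; i--)
		toy_sift_down(heap, heapsize, i);

	/* Pop the minimum and refill from its run, until the heap empties. */
	while (heapsize > 0)
	{
		ToyRun	   *src = &runs[heap[0].srcrun];

		out[nout++] = heap[0].value;
		if (src->pos < src->nvals)
			heap[0].value = src->vals[src->pos++];	/* replace the heap top */
		else
			heap[0] = heap[--heapsize]; /* run exhausted; shrink the heap */
		toy_sift_down(heap, heapsize, 0);
	}
	return nout;
}
#endif							/* TUPLESORT_MERGE_ILLUSTRATION */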
99 
100 #include "postgres.h"
101 
102 #include <limits.h>
103 
104 #include "commands/tablespace.h"
105 #include "miscadmin.h"
106 #include "pg_trace.h"
107 #include "storage/shmem.h"
108 #include "utils/memutils.h"
109 #include "utils/pg_rusage.h"
110 #include "utils/tuplesort.h"
111 
112 /*
113  * Initial size of the memtuples array. We try to select this size so that
114  * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
115  * overhead of allocation might possibly be lowered. However, we don't
116  * consider array sizes less than 1024.
117  *
118  */
119 #define INITIAL_MEMTUPSIZE Max(1024, \
120  ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
121 
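/*
 * Illustrative arithmetic (not from the PostgreSQL sources): with the usual
 * ALLOCSET_SEPARATE_THRESHOLD of 8 kB and a SortTuple of roughly 24 bytes on
 * a typical 64-bit build, the second branch of the Max() works out to about
 * 342, so INITIAL_MEMTUPSIZE normally resolves to 1024 slots, i.e. roughly
 * 24 kB for the initial memtuples array.
 */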
122 /* GUC variables */
123 #ifdef TRACE_SORT
124 bool trace_sort = false;
125 #endif
126 
127 #ifdef DEBUG_BOUNDED_SORT
128 bool optimize_bounded_sort = true;
129 #endif
130 
131 
132 /*
133  * During merge, we use a pre-allocated set of fixed-size slots to hold
134  * tuples, to avoid palloc/pfree overhead.
135  *
136  * Merge doesn't require a lot of memory, so we can afford to waste some,
137  * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
138  * palloc() overhead is not significant anymore.
139  *
140  * 'nextfree' is valid when this chunk is in the free list. When in use, the
141  * slot holds a tuple.
142  */
143 #define SLAB_SLOT_SIZE 1024
144 
145 typedef union SlabSlot
146 {
147  union SlabSlot *nextfree;
148  char buffer[SLAB_SLOT_SIZE];
149 } SlabSlot;
150 
151 /*
152  * Possible states of a Tuplesort object. These denote the states that
153  * persist between calls of Tuplesort routines.
154  */
155 typedef enum
156 {
157  TSS_INITIAL, /* Loading tuples; still within memory limit */
158  TSS_BOUNDED, /* Loading tuples into bounded-size heap */
159  TSS_BUILDRUNS, /* Loading tuples; writing to tape */
160  TSS_SORTEDINMEM, /* Sort completed entirely in memory */
161  TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
162  TSS_FINALMERGE, /* Performing final merge on-the-fly */
163 } TupSortStatus;
164 
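/*
 * Illustrative note (not from the PostgreSQL sources): the usual transitions
 * between these states, following the logic in tuplesort_puttuple_common()
 * and tuplesort_performsort() below, are:
 *
 *   TSS_INITIAL -> TSS_SORTEDINMEM                    everything fit in memory
 *   TSS_INITIAL -> TSS_BOUNDED -> TSS_SORTEDINMEM     bounded (top-N) sort
 *   TSS_INITIAL -> TSS_BUILDRUNS -> TSS_SORTEDONTAPE  external sort, random access
 *   TSS_INITIAL -> TSS_BUILDRUNS -> TSS_FINALMERGE    external sort, on-the-fly merge
 */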
165 /*
166  * Parameters for calculation of number of tapes to use --- see inittapes()
167  * and tuplesort_merge_order().
168  *
169  * In this calculation we assume that each tape will cost us about 1 block's
170  * worth of buffer space. This ignores the overhead of all the other data
171  * structures needed for each tape, but it's probably close enough.
172  *
173  * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
174  * input tape, for pre-reading (see discussion at top of file). This is *in
175  * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
176  */
177 #define MINORDER 6 /* minimum merge order */
178 #define MAXORDER 500 /* maximum merge order */
179 #define TAPE_BUFFER_OVERHEAD BLCKSZ
180 #define MERGE_BUFFER_SIZE (BLCKSZ * 32)
181 
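/*
 * Illustrative sketch (not from the PostgreSQL sources): one way a merge
 * order could be derived from the constants above.  With the default 8 kB
 * BLCKSZ, each input tape wants MERGE_BUFFER_SIZE (256 kB) of pre-read space
 * plus TAPE_BUFFER_OVERHEAD (8 kB), so a 64 MB workMem supports a merge order
 * of a few hundred tapes, clamped to the [MINORDER, MAXORDER] range.  The
 * authoritative calculation lives in tuplesort_merge_order(); the formula
 * below is an assumed approximation for exposition only.
 */
#ifdef TUPLESORT_MERGE_ORDER_ILLUSTRATION
static int
toy_merge_order(int64 allowedMem)
{
	/* assume each tape needs its buffer overhead plus a pre-read buffer */
	int			mOrder = (int) (allowedMem /
								(MERGE_BUFFER_SIZE + 2 * TAPE_BUFFER_OVERHEAD));

	mOrder = Max(mOrder, MINORDER);
	mOrder = Min(mOrder, MAXORDER);
	return mOrder;
}
#endif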
182 
183 /*
184  * Private state of a Tuplesort operation.
185  */
186 struct Tuplesortstate
187 {
188  TuplesortPublic base;
189  TupSortStatus status; /* enumerated value as shown above */
190  bool bounded; /* did caller specify a maximum number of
191  * tuples to return? */
192  bool boundUsed; /* true if we made use of a bounded heap */
193  int bound; /* if bounded, the maximum number of tuples */
194  int64 tupleMem; /* memory consumed by individual tuples.
195  * storing this separately from what we track
196  * in availMem allows us to subtract the
197  * memory consumed by all tuples when dumping
198  * tuples to tape */
199  int64 availMem; /* remaining memory available, in bytes */
200  int64 allowedMem; /* total memory allowed, in bytes */
201  int maxTapes; /* max number of input tapes to merge in each
202  * pass */
203  int64 maxSpace; /* maximum amount of space occupied among sort
204  * groups, either in-memory or on-disk */
205  bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
206  * space, false when it's value for in-memory
207  * space */
208  TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
209  LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
210 
211  /*
212  * This array holds the tuples now in sort memory. If we are in state
213  * INITIAL, the tuples are in no particular order; if we are in state
214  * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
215  * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
216  * H. In state SORTEDONTAPE, the array is not used.
217  */
218  SortTuple *memtuples; /* array of SortTuple structs */
219  int memtupcount; /* number of tuples currently present */
220  int memtupsize; /* allocated length of memtuples array */
221  bool growmemtuples; /* memtuples' growth still underway? */
222 
223  /*
224  * Memory for tuples is sometimes allocated using a simple slab allocator,
225  * rather than with palloc(). Currently, we switch to slab allocation
226  * when we start merging. Merging only needs to keep a small, fixed
227  * number of tuples in memory at any time, so we can avoid the
228  * palloc/pfree overhead by recycling a fixed number of fixed-size slots
229  * to hold the tuples.
230  *
231  * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
232  * slots. The allocation is sized to have one slot per tape, plus one
233  * additional slot. We need that many slots to hold all the tuples kept
234  * in the heap during merge, plus the one we have last returned from the
235  * sort, with tuplesort_gettuple.
236  *
237  * Initially, all the slots are kept in a linked list of free slots. When
238  * a tuple is read from a tape, it is put to the next available slot, if
239  * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
240  * instead.
241  *
242  * When we're done processing a tuple, we return the slot back to the free
243  * list, or pfree() if it was palloc'd. We know that a tuple was
244  * allocated from the slab, if its pointer value is between
245  * slabMemoryBegin and -End.
246  *
247  * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
248  * tracking memory usage is not used.
249  */
250  bool slabAllocatorUsed;
251 
252  char *slabMemoryBegin; /* beginning of slab memory arena */
253  char *slabMemoryEnd; /* end of slab memory arena */
254  SlabSlot *slabFreeHead; /* head of free list */
255 
256  /* Memory used for input and output tape buffers. */
258 
259  /*
260  * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
261  * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
262  * modes), we remember the tuple in 'lastReturnedTuple', so that we can
263  * recycle the memory on next gettuple call.
264  */
265  void *lastReturnedTuple;
266 
267  /*
268  * While building initial runs, this is the current output run number.
269  * Afterwards, it is the number of initial runs we made.
270  */
271  int currentRun;
272 
273  /*
274  * Logical tapes, for merging.
275  *
276  * The initial runs are written in the output tapes. In each merge pass,
277  * the output tapes of the previous pass become the input tapes, and new
278  * output tapes are created as needed. When nInputTapes equals
279  * nInputRuns, there is only one merge pass left.
280  */
281  LogicalTape **inputTapes;
282  int nInputTapes;
283  int nInputRuns;
284 
285  LogicalTape **outputTapes;
286  int nOutputTapes;
287  int nOutputRuns;
288 
289  LogicalTape *destTape; /* current output tape */
290 
291  /*
292  * These variables are used after completion of sorting to keep track of
293  * the next tuple to return. (In the tape case, the tape's current read
294  * position is also critical state.)
295  */
296  LogicalTape *result_tape; /* actual tape of finished output */
297  int current; /* array index (only used if SORTEDINMEM) */
298  bool eof_reached; /* reached EOF (needed for cursors) */
299 
300  /* markpos_xxx holds marked position for mark and restore */
301  int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
302  int markpos_offset; /* saved "current", or offset in tape block */
303  bool markpos_eof; /* saved "eof_reached" */
304 
305  /*
306  * These variables are used during parallel sorting.
307  *
308  * worker is our worker identifier. Follows the general convention that
309  * -1 value relates to a leader tuplesort, and values >= 0 worker
310  * tuplesorts. (-1 can also be a serial tuplesort.)
311  *
312  * shared is mutable shared memory state, which is used to coordinate
313  * parallel sorts.
314  *
315  * nParticipants is the number of worker Tuplesortstates known by the
316  * leader to have actually been launched, which implies that they must
317  * finish a run that the leader needs to merge. Typically includes a
318  * worker state held by the leader process itself. Set in the leader
319  * Tuplesortstate only.
320  */
321  int worker;
322  Sharedsort *shared;
323  int nParticipants;
324 
325  /*
326  * Additional state for managing "abbreviated key" sortsupport routines
327  * (which currently may be used by all cases except the hash index case).
328  * Tracks the intervals at which the optimization's effectiveness is
329  * tested.
330  */
331  int64 abbrevNext; /* Tuple # at which to next check
332  * applicability */
333 
334  /*
335  * Resource snapshot for time of sort start.
336  */
337 #ifdef TRACE_SORT
338  PGRUsage ru_start;
339 #endif
340 };
341 
342 /*
343  * Private mutable state of tuplesort-parallel-operation. This is allocated
344  * in shared memory.
345  */
346 struct Sharedsort
347 {
348  /* mutex protects all fields prior to tapes */
349  slock_t mutex;
350 
351  /*
352  * currentWorker generates ordinal identifier numbers for parallel sort
353  * workers. These start from 0, and are always gapless.
354  *
355  * Workers increment workersFinished to indicate having finished. If this
356  * is equal to state.nParticipants within the leader, leader is ready to
357  * merge worker runs.
358  */
359  int currentWorker;
360  int workersFinished;
361 
362  /* Temporary file space */
363  SharedFileSet fileset;
364 
365  /* Size of tapes flexible array */
366  int nTapes;
367 
368  /*
369  * Tapes array used by workers to report back information needed by the
370  * leader to concatenate all worker tapes into one for merging
371  */
372  TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
373 };
374 
375 /*
376  * Is the given tuple allocated from the slab memory arena?
377  */
378 #define IS_SLAB_SLOT(state, tuple) \
379  ((char *) (tuple) >= (state)->slabMemoryBegin && \
380  (char *) (tuple) < (state)->slabMemoryEnd)
381 
382 /*
383  * Return the given tuple to the slab memory free list, or free it
384  * if it was palloc'd.
385  */
386 #define RELEASE_SLAB_SLOT(state, tuple) \
387  do { \
388  SlabSlot *buf = (SlabSlot *) tuple; \
389  \
390  if (IS_SLAB_SLOT((state), buf)) \
391  { \
392  buf->nextfree = (state)->slabFreeHead; \
393  (state)->slabFreeHead = buf; \
394  } else \
395  pfree(buf); \
396  } while(0)
397 
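/*
 * Illustrative sketch (not from the PostgreSQL sources): the allocation-side
 * counterpart of RELEASE_SLAB_SLOT, showing how a reader can take a slot from
 * the free list and fall back to palloc() for oversized tuples, as the
 * comments above describe.  The real logic lives in the readtup support code;
 * treat this as an assumed simplification, not the actual implementation.
 */
#ifdef TUPLESORT_SLAB_ILLUSTRATION
static void *
toy_slab_alloc(Tuplesortstate *state, Size tuplen)
{
	if (tuplen > SLAB_SLOT_SIZE || state->slabFreeHead == NULL)
		return palloc(tuplen);	/* doesn't fit in a slot; use palloc() */
	else
	{
		SlabSlot   *buf = state->slabFreeHead;

		state->slabFreeHead = buf->nextfree;	/* pop from the free list */
		return buf;
	}
}
#endif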
398 #define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
399 #define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
400 #define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
401 #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
402 #define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
403 #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
404 #define USEMEM(state,amt) ((state)->availMem -= (amt))
405 #define FREEMEM(state,amt) ((state)->availMem += (amt))
406 #define SERIAL(state) ((state)->shared == NULL)
407 #define WORKER(state) ((state)->shared && (state)->worker != -1)
408 #define LEADER(state) ((state)->shared && (state)->worker == -1)
409 
410 /*
411  * NOTES about on-tape representation of tuples:
412  *
413  * We require the first "unsigned int" of a stored tuple to be the total size
414  * on-tape of the tuple, including itself (so it is never zero; an all-zero
415  * unsigned int is used to delimit runs). The remainder of the stored tuple
416  * may or may not match the in-memory representation of the tuple ---
417  * any conversion needed is the job of the writetup and readtup routines.
418  *
419  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
420  * representation of the tuple must be followed by another "unsigned int" that
421  * is a copy of the length --- so the total tape space used is actually
422  * sizeof(unsigned int) more than the stored length value. This allows
423  * read-backwards. When the random access flag was not specified, the
424  * write/read routines may omit the extra length word.
425  *
426  * writetup is expected to write both length words as well as the tuple
427  * data. When readtup is called, the tape is positioned just after the
428  * front length word; readtup must read the tuple data and advance past
429  * the back length word (if present).
430  *
431  * The write/read routines can make use of the tuple description data
432  * stored in the Tuplesortstate record, if needed. They are also expected
433  * to adjust state->availMem by the amount of memory space (not tape space!)
434  * released or consumed. There is no error return from either writetup
435  * or readtup; they should ereport() on failure.
436  *
437  *
438  * NOTES about memory consumption calculations:
439  *
440  * We count space allocated for tuples against the workMem limit, plus
441  * the space used by the variable-size memtuples array. Fixed-size space
442  * is not counted; it's small enough to not be interesting.
443  *
444  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
445  * rather than the originally-requested size. This is important since
446  * palloc can add substantial overhead. It's not a complete answer since
447  * we won't count any wasted space in palloc allocation blocks, but it's
448  * a lot better than what we were doing before 7.3. As of 9.6, a
449  * separate memory context is used for caller passed tuples. Resetting
450  * it at certain key increments significantly ameliorates fragmentation.
451  * readtup routines use the slab allocator (they cannot use
452  * the reset context because it gets deleted at the point that merging
453  * begins).
454  */
455 
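/*
 * Illustrative sketch (not from the PostgreSQL sources): the on-tape layout
 * described in the NOTES above, for an opaque byte payload.  It assumes the
 * LogicalTapeWrite(tape, ptr, len) interface from logtape.c that the real
 * writetup routines use; the helper itself and its payload argument are
 * hypothetical.
 */
#ifdef TUPLESORT_TAPE_LAYOUT_ILLUSTRATION
static void
toy_writetup(Tuplesortstate *state, LogicalTape *tape,
			 void *data, unsigned int datalen)
{
	/* leading length word counts itself plus the payload */
	unsigned int tuplen = datalen + sizeof(unsigned int);

	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
	LogicalTapeWrite(tape, data, datalen);

	/* trailing copy of the length word enables reading backwards */
	if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
}
#endif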
456 
459 static void inittapes(Tuplesortstate *state, bool mergeruns);
460 static void inittapestate(Tuplesortstate *state, int maxTapes);
461 static void selectnewtape(Tuplesortstate *state);
462 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
463 static void mergeruns(Tuplesortstate *state);
464 static void mergeonerun(Tuplesortstate *state);
465 static void beginmerge(Tuplesortstate *state);
466 static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
467 static void dumptuples(Tuplesortstate *state, bool alltuples);
475 static unsigned int getlen(LogicalTape *tape, bool eofOK);
476 static void markrunend(LogicalTape *tape);
481 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
482 static void tuplesort_free(Tuplesortstate *state);
484 
485 /*
486  * Specialized comparators that we can inline into specialized sorts. The goal
487  * is to try to sort two tuples without having to follow the pointers to the
488  * comparator or the tuple.
489  *
490  * XXX: For now, there is no specialization for cases where datum1 is
491  * authoritative and we don't even need to fall back to a callback at all (that
492  * would be true for types like int4/int8/timestamp/date, but not true for
493  * abbreviations of text or multi-key sorts. There could be! Is it worth it?
494  */
495 
496 /* Used if first key's comparator is ssup_datum_unsigned_cmp */
497 static pg_attribute_always_inline int
498 qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
499 {
500  int compare;
501 
502  compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
503  b->datum1, b->isnull1,
504  &state->base.sortKeys[0]);
505  if (compare != 0)
506  return compare;
507 
508  /*
509  * No need to waste effort calling the tiebreak function when there are no
510  * other keys to sort on.
511  */
512  if (state->base.onlyKey != NULL)
513  return 0;
514 
515  return state->base.comparetup_tiebreak(a, b, state);
516 }
517 
518 #if SIZEOF_DATUM >= 8
519 /* Used if first key's comparator is ssup_datum_signed_cmp */
520 static pg_attribute_always_inline int
521 qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
522 {
523  int compare;
524 
525  compare = ApplySignedSortComparator(a->datum1, a->isnull1,
526  b->datum1, b->isnull1,
527  &state->base.sortKeys[0]);
528 
529  if (compare != 0)
530  return compare;
531 
532  /*
533  * No need to waste effort calling the tiebreak function when there are no
534  * other keys to sort on.
535  */
536  if (state->base.onlyKey != NULL)
537  return 0;
538 
539  return state->base.comparetup_tiebreak(a, b, state);
540 }
541 #endif
542 
543 /* Used if first key's comparator is ssup_datum_int32_cmp */
544 static pg_attribute_always_inline int
545 qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
546 {
547  int compare;
548 
549  compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
550  b->datum1, b->isnull1,
551  &state->base.sortKeys[0]);
552 
553  if (compare != 0)
554  return compare;
555 
556  /*
557  * No need to waste effort calling the tiebreak function when there are no
558  * other keys to sort on.
559  */
560  if (state->base.onlyKey != NULL)
561  return 0;
562 
563  return state->base.comparetup_tiebreak(a, b, state);
564 }
565 
566 /*
567  * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
568  * any variant of SortTuples, using the appropriate comparetup function.
569  * qsort_ssup() is specialized for the case where the comparetup function
570  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
571  * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for
572  * common comparison functions on pass-by-value leading datums.
573  */
574 
575 #define ST_SORT qsort_tuple_unsigned
576 #define ST_ELEMENT_TYPE SortTuple
577 #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
578 #define ST_COMPARE_ARG_TYPE Tuplesortstate
579 #define ST_CHECK_FOR_INTERRUPTS
580 #define ST_SCOPE static
581 #define ST_DEFINE
582 #include "lib/sort_template.h"
583 
584 #if SIZEOF_DATUM >= 8
585 #define ST_SORT qsort_tuple_signed
586 #define ST_ELEMENT_TYPE SortTuple
587 #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
588 #define ST_COMPARE_ARG_TYPE Tuplesortstate
589 #define ST_CHECK_FOR_INTERRUPTS
590 #define ST_SCOPE static
591 #define ST_DEFINE
592 #include "lib/sort_template.h"
593 #endif
594 
595 #define ST_SORT qsort_tuple_int32
596 #define ST_ELEMENT_TYPE SortTuple
597 #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
598 #define ST_COMPARE_ARG_TYPE Tuplesortstate
599 #define ST_CHECK_FOR_INTERRUPTS
600 #define ST_SCOPE static
601 #define ST_DEFINE
602 #include "lib/sort_template.h"
603 
604 #define ST_SORT qsort_tuple
605 #define ST_ELEMENT_TYPE SortTuple
606 #define ST_COMPARE_RUNTIME_POINTER
607 #define ST_COMPARE_ARG_TYPE Tuplesortstate
608 #define ST_CHECK_FOR_INTERRUPTS
609 #define ST_SCOPE static
610 #define ST_DECLARE
611 #define ST_DEFINE
612 #include "lib/sort_template.h"
613 
614 #define ST_SORT qsort_ssup
615 #define ST_ELEMENT_TYPE SortTuple
616 #define ST_COMPARE(a, b, ssup) \
617  ApplySortComparator((a)->datum1, (a)->isnull1, \
618  (b)->datum1, (b)->isnull1, (ssup))
619 #define ST_COMPARE_ARG_TYPE SortSupportData
620 #define ST_CHECK_FOR_INTERRUPTS
621 #define ST_SCOPE static
622 #define ST_DEFINE
623 #include "lib/sort_template.h"
624 
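/*
 * Illustrative note (not from the PostgreSQL sources): each inclusion of
 * sort_template.h above emits a specialized sort function named by ST_SORT,
 * with a signature roughly like
 *
 *   static void qsort_tuple_unsigned(SortTuple *data, size_t n,
 *                                    Tuplesortstate *arg);
 *
 * The in-memory sort step later in this file picks one of these
 * specializations when the leading sort key uses a matching comparator, and
 * otherwise falls back to qsort_tuple() or qsort_ssup().
 */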
625 /*
626  * tuplesort_begin_xxx
627  *
628  * Initialize for a tuple sort operation.
629  *
630  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
631  * zero or more times, then call tuplesort_performsort when all the tuples
632  * have been supplied. After performsort, retrieve the tuples in sorted
633  * order by calling tuplesort_getXXX until it returns false/NULL. (If random
634  * access was requested, rescan, markpos, and restorepos can also be called.)
635  * Call tuplesort_end to terminate the operation and release memory/disk space.
636  *
637  * Each variant of tuplesort_begin has a workMem parameter specifying the
638  * maximum number of kilobytes of RAM to use before spilling data to disk.
639  * (The normal value of this parameter is work_mem, but some callers use
640  * other values.) Each variant also has a sortopt which is a bitmask of
641  * sort options. See TUPLESORT_* definitions in tuplesort.h
642  */
643 
644 Tuplesortstate *
645 tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
646 {
647  Tuplesortstate *state;
648  MemoryContext maincontext;
649  MemoryContext sortcontext;
650  MemoryContext oldcontext;
651 
652  /* See leader_takeover_tapes() remarks on random access support */
653  if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
654  elog(ERROR, "random access disallowed under parallel sort");
655 
656  /*
657  * Memory context surviving tuplesort_reset. This memory context holds
658  * data which is useful to keep while sorting multiple similar batches.
659  */
661  "TupleSort main",
663 
664  /*
665  * Create a working memory context for one sort operation. The content of
666  * this context is deleted by tuplesort_reset.
667  */
668  sortcontext = AllocSetContextCreate(maincontext,
669  "TupleSort sort",
670  ALLOCSET_DEFAULT_SIZES);
671 
672  /*
673  * Additionally a working memory context for tuples is setup in
674  * tuplesort_begin_batch.
675  */
676 
677  /*
678  * Make the Tuplesortstate within the per-sortstate context. This way, we
679  * don't need a separate pfree() operation for it at shutdown.
680  */
681  oldcontext = MemoryContextSwitchTo(maincontext);
682 
683  state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
684 
685 #ifdef TRACE_SORT
686  if (trace_sort)
687  pg_rusage_init(&state->ru_start);
688 #endif
689 
690  state->base.sortopt = sortopt;
691  state->base.tuples = true;
692  state->abbrevNext = 10;
693 
694  /*
695  * workMem is forced to be at least 64KB, the current minimum valid value
696  * for the work_mem GUC. This is a defense against parallel sort callers
697  * that divide out memory among many workers in a way that leaves each
698  * with very little memory.
699  */
700  state->allowedMem = Max(workMem, 64) * (int64) 1024;
701  state->base.sortcontext = sortcontext;
702  state->base.maincontext = maincontext;
703 
704  /*
705  * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
706  * see comments in grow_memtuples().
707  */
708  state->memtupsize = INITIAL_MEMTUPSIZE;
709  state->memtuples = NULL;
710 
711  /*
712  * After all of the other non-parallel-related state, we setup all of the
713  * state needed for each batch.
714  */
715  tuplesort_begin_batch(state);
716 
717  /*
718  * Initialize parallel-related state based on coordination information
719  * from caller
720  */
721  if (!coordinate)
722  {
723  /* Serial sort */
724  state->shared = NULL;
725  state->worker = -1;
726  state->nParticipants = -1;
727  }
728  else if (coordinate->isWorker)
729  {
730  /* Parallel worker produces exactly one final run from all input */
731  state->shared = coordinate->sharedsort;
732  state->worker = worker_get_identifier(state);
733  state->nParticipants = -1;
734  }
735  else
736  {
737  /* Parallel leader state only used for final merge */
738  state->shared = coordinate->sharedsort;
739  state->worker = -1;
740  state->nParticipants = coordinate->nParticipants;
741  Assert(state->nParticipants >= 1);
742  }
743 
744  MemoryContextSwitchTo(oldcontext);
745 
746  return state;
747 }
748 
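/*
 * Illustrative sketch (not from the PostgreSQL sources): a caller-side view
 * of the begin/put/performsort/get/end cycle described in the comment above
 * tuplesort_begin_common().  The tuplesort_* calls are the public API from
 * tuplesort.h / tuplesortvariants.c; fetch_next_slot() and
 * consume_sorted_slot() are hypothetical stand-ins for the caller's input
 * and output, and the exact argument lists should be treated as assumptions.
 */
#ifdef TUPLESORT_USAGE_ILLUSTRATION
static void
toy_sort_heap_tuples(TupleDesc tupDesc, int nkeys, AttrNumber *attNums,
					 Oid *sortOperators, Oid *sortCollations,
					 bool *nullsFirstFlags, TupleTableSlot *slot)
{
	Tuplesortstate *sortstate;

	sortstate = tuplesort_begin_heap(tupDesc, nkeys, attNums,
									 sortOperators, sortCollations,
									 nullsFirstFlags,
									 work_mem, NULL, TUPLESORT_NONE);

	/* load phase: feed tuples until the input is exhausted */
	while (fetch_next_slot(slot))
		tuplesort_puttupleslot(sortstate, slot);

	/* finish the sort (may merge runs, or just qsort in memory) */
	tuplesort_performsort(sortstate);

	/* fetch phase: read back in sorted order */
	while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
		consume_sorted_slot(slot);

	tuplesort_end(sortstate);
}
#endif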
749 /*
750  * tuplesort_begin_batch
751  *
752  * Set up, or reset, all state needed for processing a new set of tuples with this
753  * sort state. Called both from tuplesort_begin_common (the first time sorting
754  * with this sort state) and tuplesort_reset (for subsequent usages).
755  */
756 static void
757 tuplesort_begin_batch(Tuplesortstate *state)
758 {
759  MemoryContext oldcontext;
760 
761  oldcontext = MemoryContextSwitchTo(state->base.maincontext);
762 
763  /*
764  * Caller tuple (e.g. IndexTuple) memory context.
765  *
766  * A dedicated child context used exclusively for caller passed tuples
767  * eases memory management. Resetting at key points reduces
768  * fragmentation. Note that the memtuples array of SortTuples is allocated
769  * in the parent context, not this context, because there is no need to
770  * free memtuples early. For bounded sorts, tuples may be pfreed in any
771  * order, so we use a regular aset.c context so that it can make use of
772  * free'd memory. When the sort is not bounded, we make use of a bump.c
773  * context as this keeps allocations more compact with less wastage.
774  * Allocations are also slightly more CPU efficient.
775  */
776  if (TupleSortUseBumpTupleCxt(state->base.sortopt))
777  state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
778  "Caller tuples",
779  ALLOCSET_DEFAULT_SIZES);
780  else
781  state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
782  "Caller tuples",
783  ALLOCSET_DEFAULT_SIZES);
784 
785 
786  state->status = TSS_INITIAL;
787  state->bounded = false;
788  state->boundUsed = false;
789 
790  state->availMem = state->allowedMem;
791 
792  state->tapeset = NULL;
793 
794  state->memtupcount = 0;
795 
796  /*
797  * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
798  * see comments in grow_memtuples().
799  */
800  state->growmemtuples = true;
801  state->slabAllocatorUsed = false;
802  if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
803  {
804  pfree(state->memtuples);
805  state->memtuples = NULL;
806  state->memtupsize = INITIAL_MEMTUPSIZE;
807  }
808  if (state->memtuples == NULL)
809  {
810  state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
811  USEMEM(state, GetMemoryChunkSpace(state->memtuples));
812  }
813 
814  /* workMem must be large enough for the minimal memtuples array */
815  if (LACKMEM(state))
816  elog(ERROR, "insufficient memory allowed for sort");
817 
818  state->currentRun = 0;
819 
820  /*
821  * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
822  * inittapes(), if needed.
823  */
824 
825  state->result_tape = NULL; /* flag that result tape has not been formed */
826 
827  MemoryContextSwitchTo(oldcontext);
828 }
829 
830 /*
831  * tuplesort_set_bound
832  *
833  * Advise tuplesort that at most the first N result tuples are required.
834  *
835  * Must be called before inserting any tuples. (Actually, we could allow it
836  * as long as the sort hasn't spilled to disk, but there seems no need for
837  * delayed calls at the moment.)
838  *
839  * This is a hint only. The tuplesort may still return more tuples than
840  * requested. Parallel leader tuplesorts will always ignore the hint.
841  */
842 void
843 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
844 {
845  /* Assert we're called before loading any tuples */
846  Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
847  /* Assert we allow bounded sorts */
848  Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
849  /* Can't set the bound twice, either */
850  Assert(!state->bounded);
851  /* Also, this shouldn't be called in a parallel worker */
852  Assert(!WORKER(state));
853 
854  /* Parallel leader allows but ignores hint */
855  if (LEADER(state))
856  return;
857 
858 #ifdef DEBUG_BOUNDED_SORT
859  /* Honor GUC setting that disables the feature (for easy testing) */
860  if (!optimize_bounded_sort)
861  return;
862 #endif
863 
864  /* We want to be able to compute bound * 2, so limit the setting */
865  if (bound > (int64) (INT_MAX / 2))
866  return;
867 
868  state->bounded = true;
869  state->bound = (int) bound;
870 
871  /*
872  * Bounded sorts are not an effective target for abbreviated key
873  * optimization. Disable by setting state to be consistent with no
874  * abbreviation support.
875  */
876  state->base.sortKeys->abbrev_converter = NULL;
877  if (state->base.sortKeys->abbrev_full_comparator)
878  state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
879 
880  /* Not strictly necessary, but be tidy */
881  state->base.sortKeys->abbrev_abort = NULL;
882  state->base.sortKeys->abbrev_full_comparator = NULL;
883 }
884 
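/*
 * Illustrative note (not from the PostgreSQL sources): a typical caller that
 * passed TUPLESORT_ALLOWBOUNDED at begin time, such as a LIMIT/top-N sort,
 * would call e.g. tuplesort_set_bound(sortstate, 100) here before inserting
 * any tuples, allowing the sort to keep only the best 100 tuples in a
 * bounded heap instead of sorting the whole input.
 */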
885 /*
886  * tuplesort_used_bound
887  *
888  * Allow callers to find out if the sort state was able to use a bound.
889  */
890 bool
891 tuplesort_used_bound(Tuplesortstate *state)
892 {
893  return state->boundUsed;
894 }
895 
896 /*
897  * tuplesort_free
898  *
899  * Internal routine for freeing resources of tuplesort.
900  */
901 static void
902 tuplesort_free(Tuplesortstate *state)
903 {
904  /* context swap probably not needed, but let's be safe */
905  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
906 
907 #ifdef TRACE_SORT
908  int64 spaceUsed;
909 
910  if (state->tapeset)
911  spaceUsed = LogicalTapeSetBlocks(state->tapeset);
912  else
913  spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
914 #endif
915 
916  /*
917  * Delete temporary "tape" files, if any.
918  *
919  * Note: want to include this in reported total cost of sort, hence need
920  * for two #ifdef TRACE_SORT sections.
921  *
922  * We don't bother to destroy the individual tapes here. They will go away
923  * with the sortcontext. (In TSS_FINALMERGE state, we have closed
924  * finished tapes already.)
925  */
926  if (state->tapeset)
927  LogicalTapeSetClose(state->tapeset);
928 
929 #ifdef TRACE_SORT
930  if (trace_sort)
931  {
932  if (state->tapeset)
933  elog(LOG, "%s of worker %d ended, %lld disk blocks used: %s",
934  SERIAL(state) ? "external sort" : "parallel external sort",
935  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
936  else
937  elog(LOG, "%s of worker %d ended, %lld KB used: %s",
938  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
939  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
940  }
941 
942  TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
943 #else
944 
945  /*
946  * If you disabled TRACE_SORT, you can still probe sort__done, but you
947  * ain't getting space-used stats.
948  */
949  TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
950 #endif
951 
952  FREESTATE(state);
953  MemoryContextSwitchTo(oldcontext);
954 
955  /*
956  * Free the per-sort memory context, thereby releasing all working memory.
957  */
958  MemoryContextReset(state->base.sortcontext);
959 }
960 
961 /*
962  * tuplesort_end
963  *
964  * Release resources and clean up.
965  *
966  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
967  * pointing to garbage. Be careful not to attempt to use or free such
968  * pointers afterwards!
969  */
970 void
971 tuplesort_end(Tuplesortstate *state)
972 {
973  tuplesort_free(state);
974 
975  /*
976  * Free the main memory context, including the Tuplesortstate struct
977  * itself.
978  */
979  MemoryContextDelete(state->base.maincontext);
980 }
981 
982 /*
983  * tuplesort_updatemax
984  *
985  * Update maximum resource usage statistics.
986  */
987 static void
988 tuplesort_updatemax(Tuplesortstate *state)
989 {
990  int64 spaceUsed;
991  bool isSpaceDisk;
992 
993  /*
994  * Note: it might seem we should provide both memory and disk usage for a
995  * disk-based sort. However, the current code doesn't track memory space
996  * accurately once we have begun to return tuples to the caller (since we
997  * don't account for pfree's the caller is expected to do), so we cannot
998  * rely on availMem in a disk sort. This does not seem worth the overhead
999  * to fix. Is it worth creating an API for the memory context code to
1000  * tell us how much is actually used in sortcontext?
1001  */
1002  if (state->tapeset)
1003  {
1004  isSpaceDisk = true;
1005  spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
1006  }
1007  else
1008  {
1009  isSpaceDisk = false;
1010  spaceUsed = state->allowedMem - state->availMem;
1011  }
1012 
1013  /*
1014  * Sort evicts data to the disk when it wasn't able to fit that data into
1015  * main memory. This is why we assume space used on the disk to be more
1016  * important for tracking resource usage than space used in memory. Note
1017  * that the amount of space occupied by some tupleset on the disk might be
1018  * less than amount of space occupied by the same tupleset in memory due
1019  * to more compact representation.
1020  */
1021  if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1022  (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1023  {
1024  state->maxSpace = spaceUsed;
1025  state->isMaxSpaceDisk = isSpaceDisk;
1026  state->maxSpaceStatus = state->status;
1027  }
1028 }
1029 
1030 /*
1031  * tuplesort_reset
1032  *
1033  * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
1034  * meta-information in. After tuplesort_reset, tuplesort is ready to start
1035  * a new sort. This allows avoiding recreation of tuple sort states (and
1036  * save resources) when sorting multiple small batches.
1037  */
1038 void
1039 tuplesort_reset(Tuplesortstate *state)
1040 {
1041  tuplesort_updatemax(state);
1042  tuplesort_free(state);
1043 
1044  /*
1045  * After we've freed up per-batch memory, re-setup all of the state common
1046  * to both the first batch and any subsequent batch.
1047  */
1048  tuplesort_begin_batch(state);
1049 
1050  state->lastReturnedTuple = NULL;
1051  state->slabMemoryBegin = NULL;
1052  state->slabMemoryEnd = NULL;
1053  state->slabFreeHead = NULL;
1054 }
1055 
1056 /*
1057  * Grow the memtuples[] array, if possible within our memory constraint. We
1058  * must not exceed INT_MAX tuples in memory or the caller-provided memory
1059  * limit. Return true if we were able to enlarge the array, false if not.
1060  *
1061  * Normally, at each increment we double the size of the array. When doing
1062  * that would exceed a limit, we attempt one last, smaller increase (and then
1063  * clear the growmemtuples flag so we don't try any more). That allows us to
1064  * use memory as fully as permitted; sticking to the pure doubling rule could
1065  * result in almost half going unused. Because availMem moves around with
1066  * tuple addition/removal, we need some rule to prevent making repeated small
1067  * increases in memtupsize, which would just be useless thrashing. The
1068  * growmemtuples flag accomplishes that and also prevents useless
1069  * recalculations in this function.
1070  */
1071 static bool
1072 grow_memtuples(Tuplesortstate *state)
1073 {
1074  int newmemtupsize;
1075  int memtupsize = state->memtupsize;
1076  int64 memNowUsed = state->allowedMem - state->availMem;
1077 
1078  /* Forget it if we've already maxed out memtuples, per comment above */
1079  if (!state->growmemtuples)
1080  return false;
1081 
1082  /* Select new value of memtupsize */
1083  if (memNowUsed <= state->availMem)
1084  {
1085  /*
1086  * We've used no more than half of allowedMem; double our usage,
1087  * clamping at INT_MAX tuples.
1088  */
1089  if (memtupsize < INT_MAX / 2)
1090  newmemtupsize = memtupsize * 2;
1091  else
1092  {
1093  newmemtupsize = INT_MAX;
1094  state->growmemtuples = false;
1095  }
1096  }
1097  else
1098  {
1099  /*
1100  * This will be the last increment of memtupsize. Abandon doubling
1101  * strategy and instead increase as much as we safely can.
1102  *
1103  * To stay within allowedMem, we can't increase memtupsize by more
1104  * than availMem / sizeof(SortTuple) elements. In practice, we want
1105  * to increase it by considerably less, because we need to leave some
1106  * space for the tuples to which the new array slots will refer. We
1107  * assume the new tuples will be about the same size as the tuples
1108  * we've already seen, and thus we can extrapolate from the space
1109  * consumption so far to estimate an appropriate new size for the
1110  * memtuples array. The optimal value might be higher or lower than
1111  * this estimate, but it's hard to know that in advance. We again
1112  * clamp at INT_MAX tuples.
1113  *
1114  * This calculation is safe against enlarging the array so much that
1115  * LACKMEM becomes true, because the memory currently used includes
1116  * the present array; thus, there would be enough allowedMem for the
1117  * new array elements even if no other memory were currently used.
1118  *
1119  * We do the arithmetic in float8, because otherwise the product of
1120  * memtupsize and allowedMem could overflow. Any inaccuracy in the
1121  * result should be insignificant; but even if we computed a
1122  * completely insane result, the checks below will prevent anything
1123  * really bad from happening.
1124  */
1125  double grow_ratio;
1126 
1127  grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1128  if (memtupsize * grow_ratio < INT_MAX)
1129  newmemtupsize = (int) (memtupsize * grow_ratio);
1130  else
1131  newmemtupsize = INT_MAX;
1132 
1133  /* We won't make any further enlargement attempts */
1134  state->growmemtuples = false;
1135  }
1136 
1137  /* Must enlarge array by at least one element, else report failure */
1138  if (newmemtupsize <= memtupsize)
1139  goto noalloc;
1140 
1141  /*
1142  * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1143  * to ensure our request won't be rejected. Note that we can easily
1144  * exhaust address space before facing this outcome. (This is presently
1145  * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1146  * don't rely on that at this distance.)
1147  */
1148  if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1149  {
1150  newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1151  state->growmemtuples = false; /* can't grow any more */
1152  }
1153 
1154  /*
1155  * We need to be sure that we do not cause LACKMEM to become true, else
1156  * the space management algorithm will go nuts. The code above should
1157  * never generate a dangerous request, but to be safe, check explicitly
1158  * that the array growth fits within availMem. (We could still cause
1159  * LACKMEM if the memory chunk overhead associated with the memtuples
1160  * array were to increase. That shouldn't happen because we chose the
1161  * initial array size large enough to ensure that palloc will be treating
1162  * both old and new arrays as separate chunks. But we'll check LACKMEM
1163  * explicitly below just in case.)
1164  */
1165  if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1166  goto noalloc;
1167 
1168  /* OK, do it */
1169  FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1170  state->memtupsize = newmemtupsize;
1171  state->memtuples = (SortTuple *)
1172  repalloc_huge(state->memtuples,
1173  state->memtupsize * sizeof(SortTuple));
1174  USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1175  if (LACKMEM(state))
1176  elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1177  return true;
1178 
1179 noalloc:
1180  /* If for any reason we didn't realloc, shut off future attempts */
1181  state->growmemtuples = false;
1182  return false;
1183 }
1184 
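/*
 * Illustrative arithmetic (not from the PostgreSQL sources): suppose
 * allowedMem is 64 MB and 48 MB are already in use when the memtuples array
 * fills up.  Doubling is no longer safe, so the code above computes
 * grow_ratio = 64/48 ~= 1.33 and enlarges the array by about a third instead,
 * e.g. from 100000 to roughly 133000 slots, before giving up on further
 * growth attempts.
 */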
1185 /*
1186  * Shared code for tuple and datum cases.
1187  */
1188 void
1189 tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
1190  bool useAbbrev, Size tuplen)
1191 {
1192  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1193 
1194  Assert(!LEADER(state));
1195 
1196  /* account for the memory used for this tuple */
1197  USEMEM(state, tuplen);
1198  state->tupleMem += tuplen;
1199 
1200  if (!useAbbrev)
1201  {
1202  /*
1203  * Leave ordinary Datum representation, or NULL value. If there is a
1204  * converter it won't expect NULL values, and cost model is not
1205  * required to account for NULL, so in that case we avoid calling
1206  * converter and just set datum1 to zeroed representation (to be
1207  * consistent, and to support cheap inequality tests for NULL
1208  * abbreviated keys).
1209  */
1210  }
1211  else if (!consider_abort_common(state))
1212  {
1213  /* Store abbreviated key representation */
1214  tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1215  state->base.sortKeys);
1216  }
1217  else
1218  {
1219  /*
1220  * Set state to be consistent with never trying abbreviation.
1221  *
1222  * Alter datum1 representation in already-copied tuples, so as to
1223  * ensure a consistent representation (current tuple was just
1224  * handled). It does not matter if some dumped tuples are already
1225  * sorted on tape, since serialized tuples lack abbreviated keys
1226  * (TSS_BUILDRUNS state prevents control reaching here in any case).
1227  */
1228  REMOVEABBREV(state, state->memtuples, state->memtupcount);
1229  }
1230 
1231  switch (state->status)
1232  {
1233  case TSS_INITIAL:
1234 
1235  /*
1236  * Save the tuple into the unsorted array. First, grow the array
1237  * as needed. Note that we try to grow the array when there is
1238  * still one free slot remaining --- if we fail, there'll still be
1239  * room to store the incoming tuple, and then we'll switch to
1240  * tape-based operation.
1241  */
1242  if (state->memtupcount >= state->memtupsize - 1)
1243  {
1244  (void) grow_memtuples(state);
1245  Assert(state->memtupcount < state->memtupsize);
1246  }
1247  state->memtuples[state->memtupcount++] = *tuple;
1248 
1249  /*
1250  * Check if it's time to switch over to a bounded heapsort. We do
1251  * so if the input tuple count exceeds twice the desired tuple
1252  * count (this is a heuristic for where heapsort becomes cheaper
1253  * than a quicksort), or if we've just filled workMem and have
1254  * enough tuples to meet the bound.
1255  *
1256  * Note that once we enter TSS_BOUNDED state we will always try to
1257  * complete the sort that way. In the worst case, if later input
1258  * tuples are larger than earlier ones, this might cause us to
1259  * exceed workMem significantly.
1260  */
1261  if (state->bounded &&
1262  (state->memtupcount > state->bound * 2 ||
1263  (state->memtupcount > state->bound && LACKMEM(state))))
1264  {
1265 #ifdef TRACE_SORT
1266  if (trace_sort)
1267  elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1268  state->memtupcount,
1269  pg_rusage_show(&state->ru_start));
1270 #endif
1271  make_bounded_heap(state);
1272  MemoryContextSwitchTo(oldcontext);
1273  return;
1274  }
1275 
1276  /*
1277  * Done if we still fit in available memory and have array slots.
1278  */
1279  if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1280  {
1281  MemoryContextSwitchTo(oldcontext);
1282  return;
1283  }
1284 
1285  /*
1286  * Nope; time to switch to tape-based operation.
1287  */
1288  inittapes(state, true);
1289 
1290  /*
1291  * Dump all tuples.
1292  */
1293  dumptuples(state, false);
1294  break;
1295 
1296  case TSS_BOUNDED:
1297 
1298  /*
1299  * We don't want to grow the array here, so check whether the new
1300  * tuple can be discarded before putting it in. This should be a
1301  * good speed optimization, too, since when there are many more
1302  * input tuples than the bound, most input tuples can be discarded
1303  * with just this one comparison. Note that because we currently
1304  * have the sort direction reversed, we must check for <= not >=.
1305  */
1306  if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1307  {
1308  /* new tuple <= top of the heap, so we can discard it */
1309  free_sort_tuple(state, tuple);
1310  CHECK_FOR_INTERRUPTS();
1311  }
1312  else
1313  {
1314  /* discard top of heap, replacing it with the new tuple */
1315  free_sort_tuple(state, &state->memtuples[0]);
1316  tuplesort_heap_replace_top(state, tuple);
1317  }
1318  break;
1319 
1320  case TSS_BUILDRUNS:
1321 
1322  /*
1323  * Save the tuple into the unsorted array (there must be space)
1324  */
1325  state->memtuples[state->memtupcount++] = *tuple;
1326 
1327  /*
1328  * If we are over the memory limit, dump all tuples.
1329  */
1330  dumptuples(state, false);
1331  break;
1332 
1333  default:
1334  elog(ERROR, "invalid tuplesort state");
1335  break;
1336  }
1337  MemoryContextSwitchTo(oldcontext);
1338 }
1339 
1340 static bool
1341 consider_abort_common(Tuplesortstate *state)
1342 {
1343  Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1344  Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1345  Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1346 
1347  /*
1348  * Check effectiveness of abbreviation optimization. Consider aborting
1349  * when still within memory limit.
1350  */
1351  if (state->status == TSS_INITIAL &&
1352  state->memtupcount >= state->abbrevNext)
1353  {
1354  state->abbrevNext *= 2;
1355 
1356  /*
1357  * Check opclass-supplied abbreviation abort routine. It may indicate
1358  * that abbreviation should not proceed.
1359  */
1360  if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1361  state->base.sortKeys))
1362  return false;
1363 
1364  /*
1365  * Finally, restore authoritative comparator, and indicate that
1366  * abbreviation is not in play by setting abbrev_converter to NULL
1367  */
1368  state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1369  state->base.sortKeys[0].abbrev_converter = NULL;
1370  /* Not strictly necessary, but be tidy */
1371  state->base.sortKeys[0].abbrev_abort = NULL;
1372  state->base.sortKeys[0].abbrev_full_comparator = NULL;
1373 
1374  /* Give up - expect original pass-by-value representation */
1375  return true;
1376  }
1377 
1378  return false;
1379 }
1380 
1381 /*
1382  * All tuples have been provided; finish the sort.
1383  */
1384 void
1385 tuplesort_performsort(Tuplesortstate *state)
1386 {
1387  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1388 
1389 #ifdef TRACE_SORT
1390  if (trace_sort)
1391  elog(LOG, "performsort of worker %d starting: %s",
1392  state->worker, pg_rusage_show(&state->ru_start));
1393 #endif
1394 
1395  switch (state->status)
1396  {
1397  case TSS_INITIAL:
1398 
1399  /*
1400  * We were able to accumulate all the tuples within the allowed
1401  * amount of memory, or leader to take over worker tapes
1402  */
1403  if (SERIAL(state))
1404  {
1405  /* Just qsort 'em and we're done */
1406  tuplesort_sort_memtuples(state);
1407  state->status = TSS_SORTEDINMEM;
1408  }
1409  else if (WORKER(state))
1410  {
1411  /*
1412  * Parallel workers must still dump out tuples to tape. No
1413  * merge is required to produce single output run, though.
1414  */
1415  inittapes(state, false);
1416  dumptuples(state, true);
1417  worker_nomergeruns(state);
1418  state->status = TSS_SORTEDONTAPE;
1419  }
1420  else
1421  {
1422  /*
1423  * Leader will take over worker tapes and merge worker runs.
1424  * Note that mergeruns sets the correct state->status.
1425  */
1426  leader_takeover_tapes(state);
1427  mergeruns(state);
1428  }
1429  state->current = 0;
1430  state->eof_reached = false;
1431  state->markpos_block = 0L;
1432  state->markpos_offset = 0;
1433  state->markpos_eof = false;
1434  break;
1435 
1436  case TSS_BOUNDED:
1437 
1438  /*
1439  * We were able to accumulate all the tuples required for output
1440  * in memory, using a heap to eliminate excess tuples. Now we
1441  * have to transform the heap to a properly-sorted array. Note
1442  * that sort_bounded_heap sets the correct state->status.
1443  */
1444  sort_bounded_heap(state);
1445  state->current = 0;
1446  state->eof_reached = false;
1447  state->markpos_offset = 0;
1448  state->markpos_eof = false;
1449  break;
1450 
1451  case TSS_BUILDRUNS:
1452 
1453  /*
1454  * Finish tape-based sort. First, flush all tuples remaining in
1455  * memory out to tape; then merge until we have a single remaining
1456  * run (or, if !randomAccess and !WORKER(), one run per tape).
1457  * Note that mergeruns sets the correct state->status.
1458  */
1459  dumptuples(state, true);
1460  mergeruns(state);
1461  state->eof_reached = false;
1462  state->markpos_block = 0L;
1463  state->markpos_offset = 0;
1464  state->markpos_eof = false;
1465  break;
1466 
1467  default:
1468  elog(ERROR, "invalid tuplesort state");
1469  break;
1470  }
1471 
1472 #ifdef TRACE_SORT
1473  if (trace_sort)
1474  {
1475  if (state->status == TSS_FINALMERGE)
1476  elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1477  state->worker, state->nInputTapes,
1478  pg_rusage_show(&state->ru_start));
1479  else
1480  elog(LOG, "performsort of worker %d done: %s",
1481  state->worker, pg_rusage_show(&state->ru_start));
1482  }
1483 #endif
1484 
1485  MemoryContextSwitchTo(oldcontext);
1486 }
1487 
1488 /*
1489  * Internal routine to fetch the next tuple in either forward or back
1490  * direction into *stup. Returns false if no more tuples.
1491  * Returned tuple belongs to tuplesort memory context, and must not be freed
1492  * by caller. Note that fetched tuple is stored in memory that may be
1493  * recycled by any future fetch.
1494  */
1495 bool
1496 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1497  SortTuple *stup)
1498 {
1499  unsigned int tuplen;
1500  size_t nmoved;
1501 
1502  Assert(!WORKER(state));
1503 
1504  switch (state->status)
1505  {
1506  case TSS_SORTEDINMEM:
1507  Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1508  Assert(!state->slabAllocatorUsed);
1509  if (forward)
1510  {
1511  if (state->current < state->memtupcount)
1512  {
1513  *stup = state->memtuples[state->current++];
1514  return true;
1515  }
1516  state->eof_reached = true;
1517 
1518  /*
1519  * Complain if caller tries to retrieve more tuples than
1520  * originally asked for in a bounded sort. This is because
1521  * returning EOF here might be the wrong thing.
1522  */
1523  if (state->bounded && state->current >= state->bound)
1524  elog(ERROR, "retrieved too many tuples in a bounded sort");
1525 
1526  return false;
1527  }
1528  else
1529  {
1530  if (state->current <= 0)
1531  return false;
1532 
1533  /*
1534  * If all tuples have already been fetched, we return the last
1535  * tuple; otherwise, the tuple before the last one returned.
1536  */
1537  if (state->eof_reached)
1538  state->eof_reached = false;
1539  else
1540  {
1541  state->current--; /* last returned tuple */
1542  if (state->current <= 0)
1543  return false;
1544  }
1545  *stup = state->memtuples[state->current - 1];
1546  return true;
1547  }
1548  break;
1549 
1550  case TSS_SORTEDONTAPE:
1551  Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1552  Assert(state->slabAllocatorUsed);
1553 
1554  /*
1555  * The slot that held the tuple that we returned in previous
1556  * gettuple call can now be reused.
1557  */
1558  if (state->lastReturnedTuple)
1559  {
1560  RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1561  state->lastReturnedTuple = NULL;
1562  }
1563 
1564  if (forward)
1565  {
1566  if (state->eof_reached)
1567  return false;
1568 
1569  if ((tuplen = getlen(state->result_tape, true)) != 0)
1570  {
1571  READTUP(state, stup, state->result_tape, tuplen);
1572 
1573  /*
1574  * Remember the tuple we return, so that we can recycle
1575  * its memory on next call. (This can be NULL, in the
1576  * !state->tuples case).
1577  */
1578  state->lastReturnedTuple = stup->tuple;
1579 
1580  return true;
1581  }
1582  else
1583  {
1584  state->eof_reached = true;
1585  return false;
1586  }
1587  }
1588 
1589  /*
1590  * Backward.
1591  *
1592  * If all tuples have already been fetched, we return the last tuple;
1593  * otherwise, the tuple before the last one returned.
1594  */
1595  if (state->eof_reached)
1596  {
1597  /*
1598  * Seek position is pointing just past the zero tuplen at the
1599  * end of file; back up to fetch last tuple's ending length
1600  * word. If seek fails we must have a completely empty file.
1601  */
1602  nmoved = LogicalTapeBackspace(state->result_tape,
1603  2 * sizeof(unsigned int));
1604  if (nmoved == 0)
1605  return false;
1606  else if (nmoved != 2 * sizeof(unsigned int))
1607  elog(ERROR, "unexpected tape position");
1608  state->eof_reached = false;
1609  }
1610  else
1611  {
1612  /*
1613  * Back up and fetch previously-returned tuple's ending length
1614  * word. If seek fails, assume we are at start of file.
1615  */
1616  nmoved = LogicalTapeBackspace(state->result_tape,
1617  sizeof(unsigned int));
1618  if (nmoved == 0)
1619  return false;
1620  else if (nmoved != sizeof(unsigned int))
1621  elog(ERROR, "unexpected tape position");
1622  tuplen = getlen(state->result_tape, false);
1623 
1624  /*
1625  * Back up to get ending length word of tuple before it.
1626  */
1627  nmoved = LogicalTapeBackspace(state->result_tape,
1628  tuplen + 2 * sizeof(unsigned int));
1629  if (nmoved == tuplen + sizeof(unsigned int))
1630  {
1631  /*
1632  * We backed up over the previous tuple, but there was no
1633  * ending length word before it. That means that the prev
1634  * tuple is the first tuple in the file. It is now the
1635  * next to read in forward direction (not obviously right,
1636  * but that is what the in-memory case does).
1637  */
1638  return false;
1639  }
1640  else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1641  elog(ERROR, "bogus tuple length in backward scan");
1642  }
1643 
1644  tuplen = getlen(state->result_tape, false);
1645 
1646  /*
1647  * Now we have the length of the prior tuple, back up and read it.
1648  * Note: READTUP expects we are positioned after the initial
1649  * length word of the tuple, so back up to that point.
1650  */
1651  nmoved = LogicalTapeBackspace(state->result_tape,
1652  tuplen);
1653  if (nmoved != tuplen)
1654  elog(ERROR, "bogus tuple length in backward scan");
1655  READTUP(state, stup, state->result_tape, tuplen);
1656 
1657  /*
1658  * Remember the tuple we return, so that we can recycle its memory
1659  * on next call. (This can be NULL, in the Datum case).
1660  */
1661  state->lastReturnedTuple = stup->tuple;
1662 
1663  return true;
1664 
1665  case TSS_FINALMERGE:
1666  Assert(forward);
1667  /* We are managing memory ourselves, with the slab allocator. */
1668  Assert(state->slabAllocatorUsed);
1669 
1670  /*
1671  * The slab slot holding the tuple that we returned in previous
1672  * gettuple call can now be reused.
1673  */
1674  if (state->lastReturnedTuple)
1675  {
1676  RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1677  state->lastReturnedTuple = NULL;
1678  }
1679 
1680  /*
1681  * This code should match the inner loop of mergeonerun().
1682  */
1683  if (state->memtupcount > 0)
1684  {
1685  int srcTapeIndex = state->memtuples[0].srctape;
1686  LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1687  SortTuple newtup;
1688 
1689  *stup = state->memtuples[0];
1690 
1691  /*
1692  * Remember the tuple we return, so that we can recycle its
1693  * memory on next call. (This can be NULL, in the Datum case).
1694  */
1695  state->lastReturnedTuple = stup->tuple;
1696 
1697  /*
1698  * Pull next tuple from tape, and replace the returned tuple
1699  * at top of the heap with it.
1700  */
1701  if (!mergereadnext(state, srcTape, &newtup))
1702  {
1703  /*
1704  * If no more data, we've reached end of run on this tape.
1705  * Remove the top node from the heap.
1706  */
1707  tuplesort_heap_delete_top(state);
1708  state->nInputRuns--;
1709 
1710  /*
1711  * Close the tape. It'd go away at the end of the sort
1712  * anyway, but better to release the memory early.
1713  */
1714  LogicalTapeClose(srcTape);
1715  return true;
1716  }
1717  newtup.srctape = srcTapeIndex;
1718  tuplesort_heap_replace_top(state, &newtup);
1719  return true;
1720  }
1721  return false;
1722 
1723  default:
1724  elog(ERROR, "invalid tuplesort state");
1725  return false; /* keep compiler quiet */
1726  }
1727 }
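/*
 * Editor's note -- an illustrative sketch, not part of tuplesort.c: the
 * backward-scan branch above relies on the on-tape layout used for
 * TUPLESORT_RANDOMACCESS sorts.  Assuming the usual WRITETUP behavior
 * (a leading length word, the tuple body, and -- for random access -- a
 * trailing copy of the length word), with each run terminated by a zero
 * length word, a materialized run looks roughly like:
 *
 *   [len1][tuple1][len1] [len2][tuple2][len2] ... [lenN][tupleN][lenN] [0]
 *
 * That trailing length word is what lets the code reposition the tape with
 * LogicalTapeBackspace() over one or two unsigned ints plus the tuple body,
 * instead of rescanning the run from its start.
 */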
1728 
1729 
1730 /*
1731  * Advance over N tuples in either forward or back direction,
1732  * without returning any data. N==0 is a no-op.
1733  * Returns true if successful, false if ran out of tuples.
1734  */
1735 bool
1736 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1737 {
1738  MemoryContext oldcontext;
1739 
1740  /*
1741  * We don't actually support backwards skip yet, because no callers need
1742  * it. The API is designed to allow for that later, though.
1743  */
1744  Assert(forward);
1745  Assert(ntuples >= 0);
1746  Assert(!WORKER(state));
1747 
1748  switch (state->status)
1749  {
1750  case TSS_SORTEDINMEM:
1751  if (state->memtupcount - state->current >= ntuples)
1752  {
1753  state->current += ntuples;
1754  return true;
1755  }
1756  state->current = state->memtupcount;
1757  state->eof_reached = true;
1758 
1759  /*
1760  * Complain if caller tries to retrieve more tuples than
1761  * originally asked for in a bounded sort. This is because
1762  * returning EOF here might be the wrong thing.
1763  */
1764  if (state->bounded && state->current >= state->bound)
1765  elog(ERROR, "retrieved too many tuples in a bounded sort");
1766 
1767  return false;
1768 
1769  case TSS_SORTEDONTAPE:
1770  case TSS_FINALMERGE:
1771 
1772  /*
1773  * We could probably optimize these cases better, but for now it's
1774  * not worth the trouble.
1775  */
1776  oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1777  while (ntuples-- > 0)
1778  {
1779  SortTuple stup;
1780 
1781  if (!tuplesort_gettuple_common(state, forward, &stup))
1782  {
1783  MemoryContextSwitchTo(oldcontext);
1784  return false;
1785  }
1786  CHECK_FOR_INTERRUPTS();
1787  }
1788  MemoryContextSwitchTo(oldcontext);
1789  return true;
1790 
1791  default:
1792  elog(ERROR, "invalid tuplesort state");
1793  return false; /* keep compiler quiet */
1794  }
1795 }
1796 
1797 /*
1798  * tuplesort_merge_order - report merge order we'll use for given memory
1799  * (note: "merge order" just means the number of input tapes in the merge).
1800  *
1801  * This is exported for use by the planner. allowedMem is in bytes.
1802  */
1803 int
1804 tuplesort_merge_order(int64 allowedMem)
1805 {
1806  int mOrder;
1807 
1808  /*----------
1809  * In the merge phase, we need buffer space for each input and output tape.
1810  * Each pass in the balanced merge algorithm reads from M input tapes, and
1811  * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1812  * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1813  * input tape.
1814  *
1815  * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1816  * N * TAPE_BUFFER_OVERHEAD
1817  *
1818  * Except for the last and next-to-last merge passes, where there can be
1819  * fewer tapes left to process, M = N. We choose M so that we have the
1820  * desired amount of memory available for the input buffers
1821  * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1822  * available for the tape buffers (allowedMem).
1823  *
1824  * Note: you might be thinking we need to account for the memtuples[]
1825  * array in this calculation, but we effectively treat that as part of the
1826  * MERGE_BUFFER_SIZE workspace.
1827  *----------
1828  */
1829  mOrder = allowedMem /
1830  (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
1831 
1832  /*
1833  * Even in minimum memory, use at least a MINORDER merge. On the other
1834  * hand, even when we have lots of memory, do not use more than a MAXORDER
1835  * merge. Tapes are pretty cheap, but they're not entirely free. Each
1836  * additional tape reduces the amount of memory available to build runs,
1837  * which in turn can cause the same sort to need more runs, which makes
1838  * merging slower even if it can still be done in a single pass. Also,
1839  * high order merges are quite slow due to CPU cache effects; it can be
1840  * faster to pay the I/O cost of a multi-pass merge than to perform a
1841  * single merge pass across many hundreds of tapes.
1842  */
1843  mOrder = Max(mOrder, MINORDER);
1844  mOrder = Min(mOrder, MAXORDER);
1845 
1846  return mOrder;
1847 }
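/*
 * Editor's note -- a worked example, not part of tuplesort.c, assuming the
 * default 8 kB BLCKSZ and the usual definitions TAPE_BUFFER_OVERHEAD = BLCKSZ
 * and MERGE_BUFFER_SIZE = 32 * BLCKSZ: with allowedMem = 64 MB,
 *
 *   mOrder = 65536 kB / (2 * 8 kB + 256 kB) = 65536 / 272 = 240
 *
 * so up to 240 runs can be merged per pass, comfortably between MINORDER and
 * MAXORDER (6 and 500 in the usual build).  With only 1 MB the raw quotient
 * is 3, and the Max() clamp in the function above raises it to MINORDER.
 */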
1848 
1849 /*
1850  * Helper function to calculate how much memory to allocate for the read buffer
1851  * of each input tape in a merge pass.
1852  *
1853  * 'avail_mem' is the amount of memory available for the buffers of all the
1854  * tapes, both input and output.
1855  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1856  * 'maxOutputTapes' is the max. number of output tapes we should produce.
1857  */
1858 static int64
1859 merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1860  int maxOutputTapes)
1861 {
1862  int nOutputRuns;
1863  int nOutputTapes;
1864 
1865  /*
1866  * How many output tapes will we produce in this pass?
1867  *
1868  * This is nInputRuns / nInputTapes, rounded up.
1869  */
1870  nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1871 
1872  nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1873 
1874  /*
1875  * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1876  * remaining memory is divided evenly between the input tapes.
1877  *
1878  * This also follows from the formula in tuplesort_merge_order, but here
1879  * we derive the input buffer size from the amount of memory available,
1880  * and M and N.
1881  */
1882  return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1883 }
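/*
 * Editor's note -- a worked example, not part of tuplesort.c: suppose
 * avail_mem = 8192 kB, nInputTapes = 16, nInputRuns = 40 and
 * maxOutputTapes = 16.  Then nOutputRuns = ceil(40 / 16) = 3, so only three
 * output tapes are needed in this pass; assuming an 8 kB
 * TAPE_BUFFER_OVERHEAD they reserve 24 kB, and each input tape gets
 * (8192 kB - 24 kB) / 16 = 510 kB of read buffer.
 */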
1884 
1885 /*
1886  * inittapes - initialize for tape sorting.
1887  *
1888  * This is called only if we have found we won't sort in memory.
1889  */
1890 static void
1891 inittapes(Tuplesortstate *state, bool mergeruns)
1892 {
1893  Assert(!LEADER(state));
1894 
1895  if (mergeruns)
1896  {
1897  /* Compute number of input tapes to use when merging */
1898  state->maxTapes = tuplesort_merge_order(state->allowedMem);
1899  }
1900  else
1901  {
1902  /* Workers can sometimes produce single run, output without merge */
1903  Assert(WORKER(state));
1904  state->maxTapes = MINORDER;
1905  }
1906 
1907 #ifdef TRACE_SORT
1908  if (trace_sort)
1909  elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1910  state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1911 #endif
1912 
1913  /* Create the tape set */
1914  inittapestate(state, state->maxTapes);
1915  state->tapeset =
1916  LogicalTapeSetCreate(false,
1917  state->shared ? &state->shared->fileset : NULL,
1918  state->worker);
1919 
1920  state->currentRun = 0;
1921 
1922  /*
1923  * Initialize logical tape arrays.
1924  */
1925  state->inputTapes = NULL;
1926  state->nInputTapes = 0;
1927  state->nInputRuns = 0;
1928 
1929  state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1930  state->nOutputTapes = 0;
1931  state->nOutputRuns = 0;
1932 
1933  state->status = TSS_BUILDRUNS;
1934 
1935  selectnewtape(state);
1936 }
1937 
1938 /*
1939  * inittapestate - initialize generic tape management state
1940  */
1941 static void
1942 inittapestate(Tuplesortstate *state, int maxTapes)
1943 {
1944  int64 tapeSpace;
1945 
1946  /*
1947  * Decrease availMem to reflect the space needed for tape buffers; but
1948  * don't decrease it to the point that we have no room for tuples. (That
1949  * case is only likely to occur if sorting pass-by-value Datums; in all
1950  * other scenarios the memtuples[] array is unlikely to occupy more than
1951  * half of allowedMem. In the pass-by-value case it's not important to
1952  * account for tuple space, so we don't care if LACKMEM becomes
1953  * inaccurate.)
1954  */
1955  tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1956 
1957  if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1958  USEMEM(state, tapeSpace);
1959 
1960  /*
1961  * Make sure that the temp file(s) underlying the tape set are created in
1962  * suitable temp tablespaces. For parallel sorts, this should have been
1963  * called already, but it doesn't matter if it is called a second time.
1964  */
1965  PrepareTempTablespaces();
1966 }
1967 
1968 /*
1969  * selectnewtape -- select next tape to output to.
1970  *
1971  * This is called after finishing a run when we know another run
1972  * must be started. This is used both when building the initial
1973  * runs, and during merge passes.
1974  */
1975 static void
1976 selectnewtape(Tuplesortstate *state)
1977 {
1978  /*
1979  * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1980  * both zero. On each call, we create a new output tape to hold the next
1981  * run, until maxTapes is reached. After that, we assign new runs to the
1982  * existing tapes in a round robin fashion.
1983  */
1984  if (state->nOutputTapes < state->maxTapes)
1985  {
1986  /* Create a new tape to hold the next run */
1987  Assert(state->outputTapes[state->nOutputRuns] == NULL);
1988  Assert(state->nOutputRuns == state->nOutputTapes);
1989  state->destTape = LogicalTapeCreate(state->tapeset);
1990  state->outputTapes[state->nOutputTapes] = state->destTape;
1991  state->nOutputTapes++;
1992  state->nOutputRuns++;
1993  }
1994  else
1995  {
1996  /*
1997  * We have reached the max number of tapes. Append to an existing
1998  * tape.
1999  */
2000  state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
2001  state->nOutputRuns++;
2002  }
2003 }
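/*
 * Editor's note -- illustrative only, not part of tuplesort.c: with
 * maxTapes = 3, successive calls assign runs to output tapes as
 *
 *   run 1 -> new tape 0, run 2 -> new tape 1, run 3 -> new tape 2,
 *   run 4 -> tape 0,     run 5 -> tape 1,     run 6 -> tape 2, ...
 *
 * i.e. the first maxTapes runs each get a fresh tape, and later runs are
 * appended round robin, matching nOutputRuns % nOutputTapes above.
 */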
2004 
2005 /*
2006  * Initialize the slab allocation arena, for the given number of slots.
2007  */
2008 static void
2009 init_slab_allocator(Tuplesortstate *state, int numSlots)
2010 {
2011  if (numSlots > 0)
2012  {
2013  char *p;
2014  int i;
2015 
2016  state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2017  state->slabMemoryEnd = state->slabMemoryBegin +
2018  numSlots * SLAB_SLOT_SIZE;
2019  state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2020  USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2021 
2022  p = state->slabMemoryBegin;
2023  for (i = 0; i < numSlots - 1; i++)
2024  {
2025  ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2026  p += SLAB_SLOT_SIZE;
2027  }
2028  ((SlabSlot *) p)->nextfree = NULL;
2029  }
2030  else
2031  {
2032  state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2033  state->slabFreeHead = NULL;
2034  }
2035  state->slabAllocatorUsed = true;
2036 }
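/*
 * Editor's note -- illustrative only, not part of tuplesort.c: for
 * numSlots = 3 the arena is a single palloc'd chunk of 3 * SLAB_SLOT_SIZE
 * bytes, threaded into a free list through the nextfree pointers:
 *
 *   slabFreeHead -> slot 0 -> slot 1 -> slot 2 -> NULL
 *
 * During the merge, tuplesort_readtup_alloc() pops slots off this list and
 * RELEASE_SLAB_SLOT() pushes them back, so tuples of bounded size incur no
 * per-tuple palloc/pfree traffic at all.
 */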
2037 
2038 /*
2039  * mergeruns -- merge all the completed initial runs.
2040  *
2041  * This implements the Balanced k-Way Merge Algorithm. All input data has
2042  * already been written to initial runs on tape (see dumptuples).
2043  */
2044 static void
2045 mergeruns(Tuplesortstate *state)
2046 {
2047  int tapenum;
2048 
2049  Assert(state->status == TSS_BUILDRUNS);
2050  Assert(state->memtupcount == 0);
2051 
2052  if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2053  {
2054  /*
2055  * If there are multiple runs to be merged, when we go to read back
2056  * tuples from disk, abbreviated keys will not have been stored, and
2057  * we don't care to regenerate them. Disable abbreviation from this
2058  * point on.
2059  */
2060  state->base.sortKeys->abbrev_converter = NULL;
2061  state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2062 
2063  /* Not strictly necessary, but be tidy */
2064  state->base.sortKeys->abbrev_abort = NULL;
2065  state->base.sortKeys->abbrev_full_comparator = NULL;
2066  }
2067 
2068  /*
2069  * Reset tuple memory. We've freed all the tuples that we previously
2070  * allocated. We will use the slab allocator from now on.
2071  */
2072  MemoryContextResetOnly(state->base.tuplecontext);
2073 
2074  /*
2075  * We no longer need a large memtuples array. (We will allocate a smaller
2076  * one for the heap later.)
2077  */
2078  FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2079  pfree(state->memtuples);
2080  state->memtuples = NULL;
2081 
2082  /*
2083  * Initialize the slab allocator. We need one slab slot per input tape,
2084  * for the tuples in the heap, plus one to hold the tuple last returned
2085  * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2086  * however, we don't need to allocate anything.)
2087  *
2088  * In a multi-pass merge, we could shrink this allocation for the last
2089  * merge pass, if it has fewer tapes than previous passes, but we don't
2090  * bother.
2091  *
2092  * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2093  * to track memory usage of individual tuples.
2094  */
2095  if (state->base.tuples)
2096  init_slab_allocator(state, state->nOutputTapes + 1);
2097  else
2098  init_slab_allocator(state, 0);
2099 
2100  /*
2101  * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2102  * from each input tape.
2103  *
2104  * We could shrink this, too, between passes in a multi-pass merge, but we
2105  * don't bother. (The initial input tapes are still in outputTapes. The
2106  * number of input tapes will not increase between passes.)
2107  */
2108  state->memtupsize = state->nOutputTapes;
2109  state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
2110  state->nOutputTapes * sizeof(SortTuple));
2111  USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2112 
2113  /*
2114  * Use all the remaining memory we have available for tape buffers among
2115  * all the input tapes. At the beginning of each merge pass, we will
2116  * divide this memory between the input and output tapes in the pass.
2117  */
2118  state->tape_buffer_mem = state->availMem;
2119  USEMEM(state, state->tape_buffer_mem);
2120 #ifdef TRACE_SORT
2121  if (trace_sort)
2122  elog(LOG, "worker %d using %zu KB of memory for tape buffers",
2123  state->worker, state->tape_buffer_mem / 1024);
2124 #endif
2125 
2126  for (;;)
2127  {
2128  /*
2129  * On the first iteration, or if we have read all the runs from the
2130  * input tapes in a multi-pass merge, it's time to start a new pass.
2131  * Rewind all the output tapes, and make them inputs for the next
2132  * pass.
2133  */
2134  if (state->nInputRuns == 0)
2135  {
2136  int64 input_buffer_size;
2137 
2138  /* Close the old, emptied, input tapes */
2139  if (state->nInputTapes > 0)
2140  {
2141  for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2142  LogicalTapeClose(state->inputTapes[tapenum]);
2143  pfree(state->inputTapes);
2144  }
2145 
2146  /* Previous pass's outputs become next pass's inputs. */
2147  state->inputTapes = state->outputTapes;
2148  state->nInputTapes = state->nOutputTapes;
2149  state->nInputRuns = state->nOutputRuns;
2150 
2151  /*
2152  * Reset output tape variables. The actual LogicalTapes will be
2153  * created as needed, here we only allocate the array to hold
2154  * them.
2155  */
2156  state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2157  state->nOutputTapes = 0;
2158  state->nOutputRuns = 0;
2159 
2160  /*
2161  * Redistribute the memory allocated for tape buffers, among the
2162  * new input and output tapes.
2163  */
2164  input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2165  state->nInputTapes,
2166  state->nInputRuns,
2167  state->maxTapes);
2168 
2169 #ifdef TRACE_SORT
2170  if (trace_sort)
2171  elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2172  state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2173  pg_rusage_show(&state->ru_start));
2174 #endif
2175 
2176  /* Prepare the new input tapes for merge pass. */
2177  for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2178  LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2179 
2180  /*
2181  * If there's just one run left on each input tape, then only one
2182  * merge pass remains. If we don't have to produce a materialized
2183  * sorted tape, we can stop at this point and do the final merge
2184  * on-the-fly.
2185  */
2186  if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2187  && state->nInputRuns <= state->nInputTapes
2188  && !WORKER(state))
2189  {
2190  /* Tell logtape.c we won't be writing anymore */
2191  LogicalTapeSetForgetFreeSpace(state->tapeset);
2192  /* Initialize for the final merge pass */
2193  beginmerge(state);
2194  state->status = TSS_FINALMERGE;
2195  return;
2196  }
2197  }
2198 
2199  /* Select an output tape */
2200  selectnewtape(state);
2201 
2202  /* Merge one run from each input tape. */
2203  mergeonerun(state);
2204 
2205  /*
2206  * If the input tapes are empty, and we output only one output run,
2207  * we're done. The current output tape contains the final result.
2208  */
2209  if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2210  break;
2211  }
2212 
2213  /*
2214  * Done. The result is on a single run on a single tape.
2215  */
2216  state->result_tape = state->outputTapes[0];
2217  if (!WORKER(state))
2218  LogicalTapeFreeze(state->result_tape, NULL);
2219  else
2220  worker_freeze_result_tape(state);
2221  state->status = TSS_SORTEDONTAPE;
2222 
2223  /* Close all the now-empty input tapes, to release their read buffers. */
2224  for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2225  LogicalTapeClose(state->inputTapes[tapenum]);
2226 }
2227 
2228 /*
2229  * Merge one run from each input tape.
2230  */
2231 static void
2232 mergeonerun(Tuplesortstate *state)
2233 {
2234  int srcTapeIndex;
2235  LogicalTape *srcTape;
2236 
2237  /*
2238  * Start the merge by loading one tuple from each active source tape into
2239  * the heap.
2240  */
2241  beginmerge(state);
2242 
2243  Assert(state->slabAllocatorUsed);
2244 
2245  /*
2246  * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2247  * out, and replacing it with next tuple from same tape (if there is
2248  * another one).
2249  */
2250  while (state->memtupcount > 0)
2251  {
2252  SortTuple stup;
2253 
2254  /* write the tuple to destTape */
2255  srcTapeIndex = state->memtuples[0].srctape;
2256  srcTape = state->inputTapes[srcTapeIndex];
2257  WRITETUP(state, state->destTape, &state->memtuples[0]);
2258 
2259  /* recycle the slot of the tuple we just wrote out, for the next read */
2260  if (state->memtuples[0].tuple)
2261  RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2262 
2263  /*
2264  * pull next tuple from the tape, and replace the written-out tuple in
2265  * the heap with it.
2266  */
2267  if (mergereadnext(state, srcTape, &stup))
2268  {
2269  stup.srctape = srcTapeIndex;
2270  tuplesort_heap_replace_top(state, &stup);
2271  }
2272  else
2273  {
2274  tuplesort_heap_delete_top(state);
2275  state->nInputRuns--;
2276  }
2277  }
2278 
2279  /*
2280  * When the heap empties, we're done. Write an end-of-run marker on the
2281  * output tape.
2282  */
2283  markrunend(state->destTape);
2284 }
2285 
2286 /*
2287  * beginmerge - initialize for a merge pass
2288  *
2289  * Fill the merge heap with the first tuple from each input tape.
2290  */
2291 static void
2292 beginmerge(Tuplesortstate *state)
2293 {
2294  int activeTapes;
2295  int srcTapeIndex;
2296 
2297  /* Heap should be empty here */
2298  Assert(state->memtupcount == 0);
2299 
2300  activeTapes = Min(state->nInputTapes, state->nInputRuns);
2301 
2302  for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2303  {
2304  SortTuple tup;
2305 
2306  if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2307  {
2308  tup.srctape = srcTapeIndex;
2309  tuplesort_heap_insert(state, &tup);
2310  }
2311  }
2312 }
2313 
2314 /*
2315  * mergereadnext - read next tuple from one merge input tape
2316  *
2317  * Returns false on EOF.
2318  */
2319 static bool
2320 mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2321 {
2322  unsigned int tuplen;
2323 
2324  /* read next tuple, if any */
2325  if ((tuplen = getlen(srcTape, true)) == 0)
2326  return false;
2327  READTUP(state, stup, srcTape, tuplen);
2328 
2329  return true;
2330 }
2331 
2332 /*
2333  * dumptuples - remove tuples from memtuples and write initial run to tape
2334  *
2335  * When alltuples = true, dump everything currently in memory. (This case is
2336  * only used at end of input data.)
2337  */
2338 static void
2339 dumptuples(Tuplesortstate *state, bool alltuples)
2340 {
2341  int memtupwrite;
2342  int i;
2343 
2344  /*
2345  * Nothing to do if we still fit in available memory and have array slots,
2346  * unless this is the final call during initial run generation.
2347  */
2348  if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2349  !alltuples)
2350  return;
2351 
2352  /*
2353  * Final call might require no sorting, in rare cases where we just so
2354  * happen to have previously LACKMEM()'d at the point where exactly all
2355  * remaining tuples are loaded into memory, just before input was
2356  * exhausted. In general, short final runs are quite possible, but avoid
2357  * creating a completely empty run. In a worker, though, we must produce
2358  * at least one tape, even if it's empty.
2359  */
2360  if (state->memtupcount == 0 && state->currentRun > 0)
2361  return;
2362 
2363  Assert(state->status == TSS_BUILDRUNS);
2364 
2365  /*
2366  * It seems unlikely that this limit will ever be exceeded, but take no
2367  * chances
2368  */
2369  if (state->currentRun == INT_MAX)
2370  ereport(ERROR,
2371  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2372  errmsg("cannot have more than %d runs for an external sort",
2373  INT_MAX)));
2374 
2375  if (state->currentRun > 0)
2376  selectnewtape(state);
2377 
2378  state->currentRun++;
2379 
2380 #ifdef TRACE_SORT
2381  if (trace_sort)
2382  elog(LOG, "worker %d starting quicksort of run %d: %s",
2383  state->worker, state->currentRun,
2384  pg_rusage_show(&state->ru_start));
2385 #endif
2386 
2387  /*
2388  * Sort all tuples accumulated within the allowed amount of memory for
2389  * this run using quicksort
2390  */
2391  tuplesort_sort_memtuples(state);
2392 
2393 #ifdef TRACE_SORT
2394  if (trace_sort)
2395  elog(LOG, "worker %d finished quicksort of run %d: %s",
2396  state->worker, state->currentRun,
2397  pg_rusage_show(&state->ru_start));
2398 #endif
2399 
2400  memtupwrite = state->memtupcount;
2401  for (i = 0; i < memtupwrite; i++)
2402  {
2403  SortTuple *stup = &state->memtuples[i];
2404 
2405  WRITETUP(state, state->destTape, stup);
2406  }
2407 
2408  state->memtupcount = 0;
2409 
2410  /*
2411  * Reset tuple memory. We've freed all of the tuples that we previously
2412  * allocated. It's important to avoid fragmentation when there is a stark
2413  * change in the sizes of incoming tuples. In bounded sorts,
2414  * fragmentation due to AllocSetFree's bucketing by size class might be
2415  * particularly bad if this step wasn't taken.
2416  */
2417  MemoryContextReset(state->base.tuplecontext);
2418 
2419  /*
2420  * Now update the memory accounting to subtract the memory used by the
2421  * tuple.
2422  */
2423  FREEMEM(state, state->tupleMem);
2424  state->tupleMem = 0;
2425 
2426  markrunend(state->destTape);
2427 
2428 #ifdef TRACE_SORT
2429  if (trace_sort)
2430  elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2431  state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2432  pg_rusage_show(&state->ru_start));
2433 #endif
2434 }
2435 
2436 /*
2437  * tuplesort_rescan - rewind and replay the scan
2438  */
2439 void
2440 tuplesort_rescan(Tuplesortstate *state)
2441 {
2442  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2443 
2444  Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2445 
2446  switch (state->status)
2447  {
2448  case TSS_SORTEDINMEM:
2449  state->current = 0;
2450  state->eof_reached = false;
2451  state->markpos_offset = 0;
2452  state->markpos_eof = false;
2453  break;
2454  case TSS_SORTEDONTAPE:
2455  LogicalTapeRewindForRead(state->result_tape, 0);
2456  state->eof_reached = false;
2457  state->markpos_block = 0L;
2458  state->markpos_offset = 0;
2459  state->markpos_eof = false;
2460  break;
2461  default:
2462  elog(ERROR, "invalid tuplesort state");
2463  break;
2464  }
2465 
2466  MemoryContextSwitchTo(oldcontext);
2467 }
2468 
2469 /*
2470  * tuplesort_markpos - saves current position in the merged sort file
2471  */
2472 void
2473 tuplesort_markpos(Tuplesortstate *state)
2474 {
2475  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2476 
2477  Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2478 
2479  switch (state->status)
2480  {
2481  case TSS_SORTEDINMEM:
2482  state->markpos_offset = state->current;
2483  state->markpos_eof = state->eof_reached;
2484  break;
2485  case TSS_SORTEDONTAPE:
2486  LogicalTapeTell(state->result_tape,
2487  &state->markpos_block,
2488  &state->markpos_offset);
2489  state->markpos_eof = state->eof_reached;
2490  break;
2491  default:
2492  elog(ERROR, "invalid tuplesort state");
2493  break;
2494  }
2495 
2496  MemoryContextSwitchTo(oldcontext);
2497 }
2498 
2499 /*
2500  * tuplesort_restorepos - restores current position in merged sort file to
2501  * last saved position
2502  */
2503 void
2504 tuplesort_restorepos(Tuplesortstate *state)
2505 {
2506  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2507 
2508  Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2509 
2510  switch (state->status)
2511  {
2512  case TSS_SORTEDINMEM:
2513  state->current = state->markpos_offset;
2514  state->eof_reached = state->markpos_eof;
2515  break;
2516  case TSS_SORTEDONTAPE:
2517  LogicalTapeSeek(state->result_tape,
2518  state->markpos_block,
2519  state->markpos_offset);
2520  state->eof_reached = state->markpos_eof;
2521  break;
2522  default:
2523  elog(ERROR, "invalid tuplesort state");
2524  break;
2525  }
2526 
2527  MemoryContextSwitchTo(oldcontext);
2528 }
2529 
2530 /*
2531  * tuplesort_get_stats - extract summary statistics
2532  *
2533  * This can be called after tuplesort_performsort() finishes to obtain
2534  * printable summary information about how the sort was performed.
2535  */
2536 void
2537 tuplesort_get_stats(Tuplesortstate *state,
2538  TuplesortInstrumentation *stats)
2539 {
2540  /*
2541  * Note: it might seem we should provide both memory and disk usage for a
2542  * disk-based sort. However, the current code doesn't track memory space
2543  * accurately once we have begun to return tuples to the caller (since we
2544  * don't account for pfree's the caller is expected to do), so we cannot
2545  * rely on availMem in a disk sort. This does not seem worth the overhead
2546  * to fix. Is it worth creating an API for the memory context code to
2547  * tell us how much is actually used in sortcontext?
2548  */
2549  tuplesort_updatemax(state);
2550 
2551  if (state->isMaxSpaceDisk)
2552  stats->spaceType = SORT_SPACE_TYPE_DISK;
2553  else
2554  stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2555  stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2556 
2557  switch (state->maxSpaceStatus)
2558  {
2559  case TSS_SORTEDINMEM:
2560  if (state->boundUsed)
2561  stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2562  else
2563  stats->sortMethod = SORT_TYPE_QUICKSORT;
2564  break;
2565  case TSS_SORTEDONTAPE:
2566  stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2567  break;
2568  case TSS_FINALMERGE:
2569  stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2570  break;
2571  default:
2572  stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2573  break;
2574  }
2575 }
2576 
2577 /*
2578  * Convert TuplesortMethod to a string.
2579  */
2580 const char *
2581 tuplesort_method_name(TuplesortMethod m)
2582 {
2583  switch (m)
2584  {
2585  case SORT_TYPE_STILL_IN_PROGRESS:
2586  return "still in progress";
2587  case SORT_TYPE_TOP_N_HEAPSORT:
2588  return "top-N heapsort";
2589  case SORT_TYPE_QUICKSORT:
2590  return "quicksort";
2591  case SORT_TYPE_EXTERNAL_SORT:
2592  return "external sort";
2593  case SORT_TYPE_EXTERNAL_MERGE:
2594  return "external merge";
2595  }
2596 
2597  return "unknown";
2598 }
2599 
2600 /*
2601  * Convert TuplesortSpaceType to a string.
2602  */
2603 const char *
2604 tuplesort_space_type_name(TuplesortSpaceType t)
2605 {
2606  Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2607  return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2608 }
2609 
2610 
2611 /*
2612  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2613  */
2614 
2615 /*
2616  * Convert the existing unordered array of SortTuples to a bounded heap,
2617  * discarding all but the smallest "state->bound" tuples.
2618  *
2619  * When working with a bounded heap, we want to keep the largest entry
2620  * at the root (array entry zero), instead of the smallest as in the normal
2621  * sort case. This allows us to discard the largest entry cheaply.
2622  * Therefore, we temporarily reverse the sort direction.
2623  */
2624 static void
2625 make_bounded_heap(Tuplesortstate *state)
2626 {
2627  int tupcount = state->memtupcount;
2628  int i;
2629 
2630  Assert(state->status == TSS_INITIAL);
2631  Assert(state->bounded);
2632  Assert(tupcount >= state->bound);
2633  Assert(SERIAL(state));
2634 
2635  /* Reverse sort direction so largest entry will be at root */
2636  reversedirection(state);
2637 
2638  state->memtupcount = 0; /* make the heap empty */
2639  for (i = 0; i < tupcount; i++)
2640  {
2641  if (state->memtupcount < state->bound)
2642  {
2643  /* Insert next tuple into heap */
2644  /* Must copy source tuple to avoid possible overwrite */
2645  SortTuple stup = state->memtuples[i];
2646 
2647  tuplesort_heap_insert(state, &stup);
2648  }
2649  else
2650  {
2651  /*
2652  * The heap is full. Replace the largest entry with the new
2653  * tuple, or just discard it, if it's larger than anything already
2654  * in the heap.
2655  */
2656  if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2657  {
2658  free_sort_tuple(state, &state->memtuples[i]);
2659  CHECK_FOR_INTERRUPTS();
2660  }
2661  else
2662  tuplesort_heap_replace_top(state, &state->memtuples[i]);
2663  }
2664  }
2665 
2666  Assert(state->memtupcount == state->bound);
2667  state->status = TSS_BOUNDED;
2668 }
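/*
 * Editor's note -- a worked example, not part of tuplesort.c: with
 * state->bound = 3 and input keys 5 4 9 1 7 2, the first three keys form a
 * reversed-direction heap with 9 at the root.  Key 1 replaces the root
 * (leaving root 5), key 7 is discarded because it is not smaller than the
 * current root, and key 2 replaces the root again, so the heap ends up
 * holding the three smallest keys {1, 2, 4}, ready for sort_bounded_heap().
 */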
2669 
2670 /*
2671  * Convert the bounded heap to a properly-sorted array
2672  */
2673 static void
2674 sort_bounded_heap(Tuplesortstate *state)
2675 {
2676  int tupcount = state->memtupcount;
2677 
2678  Assert(state->status == TSS_BOUNDED);
2679  Assert(state->bounded);
2680  Assert(tupcount == state->bound);
2681  Assert(SERIAL(state));
2682 
2683  /*
2684  * We can unheapify in place because each delete-top call will remove the
2685  * largest entry, which we can promptly store in the newly freed slot at
2686  * the end. Once we're down to a single-entry heap, we're done.
2687  */
2688  while (state->memtupcount > 1)
2689  {
2690  SortTuple stup = state->memtuples[0];
2691 
2692  /* this sifts-up the next-largest entry and decreases memtupcount */
2693  tuplesort_heap_delete_top(state);
2694  state->memtuples[state->memtupcount] = stup;
2695  }
2696  state->memtupcount = tupcount;
2697 
2698  /*
2699  * Reverse sort direction back to the original state. This is not
2700  * actually necessary but seems like a good idea for tidiness.
2701  */
2702  reversedirection(state);
2703 
2704  state->status = TSS_SORTEDINMEM;
2705  state->boundUsed = true;
2706 }
2707 
2708 /*
2709  * Sort all memtuples using specialized qsort() routines.
2710  *
2711  * Quicksort is used for small in-memory sorts, and external sort runs.
2712  */
2713 static void
2714 tuplesort_sort_memtuples(Tuplesortstate *state)
2715 {
2716  Assert(!LEADER(state));
2717 
2718  if (state->memtupcount > 1)
2719  {
2720  /*
2721  * Do we have the leading column's value or abbreviation in datum1,
2722  * and is there a specialization for its comparator?
2723  */
2724  if (state->base.haveDatum1 && state->base.sortKeys)
2725  {
2726  if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2727  {
2728  qsort_tuple_unsigned(state->memtuples,
2729  state->memtupcount,
2730  state);
2731  return;
2732  }
2733 #if SIZEOF_DATUM >= 8
2734  else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2735  {
2736  qsort_tuple_signed(state->memtuples,
2737  state->memtupcount,
2738  state);
2739  return;
2740  }
2741 #endif
2742  else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2743  {
2744  qsort_tuple_int32(state->memtuples,
2745  state->memtupcount,
2746  state);
2747  return;
2748  }
2749  }
2750 
2751  /* Can we use the single-key sort function? */
2752  if (state->base.onlyKey != NULL)
2753  {
2754  qsort_ssup(state->memtuples, state->memtupcount,
2755  state->base.onlyKey);
2756  }
2757  else
2758  {
2759  qsort_tuple(state->memtuples,
2760  state->memtupcount,
2761  state->base.comparetup,
2762  state);
2763  }
2764  }
2765 }
2766 
2767 /*
2768  * Insert a new tuple into an empty or existing heap, maintaining the
2769  * heap invariant. Caller is responsible for ensuring there's room.
2770  *
2771  * Note: For some callers, tuple points to a memtuples[] entry above the
2772  * end of the heap. This is safe as long as it's not immediately adjacent
2773  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2774  * is, it might get overwritten before being moved into the heap!
2775  */
2776 static void
2777 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
2778 {
2779  SortTuple *memtuples;
2780  int j;
2781 
2782  memtuples = state->memtuples;
2783  Assert(state->memtupcount < state->memtupsize);
2784 
2785  CHECK_FOR_INTERRUPTS();
2786 
2787  /*
2788  * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2789  * using 1-based array indexes, not 0-based.
2790  */
2791  j = state->memtupcount++;
2792  while (j > 0)
2793  {
2794  int i = (j - 1) >> 1;
2795 
2796  if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
2797  break;
2798  memtuples[j] = memtuples[i];
2799  j = i;
2800  }
2801  memtuples[j] = *tuple;
2802 }
2803 
2804 /*
2805  * Remove the tuple at state->memtuples[0] from the heap. Decrement
2806  * memtupcount, and sift up to maintain the heap invariant.
2807  *
2808  * The caller has already free'd the tuple the top node points to,
2809  * if necessary.
2810  */
2811 static void
2812 tuplesort_heap_delete_top(Tuplesortstate *state)
2813 {
2814  SortTuple *memtuples = state->memtuples;
2815  SortTuple *tuple;
2816 
2817  if (--state->memtupcount <= 0)
2818  return;
2819 
2820  /*
2821  * Remove the last tuple in the heap, and re-insert it, by replacing the
2822  * current top node with it.
2823  */
2824  tuple = &memtuples[state->memtupcount];
2825  tuplesort_heap_replace_top(state, tuple);
2826 }
2827 
2828 /*
2829  * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
2830  * maintain the heap invariant.
2831  *
2832  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
2833  * Heapsort, steps H3-H8).
2834  */
2835 static void
2836 tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
2837 {
2838  SortTuple *memtuples = state->memtuples;
2839  unsigned int i,
2840  n;
2841 
2842  Assert(state->memtupcount >= 1);
2843 
2844  CHECK_FOR_INTERRUPTS();
2845 
2846  /*
2847  * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2848  * This prevents overflow in the "2 * i + 1" calculation, since at the top
2849  * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2850  */
2851  n = state->memtupcount;
2852  i = 0; /* i is where the "hole" is */
2853  for (;;)
2854  {
2855  unsigned int j = 2 * i + 1;
2856 
2857  if (j >= n)
2858  break;
2859  if (j + 1 < n &&
2860  COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
2861  j++;
2862  if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
2863  break;
2864  memtuples[i] = memtuples[j];
2865  i = j;
2866  }
2867  memtuples[i] = *tuple;
2868 }
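/*
 * Editor's note -- a minimal, self-contained sketch (not part of
 * tuplesort.c) of the same sift-down loop on a plain int min-heap; the
 * names, the direct "<" comparisons and the tiny driver are illustrative
 * assumptions only.
 */
#include <stdio.h>

static void
int_heap_replace_top(int *heap, unsigned int n, int newval)
{
	unsigned int i = 0;			/* i is where the "hole" is */

	for (;;)
	{
		unsigned int j = 2 * i + 1;

		if (j >= n)
			break;
		if (j + 1 < n && heap[j + 1] < heap[j])
			j++;				/* pick the smaller child */
		if (newval <= heap[j])
			break;				/* the hole is a valid spot for newval */
		heap[i] = heap[j];		/* pull the child up into the hole */
		i = j;
	}
	heap[i] = newval;
}

int
main(void)
{
	int			heap[] = {1, 3, 2, 7, 4};	/* a valid min-heap */

	int_heap_replace_top(heap, 5, 9);		/* replace root 1 with 9 */
	for (int k = 0; k < 5; k++)
		printf("%d ", heap[k]);				/* prints: 2 3 9 7 4 */
	printf("\n");
	return 0;
}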
2869 
2870 /*
2871  * Function to reverse the sort direction from its current state
2872  *
2873  * It is not safe to call this when performing hash tuplesorts
2874  */
2875 static void
2876 reversedirection(Tuplesortstate *state)
2877 {
2878  SortSupport sortKey = state->base.sortKeys;
2879  int nkey;
2880 
2881  for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2882  {
2883  sortKey->ssup_reverse = !sortKey->ssup_reverse;
2884  sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2885  }
2886 }
2887 
2888 
2889 /*
2890  * Tape interface routines
2891  */
2892 
2893 static unsigned int
2894 getlen(LogicalTape *tape, bool eofOK)
2895 {
2896  unsigned int len;
2897 
2898  if (LogicalTapeRead(tape,
2899  &len, sizeof(len)) != sizeof(len))
2900  elog(ERROR, "unexpected end of tape");
2901  if (len == 0 && !eofOK)
2902  elog(ERROR, "unexpected end of data");
2903  return len;
2904 }
2905 
2906 static void
2907 markrunend(LogicalTape *tape)
2908 {
2909  unsigned int len = 0;
2910 
2911  LogicalTapeWrite(tape, &len, sizeof(len));
2912 }
2913 
2914 /*
2915  * Get memory for tuple from within READTUP() routine.
2916  *
2917  * We use next free slot from the slab allocator, or palloc() if the tuple
2918  * is too large for that.
2919  */
2920 void *
2921 tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
2922 {
2923  SlabSlot *buf;
2924 
2925  /*
2926  * We pre-allocate enough slots in the slab arena that we should never run
2927  * out.
2928  */
2929  Assert(state->slabFreeHead);
2930 
2931  if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
2932  return MemoryContextAlloc(state->base.sortcontext, tuplen);
2933  else
2934  {
2935  buf = state->slabFreeHead;
2936  /* Reuse this slot */
2937  state->slabFreeHead = buf->nextfree;
2938 
2939  return buf;
2940  }
2941 }
2942 
2943 
2944 /*
2945  * Parallel sort routines
2946  */
2947 
2948 /*
2949  * tuplesort_estimate_shared - estimate required shared memory allocation
2950  *
2951  * nWorkers is an estimate of the number of workers (it's the number that
2952  * will be requested).
2953  */
2954 Size
2955 tuplesort_estimate_shared(int nWorkers)
2956 {
2957  Size tapesSize;
2958 
2959  Assert(nWorkers > 0);
2960 
2961  /* Make sure that BufFile shared state is MAXALIGN'd */
2962  tapesSize = mul_size(sizeof(TapeShare), nWorkers);
2963  tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2964 
2965  return tapesSize;
2966 }
2967 
2968 /*
2969  * tuplesort_initialize_shared - initialize shared tuplesort state
2970  *
2971  * Must be called from leader process before workers are launched, to
2972  * establish state needed up-front for worker tuplesortstates. nWorkers
2973  * should match the argument passed to tuplesort_estimate_shared().
2974  */
2975 void
2976 tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
2977 {
2978  int i;
2979 
2980  Assert(nWorkers > 0);
2981 
2982  SpinLockInit(&shared->mutex);
2983  shared->currentWorker = 0;
2984  shared->workersFinished = 0;
2985  SharedFileSetInit(&shared->fileset, seg);
2986  shared->nTapes = nWorkers;
2987  for (i = 0; i < nWorkers; i++)
2988  {
2989  shared->tapes[i].firstblocknumber = 0L;
2990  }
2991 }
2992 
2993 /*
2994  * tuplesort_attach_shared - attach to shared tuplesort state
2995  *
2996  * Must be called by all worker processes.
2997  */
2998 void
2999 tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
3000 {
3001  /* Attach to SharedFileSet */
3002  SharedFileSetAttach(&shared->fileset, seg);
3003 }
3004 
3005 /*
3006  * worker_get_identifier - Assign and return ordinal identifier for worker
3007  *
3008  * The order in which these are assigned is not well defined, and should not
3009  * matter; worker numbers across parallel sort participants need only be
3010  * distinct and gapless. logtape.c requires this.
3011  *
3012  * Note that the identifiers assigned from here have no relation to
3013  * ParallelWorkerNumber number, to avoid making any assumption about
3014  * caller's requirements. However, we do follow the ParallelWorkerNumber
3015  * convention of representing a non-worker with worker number -1. This
3016  * includes the leader, as well as serial Tuplesort processes.
3017  */
3018 static int
3019 worker_get_identifier(Tuplesortstate *state)
3020 {
3021  Sharedsort *shared = state->shared;
3022  int worker;
3023 
3024  Assert(WORKER(state));
3025 
3026  SpinLockAcquire(&shared->mutex);
3027  worker = shared->currentWorker++;
3028  SpinLockRelease(&shared->mutex);
3029 
3030  return worker;
3031 }
3032 
3033 /*
3034  * worker_freeze_result_tape - freeze worker's result tape for leader
3035  *
3036  * This is called by workers just after the result tape has been determined,
3037  * instead of calling LogicalTapeFreeze() directly. They do so because
3038  * workers require a few additional steps over similar serial
3039  * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
3040  * steps are around freeing now unneeded resources, and representing to
3041  * leader that worker's input run is available for its merge.
3042  *
3043  * There should only be one final output run for each worker, which consists
3044  * of all tuples that were originally input into worker.
3045  */
3046 static void
3047 worker_freeze_result_tape(Tuplesortstate *state)
3048 {
3049  Sharedsort *shared = state->shared;
3050  TapeShare output;
3051 
3052  Assert(WORKER(state));
3053  Assert(state->result_tape != NULL);
3054  Assert(state->memtupcount == 0);
3055 
3056  /*
3057  * Free most remaining memory, in case caller is sensitive to our holding
3058  * on to it. memtuples may not be a tiny merge heap at this point.
3059  */
3060  pfree(state->memtuples);
3061  /* Be tidy */
3062  state->memtuples = NULL;
3063  state->memtupsize = 0;
3064 
3065  /*
3066  * Parallel worker requires result tape metadata, which is to be stored in
3067  * shared memory for leader
3068  */
3069  LogicalTapeFreeze(state->result_tape, &output);
3070 
3071  /* Store properties of output tape, and update finished worker count */
3072  SpinLockAcquire(&shared->mutex);
3073  shared->tapes[state->worker] = output;
3074  shared->workersFinished++;
3075  SpinLockRelease(&shared->mutex);
3076 }
3077 
3078 /*
3079  * worker_nomergeruns - dump memtuples in worker, without merging
3080  *
3081  * This is called as an alternative to mergeruns() with a worker when no
3082  * merging is required.
3083  */
3084 static void
3085 worker_nomergeruns(Tuplesortstate *state)
3086 {
3087  Assert(WORKER(state));
3088  Assert(state->result_tape == NULL);
3089  Assert(state->nOutputRuns == 1);
3090 
3091  state->result_tape = state->destTape;
3092  worker_freeze_result_tape(state);
3093 }
3094 
3095 /*
3096  * leader_takeover_tapes - create tapeset for leader from worker tapes
3097  *
3098  * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3099  * sorting has occurred in workers, all of which must have already returned
3100  * from tuplesort_performsort().
3101  *
3102  * When this returns, leader process is left in a state that is virtually
3103  * indistinguishable from it having generated runs as a serial external sort
3104  * might have.
3105  */
3106 static void
3107 leader_takeover_tapes(Tuplesortstate *state)
3108 {
3109  Sharedsort *shared = state->shared;
3110  int nParticipants = state->nParticipants;
3111  int workersFinished;
3112  int j;
3113 
3114  Assert(LEADER(state));
3115  Assert(nParticipants >= 1);
3116 
3117  SpinLockAcquire(&shared->mutex);
3118  workersFinished = shared->workersFinished;
3119  SpinLockRelease(&shared->mutex);
3120 
3121  if (nParticipants != workersFinished)
3122  elog(ERROR, "cannot take over tapes before all workers finish");
3123 
3124  /*
3125  * Create the tapeset from worker tapes, including a leader-owned tape at
3126  * the end. Parallel workers are far more expensive than logical tapes,
3127  * so the number of tapes allocated here should never be excessive.
3128  */
3129  inittapestate(state, nParticipants);
3130  state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3131 
3132  /*
3133  * Set currentRun to reflect the number of runs we will merge (it's not
3134  * used for anything, this is just pro forma)
3135  */
3136  state->currentRun = nParticipants;
3137 
3138  /*
3139  * Initialize the state to look the same as after building the initial
3140  * runs.
3141  *
3142  * There will always be exactly 1 run per worker, and exactly one input
3143  * tape per run, because workers always output exactly 1 run, even when
3144  * there were no input tuples for workers to sort.
3145  */
3146  state->inputTapes = NULL;
3147  state->nInputTapes = 0;
3148  state->nInputRuns = 0;
3149 
3150  state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3151  state->nOutputTapes = nParticipants;
3152  state->nOutputRuns = nParticipants;
3153 
3154  for (j = 0; j < nParticipants; j++)
3155  {
3156  state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3157  }
3158 
3159  state->status = TSS_BUILDRUNS;
3160 }
3161 
3162 /*
3163  * Convenience routine to free a tuple previously loaded into sort memory
3164  */
3165 static void
3166 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3167 {
3168  if (stup->tuple)
3169  {
3170  FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3171  pfree(stup->tuple);
3172  stup->tuple = NULL;
3173  }
3174 }
3175 
3176 int
3177 ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3178 {
3179  if (x < y)
3180  return -1;
3181  else if (x > y)
3182  return 1;
3183  else
3184  return 0;
3185 }
3186 
3187 #if SIZEOF_DATUM >= 8
3188 int
3189 ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3190 {
3191  int64 xx = DatumGetInt64(x);
3192  int64 yy = DatumGetInt64(y);
3193 
3194  if (xx < yy)
3195  return -1;
3196  else if (xx > yy)
3197  return 1;
3198  else
3199  return 0;
3200 }
3201 #endif
3202 
3203 int
3204 ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3205 {
3206  int32 xx = DatumGetInt32(x);
3207  int32 yy = DatumGetInt32(y);
3208 
3209  if (xx < yy)
3210  return -1;
3211  else if (xx > yy)
3212  return 1;
3213  else
3214  return 0;
3215 }