tuplesort.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  * Generalized tuple sorting routines.
5  *
6  * This module provides a generalized facility for tuple sorting, which can be
7  * applied to different kinds of sortable objects. Implementation of
8  * the particular sorting variants is given in tuplesortvariants.c.
9  * This module works efficiently for both small and large amounts
10  * of data. Small amounts are sorted in-memory using qsort(). Large
11  * amounts are sorted using temporary files and a standard external sort
12  * algorithm.
13  *
14  * See Knuth, volume 3, for more than you want to know about external
15  * sorting algorithms. The algorithm we use is a balanced k-way merge.
16  * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
17  * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
18  * merge is better. Knuth is assuming that tape drives are expensive
19  * beasts, and in particular that there will always be many more runs than
20  * tape drives. The polyphase merge algorithm was good at keeping all the
21  * tape drives busy, but in our implementation a "tape drive" doesn't cost
22  * much more than a few Kb of memory buffers, so we can afford to have
23  * lots of them. In particular, if we can have as many tape drives as
24  * sorted runs, we can eliminate any repeated I/O at all.
25  *
26  * Historically, we divided the input into sorted runs using replacement
27  * selection, in the form of a priority tree implemented as a heap
28  * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
29  * for run generation.
30  *
31  * The approximate amount of memory allowed for any one sort operation
32  * is specified in kilobytes by the caller (most pass work_mem). Initially,
33  * we absorb tuples and simply store them in an unsorted array as long as
34  * we haven't exceeded workMem. If we reach the end of the input without
35  * exceeding workMem, we sort the array using qsort() and subsequently return
36  * tuples just by scanning the tuple array sequentially. If we do exceed
37  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
38  * When tuples are dumped in batch after quicksorting, we begin a new run
39  * with a new output tape. If we reach the max number of tapes, we write
40  * subsequent runs on the existing tapes in a round-robin fashion. We will
41  * need multiple merge passes to finish the merge in that case. After the
42  * end of the input is reached, we dump out remaining tuples in memory into
43  * a final run, then merge the runs.
44  *
45  * When merging runs, we use a heap containing just the frontmost tuple from
46  * each source run; we repeatedly output the smallest tuple and replace it
47  * with the next tuple from its source tape (if any). When the heap empties,
48  * the merge is complete. The basic merge algorithm thus needs very little
49  * memory --- only M tuples for an M-way merge, and M is constrained to a
50  * small number. However, we can still make good use of our full workMem
51  * allocation by pre-reading additional blocks from each source tape. Without
52  * prereading, our access pattern to the temporary file would be very erratic;
53  * on average we'd read one block from each of M source tapes during the same
54  * time that we're writing M blocks to the output tape, so there is no
55  * sequentiality of access at all, defeating the read-ahead methods used by
56  * most Unix kernels. Worse, the output tape gets written into a very random
57  * sequence of blocks of the temp file, ensuring that things will be even
58  * worse when it comes time to read that tape. A straightforward merge pass
59  * thus ends up doing a lot of waiting for disk seeks. We can improve matters
60  * by prereading from each source tape sequentially, loading about workMem/M
61  * bytes from each tape in turn, and making the sequential blocks immediately
62  * available for reuse. This approach helps to localize both read and write
63  * accesses. The pre-reading is handled by logtape.c; we just tell it how
64  * much memory to use for the buffers.
65  *
66  * In the current code we determine the number of input tapes M on the basis
67  * of workMem: we want workMem/M to be large enough that we read a fair
68  * amount of data each time we read from a tape, so as to maintain the
69  * locality of access described above. Nonetheless, with large workMem we
70  * can have many tapes. The logical "tapes" are implemented by logtape.c,
71  * which avoids space wastage by recycling disk space as soon as each block
72  * is read from its "tape".
73  *
74  * When the caller requests random access to the sort result, we form
75  * the final sorted run on a logical tape which is then "frozen", so
76  * that we can access it randomly. When the caller does not need random
77  * access, we return from tuplesort_performsort() as soon as we are down
78  * to one run per logical tape. The final merge is then performed
79  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
80  * saves one cycle of writing all the data out to disk and reading it in.
81  *
82  * This module supports parallel sorting. Parallel sorts involve coordination
83  * among one or more worker processes, and a leader process, each with its own
84  * tuplesort state. The leader process (or, more accurately, the
85  * Tuplesortstate associated with a leader process) creates a full tapeset
86  * consisting of worker tapes, each with one run to merge: one run for every
87  * worker process. This is then merged. Worker processes are guaranteed to
88  * produce exactly one output run from their partial input.
89  *
90  *
91  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
92  * Portions Copyright (c) 1994, Regents of the University of California
93  *
94  * IDENTIFICATION
95  * src/backend/utils/sort/tuplesort.c
96  *
97  *-------------------------------------------------------------------------
98  */
99 
100 #include "postgres.h"
101 
102 #include <limits.h>
103 
104 #include "commands/tablespace.h"
105 #include "miscadmin.h"
106 #include "pg_trace.h"
107 #include "storage/shmem.h"
108 #include "utils/guc.h"
109 #include "utils/memutils.h"
110 #include "utils/pg_rusage.h"
111 #include "utils/tuplesort.h"
112 
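/*
 * Illustrative sketch (not part of this file): the heap-based k-way merge
 * described in the header comment above, reduced to merging sorted arrays
 * of ints.  The heap holds just the frontmost element of each run; we
 * repeatedly emit the smallest element and replace it with the next element
 * from the same run.  The real code below does the same thing with
 * SortTuples read from logical tapes.
 */
#ifdef TUPLESORT_MERGE_SKETCH
typedef struct
{
	int			value;			/* frontmost value of this run */
	int			run;			/* index of the run it came from */
} SketchHeapItem;

static void
sketch_sift_down(SketchHeapItem *heap, int nheap, int i)
{
	for (;;)
	{
		int			smallest = i;
		int			l = 2 * i + 1;
		int			r = 2 * i + 2;
		SketchHeapItem tmp;

		if (l < nheap && heap[l].value < heap[smallest].value)
			smallest = l;
		if (r < nheap && heap[r].value < heap[smallest].value)
			smallest = r;
		if (smallest == i)
			break;
		tmp = heap[i];
		heap[i] = heap[smallest];
		heap[smallest] = tmp;
		i = smallest;
	}
}

/* Merge nruns sorted runs (runs[i][0 .. runlens[i] - 1]) into out[]. */
static void
sketch_merge_runs(int **runs, const int *runlens, int nruns, int *out)
{
	SketchHeapItem heap[64];	/* assume nruns <= 64 for the sketch */
	int			pos[64];
	int			nheap = 0;
	int			nout = 0;
	int			i;

	/* Prime the heap with the first element of each non-empty run. */
	for (i = 0; i < nruns; i++)
	{
		pos[i] = 0;
		if (runlens[i] > 0)
		{
			heap[nheap].value = runs[i][0];
			heap[nheap].run = i;
			nheap++;
			pos[i] = 1;
		}
	}
	for (i = nheap / 2 - 1; i >= 0; i--)
		sketch_sift_down(heap, nheap, i);

	/* Pop the smallest head, refilling the slot from the same run. */
	while (nheap > 0)
	{
		int			run = heap[0].run;

		out[nout++] = heap[0].value;
		if (pos[run] < runlens[run])
			heap[0].value = runs[run][pos[run]++];	/* replace heap top */
		else
			heap[0] = heap[--nheap];	/* run exhausted; shrink the heap */
		sketch_sift_down(heap, nheap, 0);
	}
}
#endif							/* TUPLESORT_MERGE_SKETCH */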
113 /*
114  * Initial size of the memtuples array. We're trying to select this size so
115  * that the initial array allocation exceeds ALLOCSET_SEPARATE_THRESHOLD (see
116  * the comments in grow_memtuples()) while keeping allocation overhead low.
117  * However, we don't consider array sizes less than 1024.
118  *
119  */
120 #define INITIAL_MEMTUPSIZE Max(1024, \
121  ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
122 
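/*
 * Illustrative arithmetic (figures are typical, not guaranteed): assuming
 * ALLOCSET_SEPARATE_THRESHOLD is 8192 and sizeof(SortTuple) is 24 on a
 * 64-bit build, 8192 / 24 + 1 = 342, so the Max() above picks the 1024
 * floor and the initial array occupies roughly 24 kB, past the threshold,
 * which is what grow_memtuples() relies on.
 */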
123 /* GUC variables */
124 #ifdef TRACE_SORT
125 bool trace_sort = false;
126 #endif
127 
128 #ifdef DEBUG_BOUNDED_SORT
129 bool optimize_bounded_sort = true;
130 #endif
131 
132 
133 /*
134  * During merge, we use a pre-allocated set of fixed-size slots to hold
135  * tuples, to avoid palloc/pfree overhead.
136  *
137  * Merge doesn't require a lot of memory, so we can afford to waste some,
138  * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
139  * palloc() overhead is not significant anymore.
140  *
141  * 'nextfree' is valid when this chunk is in the free list. When in use, the
142  * slot holds a tuple.
143  */
144 #define SLAB_SLOT_SIZE 1024
145 
146 typedef union SlabSlot
147 {
148  union SlabSlot *nextfree;
149  char buffer[SLAB_SLOT_SIZE];
150 } SlabSlot;
151 
152 /*
153  * Possible states of a Tuplesort object. These denote the states that
154  * persist between calls of Tuplesort routines.
155  */
156 typedef enum
157 {
158  TSS_INITIAL, /* Loading tuples; still within memory limit */
159  TSS_BOUNDED, /* Loading tuples into bounded-size heap */
160  TSS_BUILDRUNS, /* Loading tuples; writing to tape */
161  TSS_SORTEDINMEM, /* Sort completed entirely in memory */
162  TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
163  TSS_FINALMERGE, /* Performing final merge on-the-fly */
164 } TupSortStatus;
165 
166 /*
167  * Parameters for calculation of number of tapes to use --- see inittapes()
168  * and tuplesort_merge_order().
169  *
170  * In this calculation we assume that each tape will cost us about one block's
171  * worth of buffer space. This ignores the overhead of all the other data
172  * structures needed for each tape, but it's probably close enough.
173  *
174  * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
175  * input tape, for pre-reading (see discussion at top of file). This is *in
176  * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
177  */
178 #define MINORDER 6 /* minimum merge order */
179 #define MAXORDER 500 /* maximum merge order */
180 #define TAPE_BUFFER_OVERHEAD BLCKSZ
181 #define MERGE_BUFFER_SIZE (BLCKSZ * 32)
182 
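/*
 * Illustrative sketch (an assumption-laden sketch, not the actual
 * tuplesort_merge_order() implementation): one plausible way to turn an
 * allowedMem budget into a merge order using the constants above is to
 * charge each input tape MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD bytes,
 * reserve TAPE_BUFFER_OVERHEAD for the output tape, and clamp the result
 * to the MINORDER..MAXORDER range.
 */
#ifdef TUPLESORT_MERGE_ORDER_SKETCH
static int
sketch_merge_order(int64 allowedMem)
{
	int64		perInputTape = MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD;
	int64		mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) / perInputTape;

	if (mOrder < MINORDER)
		mOrder = MINORDER;
	if (mOrder > MAXORDER)
		mOrder = MAXORDER;
	return (int) mOrder;
}
#endif							/* TUPLESORT_MERGE_ORDER_SKETCH */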
183 
184 /*
185  * Private state of a Tuplesort operation.
186  */
187 struct Tuplesortstate
188 {
190  TupSortStatus status; /* enumerated value as shown above */
191  bool bounded; /* did caller specify a maximum number of
192  * tuples to return? */
193  bool boundUsed; /* true if we made use of a bounded heap */
194  int bound; /* if bounded, the maximum number of tuples */
195  int64 tupleMem; /* memory consumed by individual tuples.
196  * storing this separately from what we track
197  * in availMem allows us to subtract the
198  * memory consumed by all tuples when dumping
199  * tuples to tape */
200  int64 availMem; /* remaining memory available, in bytes */
201  int64 allowedMem; /* total memory allowed, in bytes */
202  int maxTapes; /* max number of input tapes to merge in each
203  * pass */
204  int64 maxSpace; /* maximum amount of space occupied among sort
205  * groups, either in-memory or on-disk */
206  bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
207  * space, false when it's value for in-memory
208  * space */
209  TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
210  LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
211 
212  /*
213  * This array holds the tuples now in sort memory. If we are in state
214  * INITIAL, the tuples are in no particular order; if we are in state
215  * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
216  * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
217  * H. In state SORTEDONTAPE, the array is not used.
218  */
219  SortTuple *memtuples; /* array of SortTuple structs */
220  int memtupcount; /* number of tuples currently present */
221  int memtupsize; /* allocated length of memtuples array */
222  bool growmemtuples; /* memtuples' growth still underway? */
223 
224  /*
225  * Memory for tuples is sometimes allocated using a simple slab allocator,
226  * rather than with palloc(). Currently, we switch to slab allocation
227  * when we start merging. Merging only needs to keep a small, fixed
228  * number of tuples in memory at any time, so we can avoid the
229  * palloc/pfree overhead by recycling a fixed number of fixed-size slots
230  * to hold the tuples.
231  *
232  * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
233  * slots. The allocation is sized to have one slot per tape, plus one
234  * additional slot. We need that many slots to hold all the tuples kept
235  * in the heap during merge, plus the one most recently returned from the
236  * sort by tuplesort_gettuple.
237  *
238  * Initially, all the slots are kept in a linked list of free slots. When
239  * a tuple is read from a tape, it is put into the next available slot, if
240  * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
241  * instead.
242  *
243  * When we're done processing a tuple, we return the slot back to the free
244  * list, or pfree() if it was palloc'd. We know that a tuple was
245  * allocated from the slab if its pointer value is between
246  * slabMemoryBegin and slabMemoryEnd.
247  *
248  * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
249  * tracking memory usage is not used.
250  */
251  bool slabAllocatorUsed;
252 
253  char *slabMemoryBegin; /* beginning of slab memory arena */
254  char *slabMemoryEnd; /* end of slab memory arena */
255  SlabSlot *slabFreeHead; /* head of free list */
256 
257  /* Memory used for input and output tape buffers. */
258  size_t tape_buffer_mem;
259 
260  /*
261  * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
262  * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
263  * modes), we remember the tuple in 'lastReturnedTuple', so that we can
264  * recycle the memory on next gettuple call.
265  */
266  void *lastReturnedTuple;
267 
268  /*
269  * While building initial runs, this is the current output run number.
270  * Afterwards, it is the number of initial runs we made.
271  */
272  int currentRun;
273 
274  /*
275  * Logical tapes, for merging.
276  *
277  * The initial runs are written in the output tapes. In each merge pass,
278  * the output tapes of the previous pass become the input tapes, and new
279  * output tapes are created as needed. When nInputTapes equals
280  * nInputRuns, there is only one merge pass left.
281  */
282  LogicalTape **inputTapes;
283  int nInputTapes;
284  int nInputRuns;
285 
286  LogicalTape **outputTapes;
287  int nOutputTapes;
288  int nOutputRuns;
289 
290  LogicalTape *destTape; /* current output tape */
291 
292  /*
293  * These variables are used after completion of sorting to keep track of
294  * the next tuple to return. (In the tape case, the tape's current read
295  * position is also critical state.)
296  */
297  LogicalTape *result_tape; /* actual tape of finished output */
298  int current; /* array index (only used if SORTEDINMEM) */
299  bool eof_reached; /* reached EOF (needed for cursors) */
300 
301  /* markpos_xxx holds marked position for mark and restore */
302  int64 markpos_block; /* tape block# (only used if SORTEDONTAPE) */
303  int markpos_offset; /* saved "current", or offset in tape block */
304  bool markpos_eof; /* saved "eof_reached" */
305 
306  /*
307  * These variables are used during parallel sorting.
308  *
309  * worker is our worker identifier. Follows the general convention that
310  * -1 value relates to a leader tuplesort, and values >= 0 worker
311  * tuplesorts. (-1 can also be a serial tuplesort.)
312  *
313  * shared is mutable shared memory state, which is used to coordinate
314  * parallel sorts.
315  *
316  * nParticipants is the number of worker Tuplesortstates known by the
317  * leader to have actually been launched, which implies that they must
318  * finish a run that the leader needs to merge. Typically includes a
319  * worker state held by the leader process itself. Set in the leader
320  * Tuplesortstate only.
321  */
322  int worker;
323  Sharedsort *shared;
324  int nParticipants;
325 
326  /*
327  * Additional state for managing "abbreviated key" sortsupport routines
328  * (which currently may be used by all cases except the hash index case).
329  * Tracks the intervals at which the optimization's effectiveness is
330  * tested.
331  */
332  int64 abbrevNext; /* Tuple # at which to next check
333  * applicability */
334 
335  /*
336  * Resource snapshot for time of sort start.
337  */
338 #ifdef TRACE_SORT
339  PGRUsage ru_start;
340 #endif
341 };
342 
343 /*
344  * Private mutable state of a parallel tuplesort operation. This is allocated
345  * in shared memory.
346  */
347 struct Sharedsort
348 {
349  /* mutex protects all fields prior to tapes */
350  slock_t mutex;
351 
352  /*
353  * currentWorker generates ordinal identifier numbers for parallel sort
354  * workers. These start from 0, and are always gapless.
355  *
356  * Workers increment workersFinished to indicate having finished. If this
357  * is equal to state.nParticipants within the leader, leader is ready to
358  * merge worker runs.
359  */
360  int currentWorker;
361  int workersFinished;
362 
363  /* Temporary file space */
364  SharedFileSet fileset;
365 
366  /* Size of tapes flexible array */
367  int nTapes;
368 
369  /*
370  * Tapes array used by workers to report back information needed by the
371  * leader to concatenate all worker tapes into one for merging
372  */
373  TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
374 };
375 
376 /*
377  * Is the given tuple allocated from the slab memory arena?
378  */
379 #define IS_SLAB_SLOT(state, tuple) \
380  ((char *) (tuple) >= (state)->slabMemoryBegin && \
381  (char *) (tuple) < (state)->slabMemoryEnd)
382 
383 /*
384  * Return the given tuple to the slab memory free list, or free it
385  * if it was palloc'd.
386  */
387 #define RELEASE_SLAB_SLOT(state, tuple) \
388  do { \
389  SlabSlot *buf = (SlabSlot *) tuple; \
390  \
391  if (IS_SLAB_SLOT((state), buf)) \
392  { \
393  buf->nextfree = (state)->slabFreeHead; \
394  (state)->slabFreeHead = buf; \
395  } else \
396  pfree(buf); \
397  } while(0)
398 
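/*
 * Illustrative counterpart to RELEASE_SLAB_SLOT above (a sketch under the
 * same assumptions, not the actual readtup allocation path): pop a slot
 * from the free list when the tuple fits in SLAB_SLOT_SIZE, otherwise fall
 * back to palloc().
 */
#ifdef TUPLESORT_SLAB_SKETCH
static void *
sketch_acquire_slab_slot(Tuplesortstate *state, Size tuplen)
{
	if (tuplen <= SLAB_SLOT_SIZE && state->slabFreeHead != NULL)
	{
		SlabSlot   *slot = state->slabFreeHead;

		state->slabFreeHead = slot->nextfree;	/* pop from free list */
		return slot;
	}
	return palloc(tuplen);		/* oversized tuple: ordinary palloc */
}
#endif							/* TUPLESORT_SLAB_SKETCH */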
399 #define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
400 #define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
401 #define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
402 #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
403 #define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
404 #define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
405 #define USEMEM(state,amt) ((state)->availMem -= (amt))
406 #define FREEMEM(state,amt) ((state)->availMem += (amt))
407 #define SERIAL(state) ((state)->shared == NULL)
408 #define WORKER(state) ((state)->shared && (state)->worker != -1)
409 #define LEADER(state) ((state)->shared && (state)->worker == -1)
410 
411 /*
412  * NOTES about on-tape representation of tuples:
413  *
414  * We require the first "unsigned int" of a stored tuple to be the total size
415  * on-tape of the tuple, including itself (so it is never zero; an all-zero
416  * unsigned int is used to delimit runs). The remainder of the stored tuple
417  * may or may not match the in-memory representation of the tuple ---
418  * any conversion needed is the job of the writetup and readtup routines.
419  *
420  * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
421  * representation of the tuple must be followed by another "unsigned int" that
422  * is a copy of the length --- so the total tape space used is actually
423  * sizeof(unsigned int) more than the stored length value. This allows
424  * read-backwards. When the random access flag was not specified, the
425  * write/read routines may omit the extra length word.
426  *
427  * writetup is expected to write both length words as well as the tuple
428  * data. When readtup is called, the tape is positioned just after the
429  * front length word; readtup must read the tuple data and advance past
430  * the back length word (if present).
431  *
432  * The write/read routines can make use of the tuple description data
433  * stored in the Tuplesortstate record, if needed. They are also expected
434  * to adjust state->availMem by the amount of memory space (not tape space!)
435  * released or consumed. There is no error return from either writetup
436  * or readtup; they should ereport() on failure.
437  *
438  *
439  * NOTES about memory consumption calculations:
440  *
441  * We count space allocated for tuples against the workMem limit, plus
442  * the space used by the variable-size memtuples array. Fixed-size space
443  * is not counted; it's small enough to not be interesting.
444  *
445  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
446  * rather than the originally-requested size. This is important since
447  * palloc can add substantial overhead. It's not a complete answer since
448  * we won't count any wasted space in palloc allocation blocks, but it's
449  * a lot better than what we were doing before 7.3. As of 9.6, a
450  * separate memory context is used for caller passed tuples. Resetting
451  * it at certain key increments significantly ameliorates fragmentation.
452  * readtup routines use the slab allocator (they cannot use
453  * the reset context because it gets deleted at the point that merging
454  * begins).
455  */
456 
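/*
 * Illustrative sketch of the on-tape layout described above (a sketch, not
 * one of the real writetup routines in tuplesortvariants.c): write a
 * leading length word that counts itself plus the payload, then the
 * payload, and, for TUPLESORT_RANDOMACCESS sorts, a trailing copy of the
 * length word so that the tuple can also be read backwards.
 */
#ifdef TUPLESORT_WRITETUP_SKETCH
static void
sketch_writetup(Tuplesortstate *state, LogicalTape *tape,
				void *data, unsigned int datalen)
{
	unsigned int tuplen = datalen + sizeof(unsigned int);

	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
	LogicalTapeWrite(tape, data, datalen);
	if (state->base.sortopt & TUPLESORT_RANDOMACCESS)
		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
}
#endif							/* TUPLESORT_WRITETUP_SKETCH */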
457 
460 static void inittapes(Tuplesortstate *state, bool mergeruns);
461 static void inittapestate(Tuplesortstate *state, int maxTapes);
462 static void selectnewtape(Tuplesortstate *state);
463 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
464 static void mergeruns(Tuplesortstate *state);
465 static void mergeonerun(Tuplesortstate *state);
466 static void beginmerge(Tuplesortstate *state);
467 static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
468 static void dumptuples(Tuplesortstate *state, bool alltuples);
476 static unsigned int getlen(LogicalTape *tape, bool eofOK);
477 static void markrunend(LogicalTape *tape);
482 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
483 static void tuplesort_free(Tuplesortstate *state);
485 
486 /*
487  * Specialized comparators that we can inline into specialized sorts. The goal
488  * is to try to sort two tuples without having to follow the pointers to the
489  * comparator or the tuple.
490  *
491  * XXX: For now, there is no specialization for cases where datum1 is
492  * authoritative and we don't even need to fall back to a callback at all (that
493  * would be true for types like int4/int8/timestamp/date, but not true for
494  * abbreviations of text or multi-key sorts). There could be! Is it worth it?
495  */
496 
497 /* Used if first key's comparator is ssup_datum_unsigned_cmp */
498 static pg_attribute_always_inline int
499 qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
500 {
501  int compare;
502 
503  compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
504  b->datum1, b->isnull1,
505  &state->base.sortKeys[0]);
506  if (compare != 0)
507  return compare;
508 
509  /*
510  * No need to waste effort calling the tiebreak function when there are no
511  * other keys to sort on.
512  */
513  if (state->base.onlyKey != NULL)
514  return 0;
515 
516  return state->base.comparetup_tiebreak(a, b, state);
517 }
518 
519 #if SIZEOF_DATUM >= 8
520 /* Used if first key's comparator is ssup_datum_signed_cmp */
521 static pg_attribute_always_inline int
522 qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
523 {
524  int compare;
525 
526  compare = ApplySignedSortComparator(a->datum1, a->isnull1,
527  b->datum1, b->isnull1,
528  &state->base.sortKeys[0]);
529 
530  if (compare != 0)
531  return compare;
532 
533  /*
534  * No need to waste effort calling the tiebreak function when there are no
535  * other keys to sort on.
536  */
537  if (state->base.onlyKey != NULL)
538  return 0;
539 
540  return state->base.comparetup_tiebreak(a, b, state);
541 }
542 #endif
543 
544 /* Used if first key's comparator is ssup_datum_int32_cmp */
545 static pg_attribute_always_inline int
546 qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
547 {
548  int compare;
549 
550  compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
551  b->datum1, b->isnull1,
552  &state->base.sortKeys[0]);
553 
554  if (compare != 0)
555  return compare;
556 
557  /*
558  * No need to waste effort calling the tiebreak function when there are no
559  * other keys to sort on.
560  */
561  if (state->base.onlyKey != NULL)
562  return 0;
563 
564  return state->base.comparetup_tiebreak(a, b, state);
565 }
566 
567 /*
568  * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
569  * any variant of SortTuples, using the appropriate comparetup function.
570  * qsort_ssup() is specialized for the case where the comparetup function
571  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
572  * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for
573  * common comparison functions on pass-by-value leading datums.
574  */
575 
576 #define ST_SORT qsort_tuple_unsigned
577 #define ST_ELEMENT_TYPE SortTuple
578 #define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
579 #define ST_COMPARE_ARG_TYPE Tuplesortstate
580 #define ST_CHECK_FOR_INTERRUPTS
581 #define ST_SCOPE static
582 #define ST_DEFINE
583 #include "lib/sort_template.h"
584 
585 #if SIZEOF_DATUM >= 8
586 #define ST_SORT qsort_tuple_signed
587 #define ST_ELEMENT_TYPE SortTuple
588 #define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
589 #define ST_COMPARE_ARG_TYPE Tuplesortstate
590 #define ST_CHECK_FOR_INTERRUPTS
591 #define ST_SCOPE static
592 #define ST_DEFINE
593 #include "lib/sort_template.h"
594 #endif
595 
596 #define ST_SORT qsort_tuple_int32
597 #define ST_ELEMENT_TYPE SortTuple
598 #define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
599 #define ST_COMPARE_ARG_TYPE Tuplesortstate
600 #define ST_CHECK_FOR_INTERRUPTS
601 #define ST_SCOPE static
602 #define ST_DEFINE
603 #include "lib/sort_template.h"
604 
605 #define ST_SORT qsort_tuple
606 #define ST_ELEMENT_TYPE SortTuple
607 #define ST_COMPARE_RUNTIME_POINTER
608 #define ST_COMPARE_ARG_TYPE Tuplesortstate
609 #define ST_CHECK_FOR_INTERRUPTS
610 #define ST_SCOPE static
611 #define ST_DECLARE
612 #define ST_DEFINE
613 #include "lib/sort_template.h"
614 
615 #define ST_SORT qsort_ssup
616 #define ST_ELEMENT_TYPE SortTuple
617 #define ST_COMPARE(a, b, ssup) \
618  ApplySortComparator((a)->datum1, (a)->isnull1, \
619  (b)->datum1, (b)->isnull1, (ssup))
620 #define ST_COMPARE_ARG_TYPE SortSupportData
621 #define ST_CHECK_FOR_INTERRUPTS
622 #define ST_SCOPE static
623 #define ST_DEFINE
624 #include "lib/sort_template.h"
625 
626 /*
627  * tuplesort_begin_xxx
628  *
629  * Initialize for a tuple sort operation.
630  *
631  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
632  * zero or more times, then call tuplesort_performsort when all the tuples
633  * have been supplied. After performsort, retrieve the tuples in sorted
634  * order by calling tuplesort_getXXX until it returns false/NULL. (If random
635  * access was requested, rescan, markpos, and restorepos can also be called.)
636  * Call tuplesort_end to terminate the operation and release memory/disk space.
637  *
638  * Each variant of tuplesort_begin has a workMem parameter specifying the
639  * maximum number of kilobytes of RAM to use before spilling data to disk.
640  * (The normal value of this parameter is work_mem, but some callers use
641  * other values.) Each variant also has a sortopt which is a bitmask of
642  * sort options. See TUPLESORT_* definitions in tuplesort.h
643  */
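/*
 * Illustrative calling sequence (a sketch using the heap-tuple variant
 * from tuplesortvariants.c; the tuple descriptor, slots, and sort-key
 * arrays are assumed to have been set up by the caller):
 */
#ifdef TUPLESORT_USAGE_SKETCH
static void
sketch_sort_slots(TupleDesc tupDesc, TupleTableSlot **slots, int nslots,
				  TupleTableSlot *outslot,
				  int nkeys, AttrNumber *attNums, Oid *sortOperators,
				  Oid *sortCollations, bool *nullsFirstFlags)
{
	Tuplesortstate *sortstate;
	int			i;

	sortstate = tuplesort_begin_heap(tupDesc, nkeys, attNums,
									 sortOperators, sortCollations,
									 nullsFirstFlags,
									 work_mem, NULL, TUPLESORT_NONE);

	for (i = 0; i < nslots; i++)
		tuplesort_puttupleslot(sortstate, slots[i]);

	tuplesort_performsort(sortstate);

	while (tuplesort_gettupleslot(sortstate, true, false, outslot, NULL))
	{
		/* ... consume the tuple now stored in outslot ... */
	}

	tuplesort_end(sortstate);
}
#endif							/* TUPLESORT_USAGE_SKETCH */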
644 
645 Tuplesortstate *
646 tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
647 {
648  Tuplesortstate *state;
649  MemoryContext maincontext;
650  MemoryContext sortcontext;
651  MemoryContext oldcontext;
652 
653  /* See leader_takeover_tapes() remarks on random access support */
654  if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
655  elog(ERROR, "random access disallowed under parallel sort");
656 
657  /*
658  * Memory context surviving tuplesort_reset. This memory context holds
659  * data which is useful to keep while sorting multiple similar batches.
660  */
661  maincontext = AllocSetContextCreate(CurrentMemoryContext,
662  "TupleSort main",
663  ALLOCSET_DEFAULT_SIZES);
664 
665  /*
666  * Create a working memory context for one sort operation. The content of
667  * this context is deleted by tuplesort_reset.
668  */
669  sortcontext = AllocSetContextCreate(maincontext,
670  "TupleSort sort",
671  ALLOCSET_DEFAULT_SIZES);
672 
673  /*
674  * Additionally a working memory context for tuples is setup in
675  * tuplesort_begin_batch.
676  */
677 
678  /*
679  * Make the Tuplesortstate within the per-sortstate context. This way, we
680  * don't need a separate pfree() operation for it at shutdown.
681  */
682  oldcontext = MemoryContextSwitchTo(maincontext);
683 
684  state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
685 
686 #ifdef TRACE_SORT
687  if (trace_sort)
688  pg_rusage_init(&state->ru_start);
689 #endif
690 
691  state->base.sortopt = sortopt;
692  state->base.tuples = true;
693  state->abbrevNext = 10;
694 
695  /*
696  * workMem is forced to be at least 64KB, the current minimum valid value
697  * for the work_mem GUC. This is a defense against parallel sort callers
698  * that divide out memory among many workers in a way that leaves each
699  * with very little memory.
700  */
701  state->allowedMem = Max(workMem, 64) * (int64) 1024;
702  state->base.sortcontext = sortcontext;
703  state->base.maincontext = maincontext;
704 
705  /*
706  * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
707  * see comments in grow_memtuples().
708  */
709  state->memtupsize = INITIAL_MEMTUPSIZE;
710  state->memtuples = NULL;
711 
712  /*
713  * After all of the other non-parallel-related state, we setup all of the
714  * state needed for each batch.
715  */
716  tuplesort_begin_batch(state);
717 
718  /*
719  * Initialize parallel-related state based on coordination information
720  * from caller
721  */
722  if (!coordinate)
723  {
724  /* Serial sort */
725  state->shared = NULL;
726  state->worker = -1;
727  state->nParticipants = -1;
728  }
729  else if (coordinate->isWorker)
730  {
731  /* Parallel worker produces exactly one final run from all input */
732  state->shared = coordinate->sharedsort;
733  state->worker = worker_get_identifier(state);
734  state->nParticipants = -1;
735  }
736  else
737  {
738  /* Parallel leader state only used for final merge */
739  state->shared = coordinate->sharedsort;
740  state->worker = -1;
741  state->nParticipants = coordinate->nParticipants;
742  Assert(state->nParticipants >= 1);
743  }
744 
745  MemoryContextSwitchTo(oldcontext);
746 
747  return state;
748 }
749 
750 /*
751  * tuplesort_begin_batch
752  *
753  * Set up, or reset, all state needed for processing a new set of tuples with this
754  * sort state. Called both from tuplesort_begin_common (the first time sorting
755  * with this sort state) and tuplesort_reset (for subsequent usages).
756  */
757 static void
758 tuplesort_begin_batch(Tuplesortstate *state)
759 {
760  MemoryContext oldcontext;
761 
762  oldcontext = MemoryContextSwitchTo(state->base.maincontext);
763 
764  /*
765  * Caller tuple (e.g. IndexTuple) memory context.
766  *
767  * A dedicated child context used exclusively for caller passed tuples
768  * eases memory management. Resetting at key points reduces
769  * fragmentation. Note that the memtuples array of SortTuples is allocated
770  * in the parent context, not this context, because there is no need to
771  * free memtuples early. For bounded sorts, tuples may be pfreed in any
772  * order, so we use a regular aset.c context so that it can make use of
773  * free'd memory. When the sort is not bounded, we make use of a bump.c
774  * context as this keeps allocations more compact with less wastage.
775  * Allocations are also slightly more CPU efficient.
776  */
777  if (TupleSortUseBumpTupleCxt(state->base.sortopt))
778  state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
779  "Caller tuples",
780  ALLOCSET_DEFAULT_SIZES);
781  else
782  state->base.tuplecontext = AllocSetContextCreate(state->base.sortcontext,
783  "Caller tuples",
784  ALLOCSET_DEFAULT_SIZES);
785 
786 
787  state->status = TSS_INITIAL;
788  state->bounded = false;
789  state->boundUsed = false;
790 
791  state->availMem = state->allowedMem;
792 
793  state->tapeset = NULL;
794 
795  state->memtupcount = 0;
796 
797  /*
798  * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
799  * see comments in grow_memtuples().
800  */
801  state->growmemtuples = true;
802  state->slabAllocatorUsed = false;
803  if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
804  {
805  pfree(state->memtuples);
806  state->memtuples = NULL;
807  state->memtupsize = INITIAL_MEMTUPSIZE;
808  }
809  if (state->memtuples == NULL)
810  {
811  state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
812  USEMEM(state, GetMemoryChunkSpace(state->memtuples));
813  }
814 
815  /* workMem must be large enough for the minimal memtuples array */
816  if (LACKMEM(state))
817  elog(ERROR, "insufficient memory allowed for sort");
818 
819  state->currentRun = 0;
820 
821  /*
822  * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
823  * inittapes(), if needed.
824  */
825 
826  state->result_tape = NULL; /* flag that result tape has not been formed */
827 
828  MemoryContextSwitchTo(oldcontext);
829 }
830 
831 /*
832  * tuplesort_set_bound
833  *
834  * Advise tuplesort that at most the first N result tuples are required.
835  *
836  * Must be called before inserting any tuples. (Actually, we could allow it
837  * as long as the sort hasn't spilled to disk, but there seems no need for
838  * delayed calls at the moment.)
839  *
840  * This is a hint only. The tuplesort may still return more tuples than
841  * requested. Parallel leader tuplesorts will always ignore the hint.
842  */
843 void
844 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
845 {
846  /* Assert we're called before loading any tuples */
847  Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
848  /* Assert we allow bounded sorts */
849  Assert(state->base.sortopt & TUPLESORT_ALLOWBOUNDED);
850  /* Can't set the bound twice, either */
851  Assert(!state->bounded);
852  /* Also, this shouldn't be called in a parallel worker */
853  Assert(!WORKER(state));
854 
855  /* Parallel leader allows but ignores hint */
856  if (LEADER(state))
857  return;
858 
859 #ifdef DEBUG_BOUNDED_SORT
860  /* Honor GUC setting that disables the feature (for easy testing) */
861  if (!optimize_bounded_sort)
862  return;
863 #endif
864 
865  /* We want to be able to compute bound * 2, so limit the setting */
866  if (bound > (int64) (INT_MAX / 2))
867  return;
868 
869  state->bounded = true;
870  state->bound = (int) bound;
871 
872  /*
873  * Bounded sorts are not an effective target for abbreviated key
874  * optimization. Disable by setting state to be consistent with no
875  * abbreviation support.
876  */
877  state->base.sortKeys->abbrev_converter = NULL;
878  if (state->base.sortKeys->abbrev_full_comparator)
879  state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
880 
881  /* Not strictly necessary, but be tidy */
882  state->base.sortKeys->abbrev_abort = NULL;
883  state->base.sortKeys->abbrev_full_comparator = NULL;
884 }
885 
886 /*
887  * tuplesort_used_bound
888  *
889  * Allow callers to find out if the sort state was able to use a bound.
890  */
891 bool
892 tuplesort_used_bound(Tuplesortstate *state)
893 {
894  return state->boundUsed;
895 }
896 
897 /*
898  * tuplesort_free
899  *
900  * Internal routine for freeing resources of tuplesort.
901  */
902 static void
903 tuplesort_free(Tuplesortstate *state)
904 {
905  /* context swap probably not needed, but let's be safe */
906  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
907 
908 #ifdef TRACE_SORT
909  int64 spaceUsed;
910 
911  if (state->tapeset)
912  spaceUsed = LogicalTapeSetBlocks(state->tapeset);
913  else
914  spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
915 #endif
916 
917  /*
918  * Delete temporary "tape" files, if any.
919  *
920  * Note: want to include this in reported total cost of sort, hence need
921  * for two #ifdef TRACE_SORT sections.
922  *
923  * We don't bother to destroy the individual tapes here. They will go away
924  * with the sortcontext. (In TSS_FINALMERGE state, we have closed
925  * finished tapes already.)
926  */
927  if (state->tapeset)
928  LogicalTapeSetClose(state->tapeset);
929 
930 #ifdef TRACE_SORT
931  if (trace_sort)
932  {
933  if (state->tapeset)
934  elog(LOG, "%s of worker %d ended, %lld disk blocks used: %s",
935  SERIAL(state) ? "external sort" : "parallel external sort",
936  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
937  else
938  elog(LOG, "%s of worker %d ended, %lld KB used: %s",
939  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
940  state->worker, (long long) spaceUsed, pg_rusage_show(&state->ru_start));
941  }
942 
943  TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
944 #else
945 
946  /*
947  * If you disabled TRACE_SORT, you can still probe sort__done, but you
948  * ain't getting space-used stats.
949  */
950  TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
951 #endif
952 
953  FREESTATE(state);
954  MemoryContextSwitchTo(oldcontext);
955 
956  /*
957  * Free the per-sort memory context, thereby releasing all working memory.
958  */
959  MemoryContextReset(state->base.sortcontext);
960 }
961 
962 /*
963  * tuplesort_end
964  *
965  * Release resources and clean up.
966  *
967  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
968  * pointing to garbage. Be careful not to attempt to use or free such
969  * pointers afterwards!
970  */
971 void
972 tuplesort_end(Tuplesortstate *state)
973 {
974  tuplesort_free(state);
975 
976  /*
977  * Free the main memory context, including the Tuplesortstate struct
978  * itself.
979  */
980  MemoryContextDelete(state->base.maincontext);
981 }
982 
983 /*
984  * tuplesort_updatemax
985  *
986  * Update maximum resource usage statistics.
987  */
988 static void
989 tuplesort_updatemax(Tuplesortstate *state)
990 {
991  int64 spaceUsed;
992  bool isSpaceDisk;
993 
994  /*
995  * Note: it might seem we should provide both memory and disk usage for a
996  * disk-based sort. However, the current code doesn't track memory space
997  * accurately once we have begun to return tuples to the caller (since we
998  * don't account for pfree's the caller is expected to do), so we cannot
999  * rely on availMem in a disk sort. This does not seem worth the overhead
1000  * to fix. Is it worth creating an API for the memory context code to
1001  * tell us how much is actually used in sortcontext?
1002  */
1003  if (state->tapeset)
1004  {
1005  isSpaceDisk = true;
1006  spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
1007  }
1008  else
1009  {
1010  isSpaceDisk = false;
1011  spaceUsed = state->allowedMem - state->availMem;
1012  }
1013 
1014  /*
1015  * Sort evicts data to the disk when it isn't able to fit that data into
1016  * main memory. This is why we assume space used on the disk to be more
1017  * important for tracking resource usage than space used in memory. Note
1018  * that the amount of space occupied by some tuple set on the disk might be
1019  * less than the amount of space occupied by the same tuple set in memory
1020  * due to a more compact representation.
1021  */
1022  if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
1023  (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
1024  {
1025  state->maxSpace = spaceUsed;
1026  state->isMaxSpaceDisk = isSpaceDisk;
1027  state->maxSpaceStatus = state->status;
1028  }
1029 }
1030 
1031 /*
1032  * tuplesort_reset
1033  *
1034  * Reset the tuplesort. Reset all the data in the tuplesort, but leave the
1035  * meta-information in. After tuplesort_reset, tuplesort is ready to start
1036  * a new sort. This allows avoiding recreation of tuple sort states (and
1037  * saving resources) when sorting multiple small batches.
1038  */
1039 void
1040 tuplesort_reset(Tuplesortstate *state)
1041 {
1042  tuplesort_updatemax(state);
1043  tuplesort_free(state);
1044 
1045  /*
1046  * After we've freed up per-batch memory, re-setup all of the state common
1047  * to both the first batch and any subsequent batch.
1048  */
1049  tuplesort_begin_batch(state);
1050 
1051  state->lastReturnedTuple = NULL;
1052  state->slabMemoryBegin = NULL;
1053  state->slabMemoryEnd = NULL;
1054  state->slabFreeHead = NULL;
1055 }
1056 
1057 /*
1058  * Grow the memtuples[] array, if possible within our memory constraint. We
1059  * must not exceed INT_MAX tuples in memory or the caller-provided memory
1060  * limit. Return true if we were able to enlarge the array, false if not.
1061  *
1062  * Normally, at each increment we double the size of the array. When doing
1063  * that would exceed a limit, we attempt one last, smaller increase (and then
1064  * clear the growmemtuples flag so we don't try any more). That allows us to
1065  * use memory as fully as permitted; sticking to the pure doubling rule could
1066  * result in almost half going unused. Because availMem moves around with
1067  * tuple addition/removal, we need some rule to prevent making repeated small
1068  * increases in memtupsize, which would just be useless thrashing. The
1069  * growmemtuples flag accomplishes that and also prevents useless
1070  * recalculations in this function.
1071  */
1072 static bool
1073 grow_memtuples(Tuplesortstate *state)
1074 {
1075  int newmemtupsize;
1076  int memtupsize = state->memtupsize;
1077  int64 memNowUsed = state->allowedMem - state->availMem;
1078 
1079  /* Forget it if we've already maxed out memtuples, per comment above */
1080  if (!state->growmemtuples)
1081  return false;
1082 
1083  /* Select new value of memtupsize */
1084  if (memNowUsed <= state->availMem)
1085  {
1086  /*
1087  * We've used no more than half of allowedMem; double our usage,
1088  * clamping at INT_MAX tuples.
1089  */
1090  if (memtupsize < INT_MAX / 2)
1091  newmemtupsize = memtupsize * 2;
1092  else
1093  {
1094  newmemtupsize = INT_MAX;
1095  state->growmemtuples = false;
1096  }
1097  }
1098  else
1099  {
1100  /*
1101  * This will be the last increment of memtupsize. Abandon doubling
1102  * strategy and instead increase as much as we safely can.
1103  *
1104  * To stay within allowedMem, we can't increase memtupsize by more
1105  * than availMem / sizeof(SortTuple) elements. In practice, we want
1106  * to increase it by considerably less, because we need to leave some
1107  * space for the tuples to which the new array slots will refer. We
1108  * assume the new tuples will be about the same size as the tuples
1109  * we've already seen, and thus we can extrapolate from the space
1110  * consumption so far to estimate an appropriate new size for the
1111  * memtuples array. The optimal value might be higher or lower than
1112  * this estimate, but it's hard to know that in advance. We again
1113  * clamp at INT_MAX tuples.
1114  *
1115  * This calculation is safe against enlarging the array so much that
1116  * LACKMEM becomes true, because the memory currently used includes
1117  * the present array; thus, there would be enough allowedMem for the
1118  * new array elements even if no other memory were currently used.
1119  *
1120  * We do the arithmetic in float8, because otherwise the product of
1121  * memtupsize and allowedMem could overflow. Any inaccuracy in the
1122  * result should be insignificant; but even if we computed a
1123  * completely insane result, the checks below will prevent anything
1124  * really bad from happening.
1125  */
1126  double grow_ratio;
1127 
1128  grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1129  if (memtupsize * grow_ratio < INT_MAX)
1130  newmemtupsize = (int) (memtupsize * grow_ratio);
1131  else
1132  newmemtupsize = INT_MAX;
1133 
1134  /* We won't make any further enlargement attempts */
1135  state->growmemtuples = false;
1136  }
1137 
1138  /* Must enlarge array by at least one element, else report failure */
1139  if (newmemtupsize <= memtupsize)
1140  goto noalloc;
1141 
1142  /*
1143  * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
1144  * to ensure our request won't be rejected. Note that we can easily
1145  * exhaust address space before facing this outcome. (This is presently
1146  * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1147  * don't rely on that at this distance.)
1148  */
1149  if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1150  {
1151  newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1152  state->growmemtuples = false; /* can't grow any more */
1153  }
1154 
1155  /*
1156  * We need to be sure that we do not cause LACKMEM to become true, else
1157  * the space management algorithm will go nuts. The code above should
1158  * never generate a dangerous request, but to be safe, check explicitly
1159  * that the array growth fits within availMem. (We could still cause
1160  * LACKMEM if the memory chunk overhead associated with the memtuples
1161  * array were to increase. That shouldn't happen because we chose the
1162  * initial array size large enough to ensure that palloc will be treating
1163  * both old and new arrays as separate chunks. But we'll check LACKMEM
1164  * explicitly below just in case.)
1165  */
1166  if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1167  goto noalloc;
1168 
1169  /* OK, do it */
1170  FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1171  state->memtupsize = newmemtupsize;
1172  state->memtuples = (SortTuple *)
1173  repalloc_huge(state->memtuples,
1174  state->memtupsize * sizeof(SortTuple));
1175  USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1176  if (LACKMEM(state))
1177  elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1178  return true;
1179 
1180 noalloc:
1181  /* If for any reason we didn't realloc, shut off future attempts */
1182  state->growmemtuples = false;
1183  return false;
1184 }
1185 
1186 /*
1187  * Shared code for tuple and datum cases.
1188  */
1189 void
1190 tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
1191  bool useAbbrev, Size tuplen)
1192 {
1193  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1194 
1195  Assert(!LEADER(state));
1196 
1197  /* account for the memory used for this tuple */
1198  USEMEM(state, tuplen);
1199  state->tupleMem += tuplen;
1200 
1201  if (!useAbbrev)
1202  {
1203  /*
1204  * Leave ordinary Datum representation, or NULL value. If there is a
1205  * converter it won't expect NULL values, and the cost model is not
1206  * required to account for NULL, so in that case we avoid calling the
1207  * converter and just set datum1 to a zeroed representation (to be
1208  * consistent, and to support cheap inequality tests for NULL
1209  * abbreviated keys).
1210  */
1211  }
1212  else if (!consider_abort_common(state))
1213  {
1214  /* Store abbreviated key representation */
1215  tuple->datum1 = state->base.sortKeys->abbrev_converter(tuple->datum1,
1216  state->base.sortKeys);
1217  }
1218  else
1219  {
1220  /*
1221  * Set state to be consistent with never trying abbreviation.
1222  *
1223  * Alter datum1 representation in already-copied tuples, so as to
1224  * ensure a consistent representation (current tuple was just
1225  * handled). It does not matter if some dumped tuples are already
1226  * sorted on tape, since serialized tuples lack abbreviated keys
1227  * (TSS_BUILDRUNS state prevents control reaching here in any case).
1228  */
1229  REMOVEABBREV(state, state->memtuples, state->memtupcount);
1230  }
1231 
1232  switch (state->status)
1233  {
1234  case TSS_INITIAL:
1235 
1236  /*
1237  * Save the tuple into the unsorted array. First, grow the array
1238  * as needed. Note that we try to grow the array when there is
1239  * still one free slot remaining --- if we fail, there'll still be
1240  * room to store the incoming tuple, and then we'll switch to
1241  * tape-based operation.
1242  */
1243  if (state->memtupcount >= state->memtupsize - 1)
1244  {
1245  (void) grow_memtuples(state);
1246  Assert(state->memtupcount < state->memtupsize);
1247  }
1248  state->memtuples[state->memtupcount++] = *tuple;
1249 
1250  /*
1251  * Check if it's time to switch over to a bounded heapsort. We do
1252  * so if the input tuple count exceeds twice the desired tuple
1253  * count (this is a heuristic for where heapsort becomes cheaper
1254  * than a quicksort), or if we've just filled workMem and have
1255  * enough tuples to meet the bound.
1256  *
1257  * Note that once we enter TSS_BOUNDED state we will always try to
1258  * complete the sort that way. In the worst case, if later input
1259  * tuples are larger than earlier ones, this might cause us to
1260  * exceed workMem significantly.
1261  */
1262  if (state->bounded &&
1263  (state->memtupcount > state->bound * 2 ||
1264  (state->memtupcount > state->bound && LACKMEM(state))))
1265  {
1266 #ifdef TRACE_SORT
1267  if (trace_sort)
1268  elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1269  state->memtupcount,
1270  pg_rusage_show(&state->ru_start));
1271 #endif
1272  make_bounded_heap(state);
1273  MemoryContextSwitchTo(oldcontext);
1274  return;
1275  }
1276 
1277  /*
1278  * Done if we still fit in available memory and have array slots.
1279  */
1280  if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1281  {
1282  MemoryContextSwitchTo(oldcontext);
1283  return;
1284  }
1285 
1286  /*
1287  * Nope; time to switch to tape-based operation.
1288  */
1289  inittapes(state, true);
1290 
1291  /*
1292  * Dump all tuples.
1293  */
1294  dumptuples(state, false);
1295  break;
1296 
1297  case TSS_BOUNDED:
1298 
1299  /*
1300  * We don't want to grow the array here, so check whether the new
1301  * tuple can be discarded before putting it in. This should be a
1302  * good speed optimization, too, since when there are many more
1303  * input tuples than the bound, most input tuples can be discarded
1304  * with just this one comparison. Note that because we currently
1305  * have the sort direction reversed, we must check for <= not >=.
1306  */
1307  if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1308  {
1309  /* new tuple <= top of the heap, so we can discard it */
1310  free_sort_tuple(state, tuple);
1311  CHECK_FOR_INTERRUPTS();
1312  }
1313  else
1314  {
1315  /* discard top of heap, replacing it with the new tuple */
1316  free_sort_tuple(state, &state->memtuples[0]);
1317  tuplesort_heap_replace_top(state, tuple);
1318  }
1319  break;
1320 
1321  case TSS_BUILDRUNS:
1322 
1323  /*
1324  * Save the tuple into the unsorted array (there must be space)
1325  */
1326  state->memtuples[state->memtupcount++] = *tuple;
1327 
1328  /*
1329  * If we are over the memory limit, dump all tuples.
1330  */
1331  dumptuples(state, false);
1332  break;
1333 
1334  default:
1335  elog(ERROR, "invalid tuplesort state");
1336  break;
1337  }
1338  MemoryContextSwitchTo(oldcontext);
1339 }
1340 
1341 static bool
1342 consider_abort_common(Tuplesortstate *state)
1343 {
1344  Assert(state->base.sortKeys[0].abbrev_converter != NULL);
1345  Assert(state->base.sortKeys[0].abbrev_abort != NULL);
1346  Assert(state->base.sortKeys[0].abbrev_full_comparator != NULL);
1347 
1348  /*
1349  * Check effectiveness of abbreviation optimization. Consider aborting
1350  * when still within memory limit.
1351  */
1352  if (state->status == TSS_INITIAL &&
1353  state->memtupcount >= state->abbrevNext)
1354  {
1355  state->abbrevNext *= 2;
1356 
1357  /*
1358  * Check opclass-supplied abbreviation abort routine. It may indicate
1359  * that abbreviation should not proceed.
1360  */
1361  if (!state->base.sortKeys->abbrev_abort(state->memtupcount,
1362  state->base.sortKeys))
1363  return false;
1364 
1365  /*
1366  * Finally, restore authoritative comparator, and indicate that
1367  * abbreviation is not in play by setting abbrev_converter to NULL
1368  */
1369  state->base.sortKeys[0].comparator = state->base.sortKeys[0].abbrev_full_comparator;
1370  state->base.sortKeys[0].abbrev_converter = NULL;
1371  /* Not strictly necessary, but be tidy */
1372  state->base.sortKeys[0].abbrev_abort = NULL;
1373  state->base.sortKeys[0].abbrev_full_comparator = NULL;
1374 
1375  /* Give up - expect original pass-by-value representation */
1376  return true;
1377  }
1378 
1379  return false;
1380 }
1381 
1382 /*
1383  * All tuples have been provided; finish the sort.
1384  */
1385 void
1386 tuplesort_performsort(Tuplesortstate *state)
1387 {
1388  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1389 
1390 #ifdef TRACE_SORT
1391  if (trace_sort)
1392  elog(LOG, "performsort of worker %d starting: %s",
1393  state->worker, pg_rusage_show(&state->ru_start));
1394 #endif
1395 
1396  switch (state->status)
1397  {
1398  case TSS_INITIAL:
1399 
1400  /*
1401  * We were able to accumulate all the tuples within the allowed amount
1402  * of memory, or we are the leader, about to take over the worker tapes
1403  */
1404  if (SERIAL(state))
1405  {
1406  /* Just qsort 'em and we're done */
1407  tuplesort_sort_memtuples(state);
1408  state->status = TSS_SORTEDINMEM;
1409  }
1410  else if (WORKER(state))
1411  {
1412  /*
1413  * Parallel workers must still dump out tuples to tape. No
1414  * merge is required to produce single output run, though.
1415  */
1416  inittapes(state, false);
1417  dumptuples(state, true);
1418  worker_nomergeruns(state);
1419  state->status = TSS_SORTEDONTAPE;
1420  }
1421  else
1422  {
1423  /*
1424  * Leader will take over worker tapes and merge worker runs.
1425  * Note that mergeruns sets the correct state->status.
1426  */
1427  leader_takeover_tapes(state);
1428  mergeruns(state);
1429  }
1430  state->current = 0;
1431  state->eof_reached = false;
1432  state->markpos_block = 0L;
1433  state->markpos_offset = 0;
1434  state->markpos_eof = false;
1435  break;
1436 
1437  case TSS_BOUNDED:
1438 
1439  /*
1440  * We were able to accumulate all the tuples required for output
1441  * in memory, using a heap to eliminate excess tuples. Now we
1442  * have to transform the heap to a properly-sorted array. Note
1443  * that sort_bounded_heap sets the correct state->status.
1444  */
1445  sort_bounded_heap(state);
1446  state->current = 0;
1447  state->eof_reached = false;
1448  state->markpos_offset = 0;
1449  state->markpos_eof = false;
1450  break;
1451 
1452  case TSS_BUILDRUNS:
1453 
1454  /*
1455  * Finish tape-based sort. First, flush all tuples remaining in
1456  * memory out to tape; then merge until we have a single remaining
1457  * run (or, if !randomAccess and !WORKER(), one run per tape).
1458  * Note that mergeruns sets the correct state->status.
1459  */
1460  dumptuples(state, true);
1461  mergeruns(state);
1462  state->eof_reached = false;
1463  state->markpos_block = 0L;
1464  state->markpos_offset = 0;
1465  state->markpos_eof = false;
1466  break;
1467 
1468  default:
1469  elog(ERROR, "invalid tuplesort state");
1470  break;
1471  }
1472 
1473 #ifdef TRACE_SORT
1474  if (trace_sort)
1475  {
1476  if (state->status == TSS_FINALMERGE)
1477  elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1478  state->worker, state->nInputTapes,
1479  pg_rusage_show(&state->ru_start));
1480  else
1481  elog(LOG, "performsort of worker %d done: %s",
1482  state->worker, pg_rusage_show(&state->ru_start));
1483  }
1484 #endif
1485 
1486  MemoryContextSwitchTo(oldcontext);
1487 }
1488 
1489 /*
1490  * Internal routine to fetch the next tuple in either forward or back
1491  * direction into *stup. Returns false if no more tuples.
1492  * Returned tuple belongs to tuplesort memory context, and must not be freed
1493  * by caller. Note that fetched tuple is stored in memory that may be
1494  * recycled by any future fetch.
1495  */
1496 bool
1497 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1498  SortTuple *stup)
1499 {
1500  unsigned int tuplen;
1501  size_t nmoved;
1502 
1503  Assert(!WORKER(state));
1504 
1505  switch (state->status)
1506  {
1507  case TSS_SORTEDINMEM:
1508  Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1509  Assert(!state->slabAllocatorUsed);
1510  if (forward)
1511  {
1512  if (state->current < state->memtupcount)
1513  {
1514  *stup = state->memtuples[state->current++];
1515  return true;
1516  }
1517  state->eof_reached = true;
1518 
1519  /*
1520  * Complain if caller tries to retrieve more tuples than
1521  * originally asked for in a bounded sort. This is because
1522  * returning EOF here might be the wrong thing.
1523  */
1524  if (state->bounded && state->current >= state->bound)
1525  elog(ERROR, "retrieved too many tuples in a bounded sort");
1526 
1527  return false;
1528  }
1529  else
1530  {
1531  if (state->current <= 0)
1532  return false;
1533 
1534  /*
1535  * If all tuples have been fetched already then we return the last
1536  * tuple, else the tuple before the last one returned.
1537  */
1538  if (state->eof_reached)
1539  state->eof_reached = false;
1540  else
1541  {
1542  state->current--; /* last returned tuple */
1543  if (state->current <= 0)
1544  return false;
1545  }
1546  *stup = state->memtuples[state->current - 1];
1547  return true;
1548  }
1549  break;
1550 
1551  case TSS_SORTEDONTAPE:
1552  Assert(forward || state->base.sortopt & TUPLESORT_RANDOMACCESS);
1553  Assert(state->slabAllocatorUsed);
1554 
1555  /*
1556  * The slot that held the tuple that we returned in previous
1557  * gettuple call can now be reused.
1558  */
1559  if (state->lastReturnedTuple)
1560  {
1561  RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1562  state->lastReturnedTuple = NULL;
1563  }
1564 
1565  if (forward)
1566  {
1567  if (state->eof_reached)
1568  return false;
1569 
1570  if ((tuplen = getlen(state->result_tape, true)) != 0)
1571  {
1572  READTUP(state, stup, state->result_tape, tuplen);
1573 
1574  /*
1575  * Remember the tuple we return, so that we can recycle
1576  * its memory on next call. (This can be NULL, in the
1577  * !state->tuples case).
1578  */
1579  state->lastReturnedTuple = stup->tuple;
1580 
1581  return true;
1582  }
1583  else
1584  {
1585  state->eof_reached = true;
1586  return false;
1587  }
1588  }
1589 
1590  /*
1591  * Backward.
1592  *
1593  * If all tuples have been fetched already then we return the last
1594  * tuple, else the tuple before the last one returned.
1595  */
1596  if (state->eof_reached)
1597  {
1598  /*
1599  * Seek position is pointing just past the zero tuplen at the
1600  * end of file; back up to fetch last tuple's ending length
1601  * word. If seek fails we must have a completely empty file.
1602  */
1603  nmoved = LogicalTapeBackspace(state->result_tape,
1604  2 * sizeof(unsigned int));
1605  if (nmoved == 0)
1606  return false;
1607  else if (nmoved != 2 * sizeof(unsigned int))
1608  elog(ERROR, "unexpected tape position");
1609  state->eof_reached = false;
1610  }
1611  else
1612  {
1613  /*
1614  * Back up and fetch previously-returned tuple's ending length
1615  * word. If seek fails, assume we are at start of file.
1616  */
1617  nmoved = LogicalTapeBackspace(state->result_tape,
1618  sizeof(unsigned int));
1619  if (nmoved == 0)
1620  return false;
1621  else if (nmoved != sizeof(unsigned int))
1622  elog(ERROR, "unexpected tape position");
1623  tuplen = getlen(state->result_tape, false);
1624 
1625  /*
1626  * Back up to get ending length word of tuple before it.
1627  */
1628  nmoved = LogicalTapeBackspace(state->result_tape,
1629  tuplen + 2 * sizeof(unsigned int));
1630  if (nmoved == tuplen + sizeof(unsigned int))
1631  {
1632  /*
1633  * We backed up over the previous tuple, but there was no
1634  * ending length word before it. That means that the prev
1635  * tuple is the first tuple in the file. It is now the
1636  * next to read in forward direction (not obviously right,
1637  * but that is what in-memory case does).
1638  */
1639  return false;
1640  }
1641  else if (nmoved != tuplen + 2 * sizeof(unsigned int))
1642  elog(ERROR, "bogus tuple length in backward scan");
1643  }
1644 
1645  tuplen = getlen(state->result_tape, false);
1646 
1647  /*
1648  * Now we have the length of the prior tuple, back up and read it.
1649  * Note: READTUP expects we are positioned after the initial
1650  * length word of the tuple, so back up to that point.
1651  */
1652  nmoved = LogicalTapeBackspace(state->result_tape,
1653  tuplen);
1654  if (nmoved != tuplen)
1655  elog(ERROR, "bogus tuple length in backward scan");
1656  READTUP(state, stup, state->result_tape, tuplen);
1657 
1658  /*
1659  * Remember the tuple we return, so that we can recycle its memory
1660  * on next call. (This can be NULL, in the Datum case).
1661  */
1662  state->lastReturnedTuple = stup->tuple;
1663 
1664  return true;
1665 
1666  case TSS_FINALMERGE:
1667  Assert(forward);
1668  /* We are managing memory ourselves, with the slab allocator. */
1669  Assert(state->slabAllocatorUsed);
1670 
1671  /*
1672  * The slab slot holding the tuple that we returned in previous
1673  * gettuple call can now be reused.
1674  */
1675  if (state->lastReturnedTuple)
1676  {
1677  RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1678  state->lastReturnedTuple = NULL;
1679  }
1680 
1681  /*
1682  * This code should match the inner loop of mergeonerun().
1683  */
1684  if (state->memtupcount > 0)
1685  {
1686  int srcTapeIndex = state->memtuples[0].srctape;
1687  LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
1688  SortTuple newtup;
1689 
1690  *stup = state->memtuples[0];
1691 
1692  /*
1693  * Remember the tuple we return, so that we can recycle its
1694  * memory on next call. (This can be NULL, in the Datum case).
1695  */
1696  state->lastReturnedTuple = stup->tuple;
1697 
1698  /*
1699  * Pull next tuple from tape, and replace the returned tuple
1700  * at top of the heap with it.
1701  */
1702  if (!mergereadnext(state, srcTape, &newtup))
1703  {
1704  /*
1705  * If no more data, we've reached end of run on this tape.
1706  * Remove the top node from the heap.
1707  */
1708  tuplesort_heap_delete_top(state);
1709  state->nInputRuns--;
1710 
1711  /*
1712  * Close the tape. It'd go away at the end of the sort
1713  * anyway, but better to release the memory early.
1714  */
1715  LogicalTapeClose(srcTape);
1716  return true;
1717  }
1718  newtup.srctape = srcTapeIndex;
1719  tuplesort_heap_replace_top(state, &newtup);
1720  return true;
1721  }
1722  return false;
1723 
1724  default:
1725  elog(ERROR, "invalid tuplesort state");
1726  return false; /* keep compiler quiet */
1727  }
1728 }
1729 
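The backspace arithmetic in the backward-scan branch above follows from the record framing used on the tapes for random-access sorts: each tuple is written as a leading length word, the tuple body, and a trailing copy of the length word, and each run is terminated by a zero length word (see markrunend() further below). The following self-contained sketch reproduces that framing over a plain in-memory buffer; put_record and the literal values are illustrative only, not PostgreSQL code.

#include <stdio.h>
#include <string.h>

/*
 * Illustrative framing: <len><body><len> per record, with a zero length word
 * at the end of the run, so the stream can be walked forwards or backwards
 * using only length words, as the backward scan above does.
 */
static size_t
put_record(unsigned char *buf, size_t pos, const void *body, unsigned int len)
{
	memcpy(buf + pos, &len, sizeof(len));                      /* leading length word */
	memcpy(buf + pos + sizeof(len), body, len);                /* tuple body */
	memcpy(buf + pos + sizeof(len) + len, &len, sizeof(len));  /* trailing length word */
	return pos + 2 * sizeof(len) + len;
}

int
main(void)
{
	unsigned char tape[256];
	unsigned int zero = 0,
				len;
	size_t		pos = 0;

	pos = put_record(tape, pos, "alpha", 5);
	pos = put_record(tape, pos, "beta", 4);
	memcpy(tape + pos, &zero, sizeof(zero));                   /* end-of-run marker */
	pos += sizeof(zero);

	/* Backward: back up over the marker plus one length word, then read it. */
	pos -= sizeof(zero) + sizeof(len);
	memcpy(&len, tape + pos, sizeof(len));
	printf("last record is %u bytes: %.*s\n",
		   len, (int) len, (const char *) (tape + pos - len));
	return 0;
}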
1730 
1731 /*
1732  * Advance over N tuples in either forward or back direction,
1733  * without returning any data. N==0 is a no-op.
1734  * Returns true if successful, false if ran out of tuples.
1735  */
1736 bool
1737 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1738 {
1739  MemoryContext oldcontext;
1740 
1741  /*
1742  * We don't actually support backwards skip yet, because no callers need
1743  * it. The API is designed to allow for that later, though.
1744  */
1745  Assert(forward);
1746  Assert(ntuples >= 0);
1747  Assert(!WORKER(state));
1748 
1749  switch (state->status)
1750  {
1751  case TSS_SORTEDINMEM:
1752  if (state->memtupcount - state->current >= ntuples)
1753  {
1754  state->current += ntuples;
1755  return true;
1756  }
1757  state->current = state->memtupcount;
1758  state->eof_reached = true;
1759 
1760  /*
1761  * Complain if caller tries to retrieve more tuples than
1762  * originally asked for in a bounded sort. This is because
1763  * returning EOF here might be the wrong thing.
1764  */
1765  if (state->bounded && state->current >= state->bound)
1766  elog(ERROR, "retrieved too many tuples in a bounded sort");
1767 
1768  return false;
1769 
1770  case TSS_SORTEDONTAPE:
1771  case TSS_FINALMERGE:
1772 
1773  /*
1774  * We could probably optimize these cases better, but for now it's
1775  * not worth the trouble.
1776  */
1777  oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
1778  while (ntuples-- > 0)
1779  {
1780  SortTuple stup;
1781 
1782  if (!tuplesort_gettuple_common(state, forward, &stup))
1783  {
1784  MemoryContextSwitchTo(oldcontext);
1785  return false;
1786  }
1787  CHECK_FOR_INTERRUPTS();
1788  }
1789  MemoryContextSwitchTo(oldcontext);
1790  return true;
1791 
1792  default:
1793  elog(ERROR, "invalid tuplesort state");
1794  return false; /* keep compiler quiet */
1795  }
1796 }
1797 
1798 /*
1799  * tuplesort_merge_order - report merge order we'll use for given memory
1800  * (note: "merge order" just means the number of input tapes in the merge).
1801  *
1802  * This is exported for use by the planner. allowedMem is in bytes.
1803  */
1804 int
1805 tuplesort_merge_order(int64 allowedMem)
1806 {
1807  int mOrder;
1808 
1809  /*----------
1810  * In the merge phase, we need buffer space for each input and output tape.
1811  * Each pass in the balanced merge algorithm reads from M input tapes, and
1812  * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
1813  * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
1814  * input tape.
1815  *
1816  * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
1817  * N * TAPE_BUFFER_OVERHEAD
1818  *
1819  * Except for the last and next-to-last merge passes, where there can be
1820  * fewer tapes left to process, M = N. We choose M so that we have the
1821  * desired amount of memory available for the input buffers
1822  * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
1823  * available for the tape buffers (allowedMem).
1824  *
1825  * Note: you might be thinking we need to account for the memtuples[]
1826  * array in this calculation, but we effectively treat that as part of the
1827  * MERGE_BUFFER_SIZE workspace.
1828  *----------
1829  */
1830  mOrder = allowedMem /
1831  (MERGE_BUFFER_SIZE + 2 * TAPE_BUFFER_OVERHEAD);
1832 
1833  /*
1834  * Even in minimum memory, use at least a MINORDER merge. On the other
1835  * hand, even when we have lots of memory, do not use more than a MAXORDER
1836  * merge. Tapes are pretty cheap, but they're not entirely free. Each
1837  * additional tape reduces the amount of memory available to build runs,
1838  * which in turn can cause the same sort to need more runs, which makes
1839  * merging slower even if it can still be done in a single pass. Also,
1840  * high order merges are quite slow due to CPU cache effects; it can be
1841  * faster to pay the I/O cost of a multi-pass merge than to perform a
1842  * single merge pass across many hundreds of tapes.
1843  */
1844  mOrder = Max(mOrder, MINORDER);
1845  mOrder = Min(mOrder, MAXORDER);
1846 
1847  return mOrder;
1848 }
1849 
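As a rough worked example of the calculation above (a sketch only: it assumes MINORDER = 6, MAXORDER = 500, TAPE_BUFFER_OVERHEAD = BLCKSZ and MERGE_BUFFER_SIZE = 32 * BLCKSZ, i.e. 8 kB and 256 kB with the default block size; the real values come from this file's macros and may differ):

#include <stdint.h>
#include <stdio.h>

#define BLCKSZ 8192
#define ASSUMED_TAPE_BUFFER_OVERHEAD	BLCKSZ			/* one buffer per tape */
#define ASSUMED_MERGE_BUFFER_SIZE		(BLCKSZ * 32)	/* workspace per input tape */
#define ASSUMED_MINORDER 6
#define ASSUMED_MAXORDER 500

/* Mirrors the shape of tuplesort_merge_order(), with the assumed constants. */
static int
illustrate_merge_order(int64_t allowedMem)
{
	/* M input tapes and (up to) M output tapes share allowedMem */
	int64_t		per_input_tape = ASSUMED_MERGE_BUFFER_SIZE +
		2 * ASSUMED_TAPE_BUFFER_OVERHEAD;
	int64_t		mOrder = allowedMem / per_input_tape;

	if (mOrder < ASSUMED_MINORDER)
		mOrder = ASSUMED_MINORDER;
	if (mOrder > ASSUMED_MAXORDER)
		mOrder = ASSUMED_MAXORDER;
	return (int) mOrder;
}

int
main(void)
{
	/* With these constants: 4 MB -> 15-way merge, 64 MB -> 240-way merge */
	printf("%d\n", illustrate_merge_order((int64_t) 4 * 1024 * 1024));
	printf("%d\n", illustrate_merge_order((int64_t) 64 * 1024 * 1024));
	return 0;
}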
1850 /*
1851  * Helper function to calculate how much memory to allocate for the read buffer
1852  * of each input tape in a merge pass.
1853  *
1854  * 'avail_mem' is the amount of memory available for the buffers of all the
1855  * tapes, both input and output.
1856  * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
1857  * 'maxOutputTapes' is the max. number of output tapes we should produce.
1858  */
1859 static int64
1860 merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
1861  int maxOutputTapes)
1862 {
1863  int nOutputRuns;
1864  int nOutputTapes;
1865 
1866  /*
1867  * How many output tapes will we produce in this pass?
1868  *
1869  * This is nInputRuns / nInputTapes, rounded up.
1870  */
1871  nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
1872 
1873  nOutputTapes = Min(nOutputRuns, maxOutputTapes);
1874 
1875  /*
1876  * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
1877  * remaining memory is divided evenly between the input tapes.
1878  *
1879  * This also follows from the formula in tuplesort_merge_order, but here
1880  * we derive the input buffer size from the amount of memory available,
1881  * and M and N.
1882  */
1883  return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
1884 }
1885 
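As a worked example with illustrative numbers only: if a pass has 8 MB of avail_mem, 6 input tapes holding 20 remaining input runs, and maxOutputTapes of 6, then nOutputRuns = ceil(20 / 6) = 4, so nOutputTapes = 4 and four output buffers of TAPE_BUFFER_OVERHEAD each are set aside; whatever remains of avail_mem is then divided evenly across the 6 input tapes as their read buffers.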
1886 /*
1887  * inittapes - initialize for tape sorting.
1888  *
1889  * This is called only if we have found we won't sort in memory.
1890  */
1891 static void
1892 inittapes(Tuplesortstate *state, bool mergeruns)
1893 {
1894  Assert(!LEADER(state));
1895 
1896  if (mergeruns)
1897  {
1898  /* Compute number of input tapes to use when merging */
1899  state->maxTapes = tuplesort_merge_order(state->allowedMem);
1900  }
1901  else
1902  {
1903  /* Workers can sometimes produce single run, output without merge */
1904  Assert(WORKER(state));
1905  state->maxTapes = MINORDER;
1906  }
1907 
1908 #ifdef TRACE_SORT
1909  if (trace_sort)
1910  elog(LOG, "worker %d switching to external sort with %d tapes: %s",
1911  state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
1912 #endif
1913 
1914  /* Create the tape set */
1915  inittapestate(state, state->maxTapes);
1916  state->tapeset =
1917  LogicalTapeSetCreate(false,
1918  state->shared ? &state->shared->fileset : NULL,
1919  state->worker);
1920 
1921  state->currentRun = 0;
1922 
1923  /*
1924  * Initialize logical tape arrays.
1925  */
1926  state->inputTapes = NULL;
1927  state->nInputTapes = 0;
1928  state->nInputRuns = 0;
1929 
1930  state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
1931  state->nOutputTapes = 0;
1932  state->nOutputRuns = 0;
1933 
1934  state->status = TSS_BUILDRUNS;
1935 
1936  selectnewtape(state);
1937 }
1938 
1939 /*
1940  * inittapestate - initialize generic tape management state
1941  */
1942 static void
1943 inittapestate(Tuplesortstate *state, int maxTapes)
1944 {
1945  int64 tapeSpace;
1946 
1947  /*
1948  * Decrease availMem to reflect the space needed for tape buffers; but
1949  * don't decrease it to the point that we have no room for tuples. (That
1950  * case is only likely to occur if sorting pass-by-value Datums; in all
1951  * other scenarios the memtuples[] array is unlikely to occupy more than
1952  * half of allowedMem. In the pass-by-value case it's not important to
1953  * account for tuple space, so we don't care if LACKMEM becomes
1954  * inaccurate.)
1955  */
1956  tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1957 
1958  if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1959  USEMEM(state, tapeSpace);
1960 
1961  /*
1962  * Make sure that the temp file(s) underlying the tape set are created in
1963  * suitable temp tablespaces. For parallel sorts, this should have been
1964  * called already, but it doesn't matter if it is called a second time.
1965  */
1966  PrepareTempTablespaces();
1967 }
1968 
1969 /*
1970  * selectnewtape -- select next tape to output to.
1971  *
1972  * This is called after finishing a run when we know another run
1973  * must be started. This is used both when building the initial
1974  * runs, and during merge passes.
1975  */
1976 static void
1977 selectnewtape(Tuplesortstate *state)
1978 {
1979  /*
1980  * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
1981  * both zero. On each call, we create a new output tape to hold the next
1982  * run, until maxTapes is reached. After that, we assign new runs to the
1983  * existing tapes in a round robin fashion.
1984  */
1985  if (state->nOutputTapes < state->maxTapes)
1986  {
1987  /* Create a new tape to hold the next run */
1988  Assert(state->outputTapes[state->nOutputRuns] == NULL);
1989  Assert(state->nOutputRuns == state->nOutputTapes);
1990  state->destTape = LogicalTapeCreate(state->tapeset);
1991  state->outputTapes[state->nOutputTapes] = state->destTape;
1992  state->nOutputTapes++;
1993  state->nOutputRuns++;
1994  }
1995  else
1996  {
1997  /*
1998  * We have reached the max number of tapes. Append to an existing
1999  * tape.
2000  */
2001  state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
2002  state->nOutputRuns++;
2003  }
2004 }
2005 
2006 /*
2007  * Initialize the slab allocation arena, for the given number of slots.
2008  */
2009 static void
2010 init_slab_allocator(Tuplesortstate *state, int numSlots)
2011 {
2012  if (numSlots > 0)
2013  {
2014  char *p;
2015  int i;
2016 
2017  state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2018  state->slabMemoryEnd = state->slabMemoryBegin +
2019  numSlots * SLAB_SLOT_SIZE;
2020  state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2021  USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2022 
2023  p = state->slabMemoryBegin;
2024  for (i = 0; i < numSlots - 1; i++)
2025  {
2026  ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2027  p += SLAB_SLOT_SIZE;
2028  }
2029  ((SlabSlot *) p)->nextfree = NULL;
2030  }
2031  else
2032  {
2033  state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2034  state->slabFreeHead = NULL;
2035  }
2036  state->slabAllocatorUsed = true;
2037 }
2038 
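The arena set up above is a conventional intrusive free list: while a slot is unused, its own memory stores the nextfree pointer, so handing out a slot (see tuplesort_readtup_alloc() below) and returning one (RELEASE_SLAB_SLOT) are constant-time pointer pops and pushes. A minimal standalone sketch of the same idea follows; the names and slot size are illustrative only, not PostgreSQL code.

#include <stdio.h>
#include <stdlib.h>

#define DEMO_SLOT_SIZE 64				/* stand-in for SLAB_SLOT_SIZE */

typedef union DemoSlot
{
	union DemoSlot *nextfree;			/* valid only while the slot is free */
	char		buffer[DEMO_SLOT_SIZE];
} DemoSlot;

static DemoSlot *freehead;

/* Carve an arena into fixed-size slots and thread them onto the free list. */
static DemoSlot *
arena_init(int nslots)
{
	DemoSlot   *arena = malloc(nslots * sizeof(DemoSlot));

	for (int i = 0; i < nslots - 1; i++)
		arena[i].nextfree = &arena[i + 1];
	arena[nslots - 1].nextfree = NULL;
	freehead = arena;
	return arena;
}

static void *
slot_alloc(void)
{
	DemoSlot   *slot = freehead;

	if (slot)
		freehead = slot->nextfree;		/* pop the head of the free list */
	return slot;
}

static void
slot_release(void *p)
{
	DemoSlot   *slot = p;

	slot->nextfree = freehead;			/* push back onto the free list */
	freehead = slot;
}

int
main(void)
{
	DemoSlot   *arena = arena_init(4);
	void	   *a = slot_alloc();
	void	   *b = slot_alloc();

	slot_release(a);					/* a becomes the new list head */
	printf("slot reused: %d\n", slot_alloc() == a);	/* prints 1 */
	(void) b;
	free(arena);
	return 0;
}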
2039 /*
2040  * mergeruns -- merge all the completed initial runs.
2041  *
2042  * This implements the Balanced k-Way Merge Algorithm. All input data has
2043  * already been written to initial runs on tape (see dumptuples).
2044  */
2045 static void
2046 mergeruns(Tuplesortstate *state)
2047 {
2048  int tapenum;
2049 
2050  Assert(state->status == TSS_BUILDRUNS);
2051  Assert(state->memtupcount == 0);
2052 
2053  if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL)
2054  {
2055  /*
2056  * If there are multiple runs to be merged, when we go to read back
2057  * tuples from disk, abbreviated keys will not have been stored, and
2058  * we don't care to regenerate them. Disable abbreviation from this
2059  * point on.
2060  */
2061  state->base.sortKeys->abbrev_converter = NULL;
2062  state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator;
2063 
2064  /* Not strictly necessary, but be tidy */
2065  state->base.sortKeys->abbrev_abort = NULL;
2066  state->base.sortKeys->abbrev_full_comparator = NULL;
2067  }
2068 
2069  /*
2070  * Reset tuple memory. We've freed all the tuples that we previously
2071  * allocated. We will use the slab allocator from now on.
2072  */
2073  MemoryContextResetOnly(state->base.tuplecontext);
2074 
2075  /*
2076  * We no longer need a large memtuples array. (We will allocate a smaller
2077  * one for the heap later.)
2078  */
2079  FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2080  pfree(state->memtuples);
2081  state->memtuples = NULL;
2082 
2083  /*
2084  * Initialize the slab allocator. We need one slab slot per input tape,
2085  * for the tuples in the heap, plus one to hold the tuple last returned
2086  * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
2087  * however, we don't need to allocate anything.)
2088  *
2089  * In a multi-pass merge, we could shrink this allocation for the last
2090  * merge pass, if it has fewer tapes than previous passes, but we don't
2091  * bother.
2092  *
2093  * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2094  * to track memory usage of individual tuples.
2095  */
2096  if (state->base.tuples)
2097  init_slab_allocator(state, state->nOutputTapes + 1);
2098  else
2099  init_slab_allocator(state, 0);
2100 
2101  /*
2102  * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
2103  * from each input tape.
2104  *
2105  * We could shrink this, too, between passes in a multi-pass merge, but we
2106  * don't bother. (The initial input tapes are still in outputTapes. The
2107  * number of input tapes will not increase between passes.)
2108  */
2109  state->memtupsize = state->nOutputTapes;
2110  state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,
2111  state->nOutputTapes * sizeof(SortTuple));
2112  USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2113 
2114  /*
2115  * Use all the remaining memory we have available for tape buffers among
2116  * all the input tapes. At the beginning of each merge pass, we will
2117  * divide this memory between the input and output tapes in the pass.
2118  */
2119  state->tape_buffer_mem = state->availMem;
2120  USEMEM(state, state->tape_buffer_mem);
2121 #ifdef TRACE_SORT
2122  if (trace_sort)
2123  elog(LOG, "worker %d using %zu KB of memory for tape buffers",
2124  state->worker, state->tape_buffer_mem / 1024);
2125 #endif
2126 
2127  for (;;)
2128  {
2129  /*
2130  * On the first iteration, or if we have read all the runs from the
2131  * input tapes in a multi-pass merge, it's time to start a new pass.
2132  * Rewind all the output tapes, and make them inputs for the next
2133  * pass.
2134  */
2135  if (state->nInputRuns == 0)
2136  {
2137  int64 input_buffer_size;
2138 
2139  /* Close the old, emptied, input tapes */
2140  if (state->nInputTapes > 0)
2141  {
2142  for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2143  LogicalTapeClose(state->inputTapes[tapenum]);
2144  pfree(state->inputTapes);
2145  }
2146 
2147  /* Previous pass's outputs become next pass's inputs. */
2148  state->inputTapes = state->outputTapes;
2149  state->nInputTapes = state->nOutputTapes;
2150  state->nInputRuns = state->nOutputRuns;
2151 
2152  /*
2153  * Reset output tape variables. The actual LogicalTapes will be
2154  * created as needed, here we only allocate the array to hold
2155  * them.
2156  */
2157  state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
2158  state->nOutputTapes = 0;
2159  state->nOutputRuns = 0;
2160 
2161  /*
2162  * Redistribute the memory allocated for tape buffers, among the
2163  * new input and output tapes.
2164  */
2165  input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
2166  state->nInputTapes,
2167  state->nInputRuns,
2168  state->maxTapes);
2169 
2170 #ifdef TRACE_SORT
2171  if (trace_sort)
2172  elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
2173  state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
2174  pg_rusage_show(&state->ru_start));
2175 #endif
2176 
2177  /* Prepare the new input tapes for merge pass. */
2178  for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2179  LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
2180 
2181  /*
2182  * If there's just one run left on each input tape, then only one
2183  * merge pass remains. If we don't have to produce a materialized
2184  * sorted tape, we can stop at this point and do the final merge
2185  * on-the-fly.
2186  */
2187  if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0
2188  && state->nInputRuns <= state->nInputTapes
2189  && !WORKER(state))
2190  {
2191  /* Tell logtape.c we won't be writing anymore */
2192  LogicalTapeSetForgetFreeSpace(state->tapeset);
2193  /* Initialize for the final merge pass */
2194  beginmerge(state);
2195  state->status = TSS_FINALMERGE;
2196  return;
2197  }
2198  }
2199 
2200  /* Select an output tape */
2201  selectnewtape(state);
2202 
2203  /* Merge one run from each input tape. */
2204  mergeonerun(state);
2205 
2206  /*
2207  * If the input tapes are empty, and we output only one output run,
2208  * we're done. The current output tape contains the final result.
2209  */
2210  if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
2211  break;
2212  }
2213 
2214  /*
2215  * Done. The result is on a single run on a single tape.
2216  */
2217  state->result_tape = state->outputTapes[0];
2218  if (!WORKER(state))
2219  LogicalTapeFreeze(state->result_tape, NULL);
2220  else
2221  worker_freeze_result_tape(state);
2222  state->status = TSS_SORTEDONTAPE;
2223 
2224  /* Close all the now-empty input tapes, to release their read buffers. */
2225  for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
2226  LogicalTapeClose(state->inputTapes[tapenum]);
2227 }
2228 
2229 /*
2230  * Merge one run from each input tape.
2231  */
2232 static void
2233 mergeonerun(Tuplesortstate *state)
2234 {
2235  int srcTapeIndex;
2236  LogicalTape *srcTape;
2237 
2238  /*
2239  * Start the merge by loading one tuple from each active source tape into
2240  * the heap.
2241  */
2242  beginmerge(state);
2243 
2244  Assert(state->slabAllocatorUsed);
2245 
2246  /*
2247  * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2248  * out, and replacing it with next tuple from same tape (if there is
2249  * another one).
2250  */
2251  while (state->memtupcount > 0)
2252  {
2253  SortTuple stup;
2254 
2255  /* write the tuple to destTape */
2256  srcTapeIndex = state->memtuples[0].srctape;
2257  srcTape = state->inputTapes[srcTapeIndex];
2258  WRITETUP(state, state->destTape, &state->memtuples[0]);
2259 
2260  /* recycle the slot of the tuple we just wrote out, for the next read */
2261  if (state->memtuples[0].tuple)
2262  RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2263 
2264  /*
2265  * pull next tuple from the tape, and replace the written-out tuple in
2266  * the heap with it.
2267  */
2268  if (mergereadnext(state, srcTape, &stup))
2269  {
2270  stup.srctape = srcTapeIndex;
2271  tuplesort_heap_replace_top(state, &stup);
2272  }
2273  else
2274  {
2275  tuplesort_heap_delete_top(state);
2276  state->nInputRuns--;
2277  }
2278  }
2279 
2280  /*
2281  * When the heap empties, we're done. Write an end-of-run marker on the
2282  * output tape.
2283  */
2284  markrunend(state->destTape);
2285 }
2286 
2287 /*
2288  * beginmerge - initialize for a merge pass
2289  *
2290  * Fill the merge heap with the first tuple from each input tape.
2291  */
2292 static void
2293 beginmerge(Tuplesortstate *state)
2294 {
2295  int activeTapes;
2296  int srcTapeIndex;
2297 
2298  /* Heap should be empty here */
2299  Assert(state->memtupcount == 0);
2300 
2301  activeTapes = Min(state->nInputTapes, state->nInputRuns);
2302 
2303  for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
2304  {
2305  SortTuple tup;
2306 
2307  if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
2308  {
2309  tup.srctape = srcTapeIndex;
2310  tuplesort_heap_insert(state, &tup);
2311  }
2312  }
2313 }
2314 
2315 /*
2316  * mergereadnext - read next tuple from one merge input tape
2317  *
2318  * Returns false on EOF.
2319  */
2320 static bool
2321 mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
2322 {
2323  unsigned int tuplen;
2324 
2325  /* read next tuple, if any */
2326  if ((tuplen = getlen(srcTape, true)) == 0)
2327  return false;
2328  READTUP(state, stup, srcTape, tuplen);
2329 
2330  return true;
2331 }
2332 
2333 /*
2334  * dumptuples - remove tuples from memtuples and write initial run to tape
2335  *
2336  * When alltuples = true, dump everything currently in memory. (This case is
2337  * only used at end of input data.)
2338  */
2339 static void
2340 dumptuples(Tuplesortstate *state, bool alltuples)
2341 {
2342  int memtupwrite;
2343  int i;
2344 
2345  /*
2346  * Nothing to do if we still fit in available memory and have array slots,
2347  * unless this is the final call during initial run generation.
2348  */
2349  if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2350  !alltuples)
2351  return;
2352 
2353  /*
2354  * Final call might require no sorting, in rare cases where we just so
2355  * happen to have previously LACKMEM()'d at the point where exactly all
2356  * remaining tuples are loaded into memory, just before input was
2357  * exhausted. In general, short final runs are quite possible, but avoid
2358  * creating a completely empty run. In a worker, though, we must produce
2359  * at least one tape, even if it's empty.
2360  */
2361  if (state->memtupcount == 0 && state->currentRun > 0)
2362  return;
2363 
2364  Assert(state->status == TSS_BUILDRUNS);
2365 
2366  /*
2367  * It seems unlikely that this limit will ever be exceeded, but take no
2368  * chances
2369  */
2370  if (state->currentRun == INT_MAX)
2371  ereport(ERROR,
2372  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2373  errmsg("cannot have more than %d runs for an external sort",
2374  INT_MAX)));
2375 
2376  if (state->currentRun > 0)
2377  selectnewtape(state);
2378 
2379  state->currentRun++;
2380 
2381 #ifdef TRACE_SORT
2382  if (trace_sort)
2383  elog(LOG, "worker %d starting quicksort of run %d: %s",
2384  state->worker, state->currentRun,
2385  pg_rusage_show(&state->ru_start));
2386 #endif
2387 
2388  /*
2389  * Sort all tuples accumulated within the allowed amount of memory for
2390  * this run using quicksort
2391  */
2392  tuplesort_sort_memtuples(state);
2393 
2394 #ifdef TRACE_SORT
2395  if (trace_sort)
2396  elog(LOG, "worker %d finished quicksort of run %d: %s",
2397  state->worker, state->currentRun,
2398  pg_rusage_show(&state->ru_start));
2399 #endif
2400 
2401  memtupwrite = state->memtupcount;
2402  for (i = 0; i < memtupwrite; i++)
2403  {
2404  SortTuple *stup = &state->memtuples[i];
2405 
2406  WRITETUP(state, state->destTape, stup);
2407  }
2408 
2409  state->memtupcount = 0;
2410 
2411  /*
2412  * Reset tuple memory. We've freed all of the tuples that we previously
2413  * allocated. It's important to avoid fragmentation when there is a stark
2414  * change in the sizes of incoming tuples. In bounded sorts,
2415  * fragmentation due to AllocSetFree's bucketing by size class might be
2416  * particularly bad if this step wasn't taken.
2417  */
2418  MemoryContextReset(state->base.tuplecontext);
2419 
2420  /*
2421  * Now update the memory accounting to subtract the memory used by the
2422  * tuple.
2423  */
2424  FREEMEM(state, state->tupleMem);
2425  state->tupleMem = 0;
2426 
2427  markrunend(state->destTape);
2428 
2429 #ifdef TRACE_SORT
2430  if (trace_sort)
2431  elog(LOG, "worker %d finished writing run %d to tape %d: %s",
2432  state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
2433  pg_rusage_show(&state->ru_start));
2434 #endif
2435 }
2436 
2437 /*
2438  * tuplesort_rescan - rewind and replay the scan
2439  */
2440 void
2441 tuplesort_rescan(Tuplesortstate *state)
2442 {
2443  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2444 
2445  Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2446 
2447  switch (state->status)
2448  {
2449  case TSS_SORTEDINMEM:
2450  state->current = 0;
2451  state->eof_reached = false;
2452  state->markpos_offset = 0;
2453  state->markpos_eof = false;
2454  break;
2455  case TSS_SORTEDONTAPE:
2456  LogicalTapeRewindForRead(state->result_tape, 0);
2457  state->eof_reached = false;
2458  state->markpos_block = 0L;
2459  state->markpos_offset = 0;
2460  state->markpos_eof = false;
2461  break;
2462  default:
2463  elog(ERROR, "invalid tuplesort state");
2464  break;
2465  }
2466 
2467  MemoryContextSwitchTo(oldcontext);
2468 }
2469 
2470 /*
2471  * tuplesort_markpos - saves current position in the merged sort file
2472  */
2473 void
2474 tuplesort_markpos(Tuplesortstate *state)
2475 {
2476  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2477 
2478  Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2479 
2480  switch (state->status)
2481  {
2482  case TSS_SORTEDINMEM:
2483  state->markpos_offset = state->current;
2484  state->markpos_eof = state->eof_reached;
2485  break;
2486  case TSS_SORTEDONTAPE:
2487  LogicalTapeTell(state->result_tape,
2488  &state->markpos_block,
2489  &state->markpos_offset);
2490  state->markpos_eof = state->eof_reached;
2491  break;
2492  default:
2493  elog(ERROR, "invalid tuplesort state");
2494  break;
2495  }
2496 
2497  MemoryContextSwitchTo(oldcontext);
2498 }
2499 
2500 /*
2501  * tuplesort_restorepos - restores current position in merged sort file to
2502  * last saved position
2503  */
2504 void
2505 tuplesort_restorepos(Tuplesortstate *state)
2506 {
2507  MemoryContext oldcontext = MemoryContextSwitchTo(state->base.sortcontext);
2508 
2509  Assert(state->base.sortopt & TUPLESORT_RANDOMACCESS);
2510 
2511  switch (state->status)
2512  {
2513  case TSS_SORTEDINMEM:
2514  state->current = state->markpos_offset;
2515  state->eof_reached = state->markpos_eof;
2516  break;
2517  case TSS_SORTEDONTAPE:
2518  LogicalTapeSeek(state->result_tape,
2519  state->markpos_block,
2520  state->markpos_offset);
2521  state->eof_reached = state->markpos_eof;
2522  break;
2523  default:
2524  elog(ERROR, "invalid tuplesort state");
2525  break;
2526  }
2527 
2528  MemoryContextSwitchTo(oldcontext);
2529 }
2530 
2531 /*
2532  * tuplesort_get_stats - extract summary statistics
2533  *
2534  * This can be called after tuplesort_performsort() finishes to obtain
2535  * printable summary information about how the sort was performed.
2536  */
2537 void
2538 tuplesort_get_stats(Tuplesortstate *state,
2539  TuplesortInstrumentation *stats)
2540 {
2541  /*
2542  * Note: it might seem we should provide both memory and disk usage for a
2543  * disk-based sort. However, the current code doesn't track memory space
2544  * accurately once we have begun to return tuples to the caller (since we
2545  * don't account for pfree's the caller is expected to do), so we cannot
2546  * rely on availMem in a disk sort. This does not seem worth the overhead
2547  * to fix. Is it worth creating an API for the memory context code to
2548  * tell us how much is actually used in sortcontext?
2549  */
2550  tuplesort_updatemax(state);
2551 
2552  if (state->isMaxSpaceDisk)
2553  stats->spaceType = SORT_SPACE_TYPE_DISK;
2554  else
2555  stats->spaceType = SORT_SPACE_TYPE_MEMORY;
2556  stats->spaceUsed = (state->maxSpace + 1023) / 1024;
2557 
2558  switch (state->maxSpaceStatus)
2559  {
2560  case TSS_SORTEDINMEM:
2561  if (state->boundUsed)
2562  stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
2563  else
2564  stats->sortMethod = SORT_TYPE_QUICKSORT;
2565  break;
2566  case TSS_SORTEDONTAPE:
2567  stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
2568  break;
2569  case TSS_FINALMERGE:
2570  stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
2571  break;
2572  default:
2573  stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
2574  break;
2575  }
2576 }
2577 
2578 /*
2579  * Convert TuplesortMethod to a string.
2580  */
2581 const char *
2582 tuplesort_method_name(TuplesortMethod m)
2583 {
2584  switch (m)
2585  {
2586  case SORT_TYPE_STILL_IN_PROGRESS:
2587  return "still in progress";
2588  case SORT_TYPE_TOP_N_HEAPSORT:
2589  return "top-N heapsort";
2590  case SORT_TYPE_QUICKSORT:
2591  return "quicksort";
2592  case SORT_TYPE_EXTERNAL_SORT:
2593  return "external sort";
2594  case SORT_TYPE_EXTERNAL_MERGE:
2595  return "external merge";
2596  }
2597 
2598  return "unknown";
2599 }
2600 
2601 /*
2602  * Convert TuplesortSpaceType to a string.
2603  */
2604 const char *
2605 tuplesort_space_type_name(TuplesortSpaceType t)
2606 {
2607  Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
2608  return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
2609 }
2610 
2611 
2612 /*
2613  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2614  */
2615 
2616 /*
2617  * Convert the existing unordered array of SortTuples to a bounded heap,
2618  * discarding all but the smallest "state->bound" tuples.
2619  *
2620  * When working with a bounded heap, we want to keep the largest entry
2621  * at the root (array entry zero), instead of the smallest as in the normal
2622  * sort case. This allows us to discard the largest entry cheaply.
2623  * Therefore, we temporarily reverse the sort direction.
2624  */
2625 static void
2626 make_bounded_heap(Tuplesortstate *state)
2627 {
2628  int tupcount = state->memtupcount;
2629  int i;
2630 
2631  Assert(state->status == TSS_INITIAL);
2632  Assert(state->bounded);
2633  Assert(tupcount >= state->bound);
2634  Assert(SERIAL(state));
2635 
2636  /* Reverse sort direction so largest entry will be at root */
2637  reversedirection(state);
2638 
2639  state->memtupcount = 0; /* make the heap empty */
2640  for (i = 0; i < tupcount; i++)
2641  {
2642  if (state->memtupcount < state->bound)
2643  {
2644  /* Insert next tuple into heap */
2645  /* Must copy source tuple to avoid possible overwrite */
2646  SortTuple stup = state->memtuples[i];
2647 
2648  tuplesort_heap_insert(state, &stup);
2649  }
2650  else
2651  {
2652  /*
2653  * The heap is full. Replace the largest entry with the new
2654  * tuple, or just discard it, if it's larger than anything already
2655  * in the heap.
2656  */
2657  if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2658  {
2659  free_sort_tuple(state, &state->memtuples[i]);
2660  CHECK_FOR_INTERRUPTS();
2661  }
2662  else
2663  tuplesort_heap_replace_top(state, &state->memtuples[i]);
2664  }
2665  }
2666 
2667  Assert(state->memtupcount == state->bound);
2668  state->status = TSS_BOUNDED;
2669 }
2670 
2671 /*
2672  * Convert the bounded heap to a properly-sorted array
2673  */
2674 static void
2675 sort_bounded_heap(Tuplesortstate *state)
2676 {
2677  int tupcount = state->memtupcount;
2678 
2679  Assert(state->status == TSS_BOUNDED);
2680  Assert(state->bounded);
2681  Assert(tupcount == state->bound);
2682  Assert(SERIAL(state));
2683 
2684  /*
2685  * We can unheapify in place because each delete-top call will remove the
2686  * largest entry, which we can promptly store in the newly freed slot at
2687  * the end. Once we're down to a single-entry heap, we're done.
2688  */
2689  while (state->memtupcount > 1)
2690  {
2691  SortTuple stup = state->memtuples[0];
2692 
2693  /* this sifts-up the next-largest entry and decreases memtupcount */
2694  tuplesort_heap_delete_top(state);
2695  state->memtuples[state->memtupcount] = stup;
2696  }
2697  state->memtupcount = tupcount;
2698 
2699  /*
2700  * Reverse sort direction back to the original state. This is not
2701  * actually necessary but seems like a good idea for tidiness.
2702  */
2703  reversedirection(state);
2704 
2705  state->status = TSS_SORTEDINMEM;
2706  state->boundUsed = true;
2707 }
2708 
2709 /*
2710  * Sort all memtuples using specialized qsort() routines.
2711  *
2712  * Quicksort is used for small in-memory sorts, and external sort runs.
2713  */
2714 static void
2715 tuplesort_sort_memtuples(Tuplesortstate *state)
2716 {
2717  Assert(!LEADER(state));
2718 
2719  if (state->memtupcount > 1)
2720  {
2721  /*
2722  * Do we have the leading column's value or abbreviation in datum1,
2723  * and is there a specialization for its comparator?
2724  */
2725  if (state->base.haveDatum1 && state->base.sortKeys)
2726  {
2727  if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp)
2728  {
2729  qsort_tuple_unsigned(state->memtuples,
2730  state->memtupcount,
2731  state);
2732  return;
2733  }
2734 #if SIZEOF_DATUM >= 8
2735  else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp)
2736  {
2737  qsort_tuple_signed(state->memtuples,
2738  state->memtupcount,
2739  state);
2740  return;
2741  }
2742 #endif
2743  else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp)
2744  {
2745  qsort_tuple_int32(state->memtuples,
2746  state->memtupcount,
2747  state);
2748  return;
2749  }
2750  }
2751 
2752  /* Can we use the single-key sort function? */
2753  if (state->base.onlyKey != NULL)
2754  {
2755  qsort_ssup(state->memtuples, state->memtupcount,
2756  state->base.onlyKey);
2757  }
2758  else
2759  {
2760  qsort_tuple(state->memtuples,
2761  state->memtupcount,
2762  state->base.comparetup,
2763  state);
2764  }
2765  }
2766 }
2767 
2768 /*
2769  * Insert a new tuple into an empty or existing heap, maintaining the
2770  * heap invariant. Caller is responsible for ensuring there's room.
2771  *
2772  * Note: For some callers, tuple points to a memtuples[] entry above the
2773  * end of the heap. This is safe as long as it's not immediately adjacent
2774  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2775  * is, it might get overwritten before being moved into the heap!
2776  */
2777 static void
2778 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
2779 {
2780  SortTuple *memtuples;
2781  int j;
2782 
2783  memtuples = state->memtuples;
2784  Assert(state->memtupcount < state->memtupsize);
2785 
2786  CHECK_FOR_INTERRUPTS();
2787 
2788  /*
2789  * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2790  * using 1-based array indexes, not 0-based.
2791  */
2792  j = state->memtupcount++;
2793  while (j > 0)
2794  {
2795  int i = (j - 1) >> 1;
2796 
2797  if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
2798  break;
2799  memtuples[j] = memtuples[i];
2800  j = i;
2801  }
2802  memtuples[j] = *tuple;
2803 }
2804 
2805 /*
2806  * Remove the tuple at state->memtuples[0] from the heap. Decrement
2807  * memtupcount, and sift up to maintain the heap invariant.
2808  *
2809  * The caller has already free'd the tuple the top node points to,
2810  * if necessary.
2811  */
2812 static void
2813 tuplesort_heap_delete_top(Tuplesortstate *state)
2814 {
2815  SortTuple *memtuples = state->memtuples;
2816  SortTuple *tuple;
2817 
2818  if (--state->memtupcount <= 0)
2819  return;
2820 
2821  /*
2822  * Remove the last tuple in the heap, and re-insert it, by replacing the
2823  * current top node with it.
2824  */
2825  tuple = &memtuples[state->memtupcount];
2826  tuplesort_heap_replace_top(state, tuple);
2827 }
2828 
2829 /*
2830  * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
2831  * maintain the heap invariant.
2832  *
2833  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
2834  * Heapsort, steps H3-H8).
2835  */
2836 static void
2837 tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
2838 {
2839  SortTuple *memtuples = state->memtuples;
2840  unsigned int i,
2841  n;
2842 
2843  Assert(state->memtupcount >= 1);
2844 
2845  CHECK_FOR_INTERRUPTS();
2846 
2847  /*
2848  * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
2849  * This prevents overflow in the "2 * i + 1" calculation, since at the top
2850  * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
2851  */
2852  n = state->memtupcount;
2853  i = 0; /* i is where the "hole" is */
2854  for (;;)
2855  {
2856  unsigned int j = 2 * i + 1;
2857 
2858  if (j >= n)
2859  break;
2860  if (j + 1 < n &&
2861  COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
2862  j++;
2863  if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
2864  break;
2865  memtuples[i] = memtuples[j];
2866  i = j;
2867  }
2868  memtuples[i] = *tuple;
2869 }
2870 
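For readers less familiar with this heap idiom, here is a compact standalone version of the same replace-top loop operating on plain ints in a min-heap; it follows the structure of tuplesort_heap_replace_top() above, but the names and data are illustrative only, not PostgreSQL code.

#include <stdio.h>

/*
 * Replace the smallest element (heap[0]) of a binary min-heap of n ints with
 * 'newval' and restore the heap property by letting the "hole" sink from the
 * root toward the leaves.
 */
static void
heap_replace_top(int *heap, unsigned int n, int newval)
{
	unsigned int i = 0;				/* i is where the "hole" is */

	for (;;)
	{
		unsigned int j = 2 * i + 1;	/* left child */

		if (j >= n)
			break;
		if (j + 1 < n && heap[j + 1] < heap[j])
			j++;					/* pick the smaller child */
		if (newval <= heap[j])
			break;
		heap[i] = heap[j];			/* move child up; hole moves down */
		i = j;
	}
	heap[i] = newval;
}

int
main(void)
{
	int			heap[] = {1, 3, 2, 7, 4};	/* already a valid min-heap */

	heap_replace_top(heap, 5, 6);	/* the 1 at the root is replaced by 6 */
	for (int k = 0; k < 5; k++)
		printf("%d ", heap[k]);		/* prints: 2 3 6 7 4 */
	printf("\n");
	return 0;
}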
2871 /*
2872  * Function to reverse the sort direction from its current state
2873  *
2874  * It is not safe to call this when performing hash tuplesorts
2875  */
2876 static void
2877 reversedirection(Tuplesortstate *state)
2878 {
2879  SortSupport sortKey = state->base.sortKeys;
2880  int nkey;
2881 
2882  for (nkey = 0; nkey < state->base.nKeys; nkey++, sortKey++)
2883  {
2884  sortKey->ssup_reverse = !sortKey->ssup_reverse;
2885  sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2886  }
2887 }
2888 
2889 
2890 /*
2891  * Tape interface routines
2892  */
2893 
2894 static unsigned int
2895 getlen(LogicalTape *tape, bool eofOK)
2896 {
2897  unsigned int len;
2898 
2899  if (LogicalTapeRead(tape,
2900  &len, sizeof(len)) != sizeof(len))
2901  elog(ERROR, "unexpected end of tape");
2902  if (len == 0 && !eofOK)
2903  elog(ERROR, "unexpected end of data");
2904  return len;
2905 }
2906 
2907 static void
2908 markrunend(LogicalTape *tape)
2909 {
2910  unsigned int len = 0;
2911 
2912  LogicalTapeWrite(tape, &len, sizeof(len));
2913 }
2914 
2915 /*
2916  * Get memory for tuple from within READTUP() routine.
2917  *
2918  * We use next free slot from the slab allocator, or palloc() if the tuple
2919  * is too large for that.
2920  */
2921 void *
2922 tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
2923 {
2924  SlabSlot *buf;
2925 
2926  /*
2927  * We pre-allocate enough slots in the slab arena that we should never run
2928  * out.
2929  */
2930  Assert(state->slabFreeHead);
2931 
2932  if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
2933  return MemoryContextAlloc(state->base.sortcontext, tuplen);
2934  else
2935  {
2936  buf = state->slabFreeHead;
2937  /* Reuse this slot */
2938  state->slabFreeHead = buf->nextfree;
2939 
2940  return buf;
2941  }
2942 }
2943 
2944 
2945 /*
2946  * Parallel sort routines
2947  */
2948 
2949 /*
2950  * tuplesort_estimate_shared - estimate required shared memory allocation
2951  *
2952  * nWorkers is an estimate of the number of workers (it's the number that
2953  * will be requested).
2954  */
2955 Size
2956 tuplesort_estimate_shared(int nWorkers)
2957 {
2958  Size tapesSize;
2959 
2960  Assert(nWorkers > 0);
2961 
2962  /* Make sure that BufFile shared state is MAXALIGN'd */
2963  tapesSize = mul_size(sizeof(TapeShare), nWorkers);
2964  tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
2965 
2966  return tapesSize;
2967 }
2968 
2969 /*
2970  * tuplesort_initialize_shared - initialize shared tuplesort state
2971  *
2972  * Must be called from leader process before workers are launched, to
2973  * establish state needed up-front for worker tuplesortstates. nWorkers
2974  * should match the argument passed to tuplesort_estimate_shared().
2975  */
2976 void
2977 tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
2978 {
2979  int i;
2980 
2981  Assert(nWorkers > 0);
2982 
2983  SpinLockInit(&shared->mutex);
2984  shared->currentWorker = 0;
2985  shared->workersFinished = 0;
2986  SharedFileSetInit(&shared->fileset, seg);
2987  shared->nTapes = nWorkers;
2988  for (i = 0; i < nWorkers; i++)
2989  {
2990  shared->tapes[i].firstblocknumber = 0L;
2991  }
2992 }
2993 
2994 /*
2995  * tuplesort_attach_shared - attach to shared tuplesort state
2996  *
2997  * Must be called by all worker processes.
2998  */
2999 void
3000 tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
3001 {
3002  /* Attach to SharedFileSet */
3003  SharedFileSetAttach(&shared->fileset, seg);
3004 }
3005 
3006 /*
3007  * worker_get_identifier - Assign and return ordinal identifier for worker
3008  *
3009  * The order in which these are assigned is not well defined, and should not
3010  * matter; worker numbers across parallel sort participants need only be
3011  * distinct and gapless. logtape.c requires this.
3012  *
3013  * Note that the identifiers assigned from here have no relation to
3014  * ParallelWorkerNumber number, to avoid making any assumption about
3015  * caller's requirements. However, we do follow the ParallelWorkerNumber
3016  * convention of representing a non-worker with worker number -1. This
3017  * includes the leader, as well as serial Tuplesort processes.
3018  */
3019 static int
3020 worker_get_identifier(Tuplesortstate *state)
3021 {
3022  Sharedsort *shared = state->shared;
3023  int worker;
3024 
3025  Assert(WORKER(state));
3026 
3027  SpinLockAcquire(&shared->mutex);
3028  worker = shared->currentWorker++;
3029  SpinLockRelease(&shared->mutex);
3030 
3031  return worker;
3032 }
3033 
3034 /*
3035  * worker_freeze_result_tape - freeze worker's result tape for leader
3036  *
3037  * This is called by workers just after the result tape has been determined,
3038  * instead of calling LogicalTapeFreeze() directly. They do so because
3039  * workers require a few additional steps over similar serial
3040  * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
3041  * steps are around freeing now unneeded resources, and representing to
3042  * leader that worker's input run is available for its merge.
3043  *
3044  * There should only be one final output run for each worker, which consists
3045  * of all tuples that were originally input into worker.
3046  */
3047 static void
3048 worker_freeze_result_tape(Tuplesortstate *state)
3049 {
3050  Sharedsort *shared = state->shared;
3051  TapeShare output;
3052 
3053  Assert(WORKER(state));
3054  Assert(state->result_tape != NULL);
3055  Assert(state->memtupcount == 0);
3056 
3057  /*
3058  * Free most remaining memory, in case caller is sensitive to our holding
3059  * on to it. memtuples may not be a tiny merge heap at this point.
3060  */
3061  pfree(state->memtuples);
3062  /* Be tidy */
3063  state->memtuples = NULL;
3064  state->memtupsize = 0;
3065 
3066  /*
3067  * Parallel worker requires result tape metadata, which is to be stored in
3068  * shared memory for leader
3069  */
3070  LogicalTapeFreeze(state->result_tape, &output);
3071 
3072  /* Store properties of output tape, and update finished worker count */
3073  SpinLockAcquire(&shared->mutex);
3074  shared->tapes[state->worker] = output;
3075  shared->workersFinished++;
3076  SpinLockRelease(&shared->mutex);
3077 }
3078 
3079 /*
3080  * worker_nomergeruns - dump memtuples in worker, without merging
3081  *
3082  * This is called as an alternative to mergeruns() with a worker when no
3083  * merging is required.
3084  */
3085 static void
3086 worker_nomergeruns(Tuplesortstate *state)
3087 {
3088  Assert(WORKER(state));
3089  Assert(state->result_tape == NULL);
3090  Assert(state->nOutputRuns == 1);
3091 
3092  state->result_tape = state->destTape;
3093  worker_freeze_result_tape(state);
3094 }
3095 
3096 /*
3097  * leader_takeover_tapes - create tapeset for leader from worker tapes
3098  *
3099  * So far, leader Tuplesortstate has performed no actual sorting. By now, all
3100  * sorting has occurred in workers, all of which must have already returned
3101  * from tuplesort_performsort().
3102  *
3103  * When this returns, leader process is left in a state that is virtually
3104  * indistinguishable from it having generated runs as a serial external sort
3105  * might have.
3106  */
3107 static void
3108 leader_takeover_tapes(Tuplesortstate *state)
3109 {
3110  Sharedsort *shared = state->shared;
3111  int nParticipants = state->nParticipants;
3112  int workersFinished;
3113  int j;
3114 
3115  Assert(LEADER(state));
3116  Assert(nParticipants >= 1);
3117 
3118  SpinLockAcquire(&shared->mutex);
3119  workersFinished = shared->workersFinished;
3120  SpinLockRelease(&shared->mutex);
3121 
3122  if (nParticipants != workersFinished)
3123  elog(ERROR, "cannot take over tapes before all workers finish");
3124 
3125  /*
3126  * Create the tapeset from worker tapes, including a leader-owned tape at
3127  * the end. Parallel workers are far more expensive than logical tapes,
3128  * so the number of tapes allocated here should never be excessive.
3129  */
3130  inittapestate(state, nParticipants);
3131  state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
3132 
3133  /*
3134  * Set currentRun to reflect the number of runs we will merge (it's not
3135  * used for anything, this is just pro forma)
3136  */
3137  state->currentRun = nParticipants;
3138 
3139  /*
3140  * Initialize the state to look the same as after building the initial
3141  * runs.
3142  *
3143  * There will always be exactly 1 run per worker, and exactly one input
3144  * tape per run, because workers always output exactly 1 run, even when
3145  * there were no input tuples for workers to sort.
3146  */
3147  state->inputTapes = NULL;
3148  state->nInputTapes = 0;
3149  state->nInputRuns = 0;
3150 
3151  state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
3152  state->nOutputTapes = nParticipants;
3153  state->nOutputRuns = nParticipants;
3154 
3155  for (j = 0; j < nParticipants; j++)
3156  {
3157  state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
3158  }
3159 
3160  state->status = TSS_BUILDRUNS;
3161 }
3162 
3163 /*
3164  * Convenience routine to free a tuple previously loaded into sort memory
3165  */
3166 static void
3167 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3168 {
3169  if (stup->tuple)
3170  {
3171  FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3172  pfree(stup->tuple);
3173  stup->tuple = NULL;
3174  }
3175 }
3176 
3177 int
3178 ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
3179 {
3180  if (x < y)
3181  return -1;
3182  else if (x > y)
3183  return 1;
3184  else
3185  return 0;
3186 }
3187 
3188 #if SIZEOF_DATUM >= 8
3189 int
3190 ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
3191 {
3192  int64 xx = DatumGetInt64(x);
3193  int64 yy = DatumGetInt64(y);
3194 
3195  if (xx < yy)
3196  return -1;
3197  else if (xx > yy)
3198  return 1;
3199  else
3200  return 0;
3201 }
3202 #endif
3203 
3204 int
3205 ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
3206 {
3207  int32 xx = DatumGetInt32(x);
3208  int32 yy = DatumGetInt32(y);
3209 
3210  if (xx < yy)
3211  return -1;
3212  else if (xx > yy)
3213  return 1;
3214  else
3215  return 0;
3216 }
void PrepareTempTablespaces(void)
Definition: tablespace.c:1331
MemoryContext BumpContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: bump.c:131
#define Min(x, y)
Definition: c.h:1004
#define MAXALIGN(LEN)
Definition: c.h:811
signed int int32
Definition: c.h:494
#define Max(x, y)
Definition: c.h:998
#define INT64_FORMAT
Definition: c.h:548
#define Assert(condition)
Definition: c.h:858
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:398
#define pg_attribute_always_inline
Definition: c.h:234
size_t Size
Definition: c.h:605
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define LOG
Definition: elog.h:31
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
static int compare(const void *arg1, const void *arg2)
Definition: geqo_pool.c:145
FILE * output
int y
Definition: isn.c:72
int b
Definition: isn.c:70
int x
Definition: isn.c:71
int a
Definition: isn.c:69
int j
Definition: isn.c:74
int i
Definition: isn.c:73
LogicalTape * LogicalTapeCreate(LogicalTapeSet *lts)
Definition: logtape.c:680
void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
Definition: logtape.c:846
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition: logtape.c:750
size_t LogicalTapeBackspace(LogicalTape *lt, size_t size)
Definition: logtape.c:1062
size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
Definition: logtape.c:928
int64 LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1181
void LogicalTapeClose(LogicalTape *lt)
Definition: logtape.c:733
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:667
void LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
Definition: logtape.c:1133
void LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
Definition: logtape.c:1162
void LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
Definition: logtape.c:761
LogicalTapeSet * LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
Definition: logtape.c:556
void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
Definition: logtape.c:981
LogicalTape * LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
Definition: logtape.c:609
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
void pfree(void *pointer)
Definition: mcxt.c:1521
Size GetMemoryChunkSpace(void *pointer)
Definition: mcxt.c:721
void * palloc0(Size size)
Definition: mcxt.c:1347
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
void * palloc(Size size)
Definition: mcxt.c:1317
void MemoryContextResetOnly(MemoryContext context)
Definition: mcxt.c:402
void * repalloc_huge(void *pointer, Size size)
Definition: mcxt.c:1672
#define AllocSetContextCreate
Definition: memutils.h:129
#define MaxAllocHugeSize
Definition: memutils.h:45
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const void size_t len
const char * pg_rusage_show(const PGRUsage *ru0)
Definition: pg_rusage.c:40
void pg_rusage_init(PGRUsage *ru0)
Definition: pg_rusage.c:27
static char * buf
Definition: pg_test_fsync.c:73
static int64 DatumGetInt64(Datum X)
Definition: postgres.h:385
uintptr_t Datum
Definition: postgres.h:64
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
MemoryContextSwitchTo(old_ctx)
int slock_t
Definition: s_lock.h:670
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:56
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
Definition: sharedfileset.c:38
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static int ApplyUnsignedSortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
Definition: sortsupport.h:233
static int ApplyInt32SortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
Definition: sortsupport.h:302
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
SharedFileSet fileset
Definition: tuplesort.c:364
TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]
Definition: tuplesort.c:373
int workersFinished
Definition: tuplesort.c:361
int nTapes
Definition: tuplesort.c:367
slock_t mutex
Definition: tuplesort.c:350
int currentWorker
Definition: tuplesort.c:360
Sharedsort * sharedsort
Definition: tuplesort.h:58
bool ssup_nulls_first
Definition: sortsupport.h:75
void * tuple
Definition: tuplesort.h:149
int srctape
Definition: tuplesort.h:152
Datum datum1
Definition: tuplesort.h:150
int64 firstblocknumber
Definition: logtape.h:54
TuplesortMethod sortMethod
Definition: tuplesort.h:112
TuplesortSpaceType spaceType
Definition: tuplesort.h:113
void * lastReturnedTuple
Definition: tuplesort.c:266
LogicalTapeSet * tapeset
Definition: tuplesort.c:210
bool isMaxSpaceDisk
Definition: tuplesort.c:206
bool growmemtuples
Definition: tuplesort.c:222
SortTuple * memtuples
Definition: tuplesort.c:219
int64 maxSpace
Definition: tuplesort.c:204
LogicalTape ** inputTapes
Definition: tuplesort.c:282
bool slabAllocatorUsed
Definition: tuplesort.c:251
TuplesortPublic base
Definition: tuplesort.c:189
char * slabMemoryEnd
Definition: tuplesort.c:254
int64 tupleMem
Definition: tuplesort.c:195
PGRUsage ru_start
Definition: tuplesort.c:339
char * slabMemoryBegin
Definition: tuplesort.c:253
LogicalTape ** outputTapes
Definition: tuplesort.c:286
bool eof_reached
Definition: tuplesort.c:299
size_t tape_buffer_mem
Definition: tuplesort.c:258
TupSortStatus status
Definition: tuplesort.c:190
int64 availMem
Definition: tuplesort.c:200
LogicalTape * destTape
Definition: tuplesort.c:290
TupSortStatus maxSpaceStatus
Definition: tuplesort.c:209
bool markpos_eof
Definition: tuplesort.c:304
int64 abbrevNext
Definition: tuplesort.c:332
int64 markpos_block
Definition: tuplesort.c:302
Sharedsort * shared
Definition: tuplesort.c:323
LogicalTape * result_tape
Definition: tuplesort.c:297
SlabSlot * slabFreeHead
Definition: tuplesort.c:255
int markpos_offset
Definition: tuplesort.c:303
int64 allowedMem
Definition: tuplesort.c:201
Definition: regguts.h:323
void tuplesort_rescan(Tuplesortstate *state)
Definition: tuplesort.c:2441
void tuplesort_performsort(Tuplesortstate *state)
Definition: tuplesort.c:1386
int tuplesort_merge_order(int64 allowedMem)
Definition: tuplesort.c:1805
#define TAPE_BUFFER_OVERHEAD
Definition: tuplesort.c:180
static void tuplesort_heap_delete_top(Tuplesortstate *state)
Definition: tuplesort.c:2813
#define INITIAL_MEMTUPSIZE
Definition: tuplesort.c:120
static unsigned int getlen(LogicalTape *tape, bool eofOK)
Definition: tuplesort.c:2895
void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
Definition: tuplesort.c:2977
#define COMPARETUP(state, a, b)
Definition: tuplesort.c:400
Tuplesortstate * tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
Definition: tuplesort.c:646
static void selectnewtape(Tuplesortstate *state)
Definition: tuplesort.c:1977
void tuplesort_reset(Tuplesortstate *state)
Definition: tuplesort.c:1040
#define SERIAL(state)
Definition: tuplesort.c:407
#define FREESTATE(state)
Definition: tuplesort.c:403
static void markrunend(LogicalTape *tape)
Definition: tuplesort.c:2908
bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
Definition: tuplesort.c:1737
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
Definition: tuplesort.c:3167
#define REMOVEABBREV(state, stup, count)
Definition: tuplesort.c:399
#define LACKMEM(state)
Definition: tuplesort.c:404
static void reversedirection(Tuplesortstate *state)
Definition: tuplesort.c:2877
#define USEMEM(state, amt)
Definition: tuplesort.c:405
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
Definition: tuplesort.c:2778
static bool grow_memtuples(Tuplesortstate *state)
Definition: tuplesort.c:1073
int ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
Definition: tuplesort.c:3178
static void beginmerge(Tuplesortstate *state)
Definition: tuplesort.c:2293
static void make_bounded_heap(Tuplesortstate *state)
Definition: tuplesort.c:2626
bool tuplesort_used_bound(Tuplesortstate *state)
Definition: tuplesort.c:892
#define WRITETUP(state, tape, stup)
Definition: tuplesort.c:401
static void sort_bounded_heap(Tuplesortstate *state)
Definition: tuplesort.c:2675
TupSortStatus
Definition: tuplesort.c:157
@ TSS_SORTEDONTAPE
Definition: tuplesort.c:162
@ TSS_SORTEDINMEM
Definition: tuplesort.c:161
@ TSS_INITIAL
Definition: tuplesort.c:158
@ TSS_FINALMERGE
Definition: tuplesort.c:163
@ TSS_BUILDRUNS
Definition: tuplesort.c:160
@ TSS_BOUNDED
Definition: tuplesort.c:159
static int worker_get_identifier(Tuplesortstate *state)
Definition: tuplesort.c:3020
static void mergeonerun(Tuplesortstate *state)
Definition: tuplesort.c:2233
#define FREEMEM(state, amt)
Definition: tuplesort.c:406
const char * tuplesort_space_type_name(TuplesortSpaceType t)
Definition: tuplesort.c:2605
#define MAXORDER
Definition: tuplesort.c:179
static void inittapestate(Tuplesortstate *state, int maxTapes)
Definition: tuplesort.c:1943
#define SLAB_SLOT_SIZE
Definition: tuplesort.c:144
static void leader_takeover_tapes(Tuplesortstate *state)
Definition: tuplesort.c:3108
Size tuplesort_estimate_shared(int nWorkers)
Definition: tuplesort.c:2956
void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats)
Definition: tuplesort.c:2538
static void tuplesort_sort_memtuples(Tuplesortstate *state)
Definition: tuplesort.c:2715
void tuplesort_end(Tuplesortstate *state)
Definition: tuplesort.c:972
static void inittapes(Tuplesortstate *state, bool mergeruns)
Definition: tuplesort.c:1892
void tuplesort_markpos(Tuplesortstate *state)
Definition: tuplesort.c:2474
void tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbrev, Size tuplen)
Definition: tuplesort.c:1190
#define MERGE_BUFFER_SIZE
Definition: tuplesort.c:181
#define READTUP(state, stup, tape, len)
Definition: tuplesort.c:402
int ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
Definition: tuplesort.c:3205
#define LEADER(state)
Definition: tuplesort.c:409
#define WORKER(state)
Definition: tuplesort.c:408
bool tuplesort_gettuple_common(Tuplesortstate *state, bool forward, SortTuple *stup)
Definition: tuplesort.c:1497
static int64 merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns, int maxOutputTapes)
Definition: tuplesort.c:1860
static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
Definition: tuplesort.c:2321
union SlabSlot SlabSlot
static void tuplesort_updatemax(Tuplesortstate *state)
Definition: tuplesort.c:989
static void worker_freeze_result_tape(Tuplesortstate *state)
Definition: tuplesort.c:3048
bool trace_sort
Definition: tuplesort.c:125
#define RELEASE_SLAB_SLOT(state, tuple)
Definition: tuplesort.c:387
void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
Definition: tuplesort.c:3000
static void worker_nomergeruns(Tuplesortstate *state)
Definition: tuplesort.c:3086
static pg_attribute_always_inline int qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
Definition: tuplesort.c:499
static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
Definition: tuplesort.c:2837
void tuplesort_restorepos(Tuplesortstate *state)
Definition: tuplesort.c:2505
static pg_attribute_always_inline int qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
Definition: tuplesort.c:546
static void mergeruns(Tuplesortstate *state)
Definition: tuplesort.c:2046
void * tuplesort_readtup_alloc(Tuplesortstate *state, Size tuplen)
Definition: tuplesort.c:2922
#define MINORDER
Definition: tuplesort.c:178
static void tuplesort_begin_batch(Tuplesortstate *state)
Definition: tuplesort.c:758
void tuplesort_set_bound(Tuplesortstate *state, int64 bound)
Definition: tuplesort.c:844
static void init_slab_allocator(Tuplesortstate *state, int numSlots)
Definition: tuplesort.c:2010
const char * tuplesort_method_name(TuplesortMethod m)
Definition: tuplesort.c:2582
static bool consider_abort_common(Tuplesortstate *state)
Definition: tuplesort.c:1342
static void tuplesort_free(Tuplesortstate *state)
Definition: tuplesort.c:903
static void dumptuples(Tuplesortstate *state, bool alltuples)
Definition: tuplesort.c:2340
#define TupleSortUseBumpTupleCxt(opt)
Definition: tuplesort.h:108
#define TUPLESORT_RANDOMACCESS
Definition: tuplesort.h:96
#define TUPLESORT_ALLOWBOUNDED
Definition: tuplesort.h:99
TuplesortSpaceType
Definition: tuplesort.h:87
@ SORT_SPACE_TYPE_DISK
Definition: tuplesort.h:88
@ SORT_SPACE_TYPE_MEMORY
Definition: tuplesort.h:89
TuplesortMethod
Definition: tuplesort.h:76
@ SORT_TYPE_EXTERNAL_SORT
Definition: tuplesort.h:80
@ SORT_TYPE_TOP_N_HEAPSORT
Definition: tuplesort.h:78
@ SORT_TYPE_QUICKSORT
Definition: tuplesort.h:79
@ SORT_TYPE_STILL_IN_PROGRESS
Definition: tuplesort.h:77
@ SORT_TYPE_EXTERNAL_MERGE
Definition: tuplesort.h:81
char buffer[SLAB_SLOT_SIZE]
Definition: tuplesort.c:149
union SlabSlot * nextfree
Definition: tuplesort.c:148