PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
logtape.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * logtape.c
4 * Management of "logical tapes" within temporary files.
5 *
6 * This module exists to support sorting via multiple merge passes (see
7 * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8 * we implement it on disk by creating a separate file for each "tape",
9 * there is an annoying problem: the peak space usage is at least twice
10 * the volume of actual data to be sorted. (This must be so because each
11 * datum will appear in both the input and output tapes of the final
12 * merge pass.)
13 *
14 * We can work around this problem by recognizing that any one tape
15 * dataset (with the possible exception of the final output) is written
16 * and read exactly once in a perfectly sequential manner. Therefore,
17 * a datum once read will not be required again, and we can recycle its
18 * space for use by the new tape dataset(s) being generated. In this way,
19 * the total space usage is essentially just the actual data volume, plus
20 * insignificant bookkeeping and start/stop overhead.
21 *
22 * Few OSes allow arbitrary parts of a file to be released back to the OS,
23 * so we have to implement this space-recycling ourselves within a single
24 * logical file. logtape.c exists to perform this bookkeeping and provide
25 * the illusion of N independent tape devices to tuplesort.c. Note that
26 * logtape.c itself depends on buffile.c to provide a "logical file" of
27 * larger size than the underlying OS may support.
28 *
29 * For simplicity, we allocate and release space in the underlying file
30 * in BLCKSZ-size blocks. Space allocation boils down to keeping track
31 * of which blocks in the underlying file belong to which logical tape,
32 * plus any blocks that are free (recycled and not yet reused).
33 * The blocks in each logical tape form a chain, with a prev- and next-
34 * pointer in each block.
35 *
36 * The initial write pass is guaranteed to fill the underlying file
37 * perfectly sequentially, no matter how data is divided into logical tapes.
38 * Once we begin merge passes, the access pattern becomes considerably
39 * less predictable --- but the seeking involved should be comparable to
40 * what would happen if we kept each logical tape in a separate file,
41 * so there's no serious performance penalty paid to obtain the space
42 * savings of recycling. We try to localize the write accesses by always
43 * writing to the lowest-numbered free block when we have a choice; it's
44 * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
45 * policy for free blocks would be better?)
46 *
47 * To further make the I/Os more sequential, we can use a larger buffer
48 * when reading, and read multiple blocks from the same tape in one go,
49 * whenever the buffer becomes empty.
50 *
51 * To support the above policy of writing to the lowest free block, the
52 * freelist is a min heap.
53 *
54 * Since all the bookkeeping and buffer memory is allocated with palloc(),
55 * and the underlying file(s) are made with OpenTemporaryFile, all resources
56 * for a logical tape set are certain to be cleaned up even if processing
57 * is aborted by ereport(ERROR). To avoid confusion, the caller should take
58 * care that all calls for a single LogicalTapeSet are made in the same
59 * palloc context.
60 *
61 * To support parallel sort operations involving coordinated callers to
62 * tuplesort.c routines across multiple workers, it is necessary to
63 * concatenate each worker BufFile/tapeset into one single logical tapeset
64 * managed by the leader. Workers should have produced one final
65 * materialized tape (their entire output) when this happens in leader.
66 * There will always be the same number of runs as input tapes, and the same
67 * number of input tapes as participants (worker Tuplesortstates).
68 *
69 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
70 * Portions Copyright (c) 1994, Regents of the University of California
71 *
72 * IDENTIFICATION
73 * src/backend/utils/sort/logtape.c
74 *
75 *-------------------------------------------------------------------------
76 */
77
78#include "postgres.h"
79
80#include <fcntl.h>
81
82#include "storage/buffile.h"
83#include "utils/builtins.h"
84#include "utils/logtape.h"
85#include "utils/memdebug.h"
86#include "utils/memutils.h"
87
/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, negated, in 'next'.
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	int64		prev;			/* previous block on this tape, or -1 on first
								 * block */
	int64		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;
102
/* Usable payload bytes in a block: everything in front of the trailer. */
#define TapeBlockPayloadSize (BLCKSZ - sizeof(TapeBlockTrailer))

/*
 * Locate the trailer kept in the last bytes of block buffer 'buf'.  The
 * argument is parenthesized so that any pointer expression (for example a
 * conditional expression) can be passed safely.
 */
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

/* A negative 'next' marks the last block of a tape. */
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)

/*
 * Number of valid payload bytes in 'buf': the negated 'next' value on the
 * last block, a full payload otherwise.  Note that 'buf' is evaluated more
 * than once, so it must not have side effects.
 */
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)

/* Mark 'buf' as a last block holding 'nbytes' valid bytes. */
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
113
114/*
115 * When multiple tapes are being written to concurrently (as in HashAgg),
116 * avoid excessive fragmentation by preallocating block numbers to individual
117 * tapes. Each preallocation doubles in size starting at
118 * TAPE_WRITE_PREALLOC_MIN blocks up to TAPE_WRITE_PREALLOC_MAX blocks.
119 *
120 * No filesystem operations are performed for preallocation; only the block
121 * numbers are reserved. This may lead to sparse writes, which will cause
122 * ltsWriteBlock() to fill in holes with zeros.
123 */
#define TAPE_WRITE_PREALLOC_MIN 8	/* size of a tape's first preallocation */
#define TAPE_WRITE_PREALLOC_MAX 128	/* upper bound for a preallocation */
126
127/*
128 * This data structure represents a single "logical tape" within the set
129 * of logical tapes stored in the same file.
130 *
131 * While writing, we hold the current partially-written data block in the
132 * buffer. While reading, we can hold multiple blocks in the buffer. Note
133 * that we don't retain the trailers of a block when it's read into the
134 * buffer. The buffer therefore contains one large contiguous chunk of data
135 * from the tape.
136 */
138{
139 LogicalTapeSet *tapeSet; /* tape set this tape is part of */
140
141 bool writing; /* T while in write phase */
142 bool frozen; /* T if blocks should not be freed when read */
143 bool dirty; /* does buffer need to be written? */
144
145 /*
146 * Block numbers of the first, current, and next block of the tape.
147 *
148 * The "current" block number is only valid when writing, or reading from
149 * a frozen tape. (When reading from an unfrozen tape, we use a larger
150 * read buffer that holds multiple blocks, so the "current" block is
151 * ambiguous.)
152 *
153 * When concatenation of worker tape BufFiles is performed, an offset to
154 * the first block in the unified BufFile space is applied during reads.
155 */
160
161 /*
162 * Buffer for current data block(s).
163 */
164 char *buffer; /* physical buffer (separately palloc'd) */
165 int buffer_size; /* allocated size of the buffer */
166 int max_size; /* highest useful, safe buffer_size */
167 int pos; /* next read/write position in buffer */
168 int nbytes; /* total # of valid bytes in buffer */
169
170 /*
171 * Preallocated block numbers are held in an array sorted in descending
172 * order; blocks are consumed from the end of the array (lowest block
173 * numbers first).
174 */
176 int nprealloc; /* number of elements in list */
177 int prealloc_size; /* number of elements list can hold */
178};
179
180/*
181 * This data structure represents a set of related "logical tapes" sharing
182 * space in a single underlying file. (But that "file" may be multiple files
183 * if needed to escape OS limits on file size; buffile.c handles that for us.)
184 * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
185 * demand.
186 */
188{
189 BufFile *pfile; /* underlying file for whole tape set */
191 int worker; /* worker # if shared, -1 for leader/serial */
192
193 /*
194 * File size tracking. nBlocksWritten is the size of the underlying file,
195 * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
196 * by ltsReleaseBlock(), and it is always greater than or equal to
197 * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
198 * blocks that have been allocated for a tape, but have not been written
199 * to the underlying file yet. nHoleBlocks tracks the total number of
200 * blocks that are in unused holes between worker spaces following BufFile
201 * concatenation.
202 */
203 int64 nBlocksAllocated; /* # of blocks allocated */
204 int64 nBlocksWritten; /* # of blocks used in underlying file */
205 int64 nHoleBlocks; /* # of "hole" blocks left */
206
207 /*
208 * We store the numbers of recycled-and-available blocks in freeBlocks[].
209 * When there are no such blocks, we extend the underlying file.
210 *
211 * If forgetFreeSpace is true then any freed blocks are simply forgotten
212 * rather than being remembered in freeBlocks[]. See notes for
213 * LogicalTapeSetForgetFreeSpace().
214 */
215 bool forgetFreeSpace; /* are we remembering free blocks? */
216 int64 *freeBlocks; /* resizable array holding minheap */
217 int64 nFreeBlocks; /* # of currently free blocks */
218 Size freeBlocksLen; /* current allocated length of freeBlocks[] */
219 bool enable_prealloc; /* preallocate write blocks? */
220};
221
223static void ltsWriteBlock(LogicalTapeSet *lts, int64 blocknum, const void *buffer);
224static void ltsReadBlock(LogicalTapeSet *lts, int64 blocknum, void *buffer);
228static void ltsReleaseBlock(LogicalTapeSet *lts, int64 blocknum);
229static void ltsInitReadBuffer(LogicalTape *lt);
230
231
232/*
233 * Write a block-sized buffer to the specified block of the underlying file.
234 *
235 * No need for an error return convention; we ereport() on any error.
236 */
237static void
238ltsWriteBlock(LogicalTapeSet *lts, int64 blocknum, const void *buffer)
239{
240 /*
241 * BufFile does not support "holes", so if we're about to write a block
242 * that's past the current end of file, fill the space between the current
243 * end of file and the target block with zeros.
244 *
245 * This can happen either when tapes preallocate blocks; or for the last
246 * block of a tape which might not have been flushed.
247 *
248 * Note that BufFile concatenation can leave "holes" in BufFile between
249 * worker-owned block ranges. These are tracked for reporting purposes
250 * only. We never read from nor write to these hole blocks, and so they
251 * are not considered here.
252 */
253 while (blocknum > lts->nBlocksWritten)
254 {
255 PGIOAlignedBlock zerobuf;
256
257 MemSet(zerobuf.data, 0, sizeof(zerobuf));
258
259 ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
260 }
261
262 /* Write the requested block */
263 if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
266 errmsg("could not seek to block %" PRId64 " of temporary file",
267 blocknum)));
268 BufFileWrite(lts->pfile, buffer, BLCKSZ);
269
270 /* Update nBlocksWritten, if we extended the file */
271 if (blocknum == lts->nBlocksWritten)
272 lts->nBlocksWritten++;
273}
274
275/*
276 * Read a block-sized buffer from the specified block of the underlying file.
277 *
278 * No need for an error return convention; we ereport() on any error. This
279 * module should never attempt to read a block it doesn't know is there.
280 */
281static void
282ltsReadBlock(LogicalTapeSet *lts, int64 blocknum, void *buffer)
283{
284 if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
287 errmsg("could not seek to block %" PRId64 " of temporary file",
288 blocknum)));
289 BufFileReadExact(lts->pfile, buffer, BLCKSZ);
290}
291
292/*
293 * Read as many blocks as we can into the per-tape buffer.
294 *
295 * Returns true if anything was read, 'false' on EOF.
296 */
/*
 * NOTE(review): the listing jumps from line 297 to 299, so the declarator
 * line (function name and parameter list) is absent from this excerpt.  The
 * body uses a variable named 'lt', and the caller visible in this file
 * invokes it as ltsReadFillBuffer(lt) -- confirm against the upstream file.
 */
297static bool
299{
/* Start with an empty buffer; blocks are appended back to back below. */
300 lt->pos = 0;
301 lt->nbytes = 0;
302
303 do
304 {
/* Each block is read in right after the payload bytes gathered so far. */
305 char *thisbuf = lt->buffer + lt->nbytes;
306 int64 datablocknum = lt->nextBlockNumber;
307
308 /* Fetch next block number */
309 if (datablocknum == -1L)
310 break; /* EOF */
311 /* Apply worker offset, needed for leader tapesets */
312 datablocknum += lt->offsetBlockNumber;
313
314 /* Read the block */
315 ltsReadBlock(lt->tapeSet, datablocknum, thisbuf);
/* Unless the tape is frozen, the block just read is handed back for reuse. */
316 if (!lt->frozen)
317 ltsReleaseBlock(lt->tapeSet, datablocknum);
/*
 * NOTE(review): listing line 318 is absent here; a statement may have been
 * dropped from this excerpt -- confirm against the upstream file.
 */
319
/*
 * Only the payload counts towards nbytes, so the trailer of this block is
 * overwritten by the next block read into the buffer.
 */
320 lt->nbytes += TapeBlockGetNBytes(thisbuf);
321 if (TapeBlockIsLast(thisbuf))
322 {
323 lt->nextBlockNumber = -1L;
324 /* EOF */
325 break;
326 }
327 else
328 lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
329
330 /* Advance to next block, if we have buffer space left */
/* Continue only while more than one whole block of space remains. */
331 } while (lt->buffer_size - lt->nbytes > BLCKSZ);
332
333 return (lt->nbytes > 0);
334}
335
/* Index of the left child of heap slot 'i' in an array-based binary heap. */
static inline uint64
left_offset(uint64 i)
{
	return 2 * i + 1;
}
341
/* Index of the right child of heap slot 'i' in an array-based binary heap. */
static inline uint64
right_offset(uint64 i)
{
	return 2 * i + 2;
}
347
/*
 * Index of the parent of heap slot 'i' in an array-based binary heap.
 *
 * Must not be called with i == 0: the root has no parent, and the unsigned
 * subtraction would wrap around.
 */
static inline uint64
parent_offset(uint64 i)
{
	return (i - 1) / 2;
}
353
354/*
355 * Get the next block for writing.
356 */
357static int64
359{
360 if (lts->enable_prealloc)
361 return ltsGetPreallocBlock(lts, lt);
362 else
363 return ltsGetFreeBlock(lts);
364}
365
366/*
367 * Select the lowest currently unused block from the tape set's global free
368 * list min heap.
369 */
370static int64
372{
373 int64 *heap = lts->freeBlocks;
374 int64 blocknum;
375 int64 heapsize;
376 int64 holeval;
377 uint64 holepos;
378
379 /* freelist empty; allocate a new block */
380 if (lts->nFreeBlocks == 0)
381 return lts->nBlocksAllocated++;
382
383 /* easy if heap contains one element */
384 if (lts->nFreeBlocks == 1)
385 {
386 lts->nFreeBlocks--;
387 return lts->freeBlocks[0];
388 }
389
390 /* remove top of minheap */
391 blocknum = heap[0];
392
393 /* we'll replace it with end of minheap array */
394 holeval = heap[--lts->nFreeBlocks];
395
396 /* sift down */
397 holepos = 0; /* holepos is where the "hole" is */
398 heapsize = lts->nFreeBlocks;
399 while (true)
400 {
401 uint64 left = left_offset(holepos);
402 uint64 right = right_offset(holepos);
403 uint64 min_child;
404
405 if (left < heapsize && right < heapsize)
406 min_child = (heap[left] < heap[right]) ? left : right;
407 else if (left < heapsize)
408 min_child = left;
409 else if (right < heapsize)
410 min_child = right;
411 else
412 break;
413
414 if (heap[min_child] >= holeval)
415 break;
416
417 heap[holepos] = heap[min_child];
418 holepos = min_child;
419 }
420 heap[holepos] = holeval;
421
422 return blocknum;
423}
424
425/*
426 * Return the lowest free block number from the tape's preallocation list.
427 * Refill the preallocation list with blocks from the tape set's free list if
428 * necessary.
429 */
/*
 * NOTE(review): the listing jumps from line 430 to 432, so the declarator
 * line is absent from this excerpt.  The index at the end of this listing
 * gives it as ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt).
 */
430static int64
432{
433 /* sorted in descending order, so return the last element */
434 if (lt->nprealloc > 0)
435 return lt->prealloc[--lt->nprealloc];
436
/* The list is empty: allocate it on first use, otherwise regrow it. */
437 if (lt->prealloc == NULL)
438 {
/*
 * NOTE(review): listing line 439 is absent; presumably prealloc_size is given
 * its initial value there (the file comment on TAPE_WRITE_PREALLOC_MIN says
 * preallocation starts at that size) -- confirm against the upstream file.
 */
440 lt->prealloc = (int64 *) palloc(sizeof(int64) * lt->prealloc_size);
441 }
/*
 * NOTE(review): listing line 442 is absent; the block below presumably
 * belongs to an 'else' branch whose condition is not visible here.
 */
443 {
444 /* when the preallocation list runs out, double the size */
445 lt->prealloc_size *= 2;
/*
 * NOTE(review): listing lines 446-447 are absent; per the file comment the
 * size is bounded by TAPE_WRITE_PREALLOC_MAX, which is presumably enforced
 * there -- confirm against the upstream file.
 */
448 lt->prealloc = (int64 *) repalloc(lt->prealloc,
449 sizeof(int64) * lt->prealloc_size);
450 }
451
452 /* refill preallocation list */
/*
 * The array is filled from its last slot towards the first; the Assert below
 * checks that each newly obtained block number is larger than the one
 * obtained just before it, which leaves the array in descending order.
 */
453 lt->nprealloc = lt->prealloc_size;
454 for (int i = lt->nprealloc; i > 0; i--)
455 {
456 lt->prealloc[i - 1] = ltsGetFreeBlock(lts);
457
458 /* verify descending order */
459 Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]);
460 }
461
/* Hand out the last element, which holds the lowest block number. */
462 return lt->prealloc[--lt->nprealloc];
463}
464
465/*
466 * Return a block# to the freelist.
467 */
468static void
470{
471 int64 *heap;
472 uint64 holepos;
473
474 /*
475 * Do nothing if we're no longer interested in remembering free space.
476 */
477 if (lts->forgetFreeSpace)
478 return;
479
480 /*
481 * Enlarge freeBlocks array if full.
482 */
483 if (lts->nFreeBlocks >= lts->freeBlocksLen)
484 {
485 /*
486 * If the freelist becomes very large, just return and leak this free
487 * block.
488 */
489 if (lts->freeBlocksLen * 2 * sizeof(int64) > MaxAllocSize)
490 return;
491
492 lts->freeBlocksLen *= 2;
493 lts->freeBlocks = (int64 *) repalloc(lts->freeBlocks,
494 lts->freeBlocksLen * sizeof(int64));
495 }
496
497 /* create a "hole" at end of minheap array */
498 heap = lts->freeBlocks;
499 holepos = lts->nFreeBlocks;
500 lts->nFreeBlocks++;
501
502 /* sift up to insert blocknum */
503 while (holepos != 0)
504 {
505 uint64 parent = parent_offset(holepos);
506
507 if (heap[parent] < blocknum)
508 break;
509
510 heap[holepos] = heap[parent];
511 holepos = parent;
512 }
513 heap[holepos] = blocknum;
514}
515
516/*
517 * Lazily allocate and initialize the read buffer. This avoids waste when many
518 * tapes are open at once, but not all are active between rewinding and
519 * reading.
520 */
/*
 * NOTE(review): the listing jumps from line 521 to 523, so the declarator
 * line is absent from this excerpt.  The prototype near the top of the file
 * reads: static void ltsInitReadBuffer(LogicalTape *lt).
 */
521static void
523{
/* The buffer size must already have been chosen by the caller. */
524 Assert(lt->buffer_size > 0);
525 lt->buffer = palloc(lt->buffer_size);
526
527 /* Read the first block, or reset if tape is empty */
/*
 * NOTE(review): listing lines 528 and 531 are absent, so the statements that
 * actually position the tape and read the first block are not visible in
 * this excerpt -- confirm against the upstream file.
 */
529 lt->pos = 0;
530 lt->nbytes = 0;
532}
533
534/*
535 * Create a tape set, backed by a temporary underlying file.
536 *
537 * The tape set is initially empty. Use LogicalTapeCreate() to create
538 * tapes in it.
539 *
540 * In a single-process sort, pass NULL argument for fileset, and -1 for
541 * worker.
542 *
543 * In a parallel sort, parallel workers pass the shared fileset handle and
544 * their own worker number. After the workers have finished, create the
545 * tape set in the leader, passing the shared fileset handle and -1 for
546 * worker, and use LogicalTapeImport() to import the worker tapes into it.
547 *
548 * Currently, the leader will only import worker tapes into the set, it does
549 * not create tapes of its own, although in principle that should work.
550 *
551 * If preallocate is true, blocks for each individual tape are allocated in
552 * batches. This avoids fragmentation when writing multiple tapes at the
553 * same time.
554 */
556LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
557{
558 LogicalTapeSet *lts;
559
560 /*
561 * Create top-level struct including per-tape LogicalTape structs.
562 */
563 lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
564 lts->nBlocksAllocated = 0L;
565 lts->nBlocksWritten = 0L;
566 lts->nHoleBlocks = 0L;
567 lts->forgetFreeSpace = false;
568 lts->freeBlocksLen = 32; /* reasonable initial guess */
569 lts->freeBlocks = (int64 *) palloc(lts->freeBlocksLen * sizeof(int64));
570 lts->nFreeBlocks = 0;
571 lts->enable_prealloc = preallocate;
572
573 lts->fileset = fileset;
574 lts->worker = worker;
575
576 /*
577 * Create temp BufFile storage as required.
578 *
579 * In leader, we hijack the BufFile of the first tape that's imported, and
580 * concatenate the BufFiles of any subsequent tapes to that. Hence don't
581 * create a BufFile here. Things are simpler for the worker case and the
582 * serial case, though. They are generally very similar -- workers use a
583 * shared fileset, whereas serial sorts use a conventional serial BufFile.
584 */
585 if (fileset && worker == -1)
586 lts->pfile = NULL;
587 else if (fileset)
588 {
589 char filename[MAXPGPATH];
590
591 pg_itoa(worker, filename);
592 lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
593 }
594 else
595 lts->pfile = BufFileCreateTemp(false);
596
597 return lts;
598}
599
600/*
601 * Claim ownership of a logical tape from an existing shared BufFile.
602 *
603 * Caller should be leader process. Though tapes are marked as frozen in
604 * workers, they are not frozen when opened within leader, since unfrozen tapes
605 * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
606 * for random access.)
607 */
/*
 * NOTE(review): listing lines 608-609 (return type, function name and
 * parameter list) are absent from this excerpt.  The body uses 'lts' and
 * 'worker' and returns a LogicalTape pointer; the creation comment earlier
 * in the file refers to this routine as LogicalTapeImport() -- confirm the
 * exact signature against the upstream file.
 */
610{
611 LogicalTape *lt;
612 int64 tapeblocks;
613 char filename[MAXPGPATH];
614 BufFile *file;
615 int64 filesize;
616
617 lt = ltsCreateTape(lts);
618
619 /*
620 * build concatenated view of all buffiles, remembering the block number
621 * where each source file begins.
622 */
/* The worker's file is named after its worker number and opened read-only. */
623 pg_itoa(worker, filename);
624 file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
625 filesize = BufFileSize(file);
626
627 /*
628 * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
629 * block offset into each tape as we go.
630 */
/*
 * NOTE(review): listing line 631 is absent here; a statement may have been
 * dropped from this excerpt.
 */
632 if (lts->pfile == NULL)
633 {
634 lts->pfile = file;
635 lt->offsetBlockNumber = 0L;
636 }
637 else
638 {
/* BufFileAppend()'s return value is kept as this tape's block offset. */
639 lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
640 }
641 /* Don't allocate more for read buffer than could possibly help */
642 lt->max_size = Min(MaxAllocSize, filesize);
643 tapeblocks = filesize / BLCKSZ;
644
645 /*
646 * Update # of allocated blocks and # blocks written to reflect the
647 * imported BufFile. Allocated/written blocks include space used by holes
648 * left between concatenated BufFiles. Also track the number of hole
649 * blocks so that we can later work backwards to calculate the number of
650 * physical blocks for instrumentation.
651 */
/*
 * NOTE(review): listing lines 652 and 655 are absent; the comment above also
 * mentions the written-block and hole-block counters, whose updates are not
 * visible in this excerpt -- confirm against the upstream file.
 */
653
654 lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
656
657 return lt;
658}
659
660/*
661 * Close a logical tape set and release all resources.
662 *
663 * NOTE: This doesn't close any of the tapes! You must close them
664 * first, or you can let them be destroyed along with the memory context.
665 */
666void
668{
669 BufFileClose(lts->pfile);
670 pfree(lts->freeBlocks);
671 pfree(lts);
672}
673
674/*
675 * Create a logical tape in the given tapeset.
676 *
677 * The tape is initialized in write state.
678 */
681{
682 /*
683 * The only thing that currently prevents creating new tapes in leader is
684 * the fact that BufFiles opened using BufFileOpenShared() are read-only
685 * by definition, but that could be changed if it seemed worthwhile. For
686 * now, writing to the leader tape will raise a "Bad file descriptor"
687 * error, so tuplesort must avoid writing to the leader tape altogether.
688 */
689 if (lts->fileset && lts->worker == -1)
690 elog(ERROR, "cannot create new tapes in leader process");
691
692 return ltsCreateTape(lts);
693}
694
695static LogicalTape *
697{
698 LogicalTape *lt;
699
700 /*
701 * Create per-tape struct. Note we allocate the I/O buffer lazily.
702 */
703 lt = palloc(sizeof(LogicalTape));
704 lt->tapeSet = lts;
705 lt->writing = true;
706 lt->frozen = false;
707 lt->dirty = false;
708 lt->firstBlockNumber = -1L;
709 lt->curBlockNumber = -1L;
710 lt->nextBlockNumber = -1L;
711 lt->offsetBlockNumber = 0L;
712 lt->buffer = NULL;
713 lt->buffer_size = 0;
714 /* palloc() larger than MaxAllocSize would fail */
716 lt->pos = 0;
717 lt->nbytes = 0;
718 lt->prealloc = NULL;
719 lt->nprealloc = 0;
720 lt->prealloc_size = 0;
721
722 return lt;
723}
724
725/*
726 * Close a logical tape.
727 *
728 * Note: This doesn't return any blocks to the free list! You must read
729 * the tape to the end first, to reuse the space. In current use, though,
730 * we only close tapes after fully reading them.
731 */
732void
734{
735 if (lt->buffer)
736 pfree(lt->buffer);
737 pfree(lt);
738}
739
740/*
741 * Mark a logical tape set as not needing management of free space anymore.
742 *
743 * This should be called if the caller does not intend to write any more data
744 * into the tape set, but is reading from un-frozen tapes. Since no more
745 * writes are planned, remembering free blocks is no longer useful. Setting
746 * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
747 * is not designed to handle large numbers of free blocks.
748 */
749void
751{
752 lts->forgetFreeSpace = true;
753}
754
755/*
756 * Write to a logical tape.
757 *
758 * There are no error returns; we ereport() on failure.
759 */
760void
761LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
762{
763 LogicalTapeSet *lts = lt->tapeSet;
764 size_t nthistime;
765
/* Only tapes in write state, with no import offset applied, may be written. */
766 Assert(lt->writing);
767 Assert(lt->offsetBlockNumber == 0L);
768
769 /* Allocate data buffer and first block on first write */
770 if (lt->buffer == NULL)
771 {
772 lt->buffer = (char *) palloc(BLCKSZ);
773 lt->buffer_size = BLCKSZ;
774 }
775 if (lt->curBlockNumber == -1)
776 {
777 Assert(lt->firstBlockNumber == -1);
778 Assert(lt->pos == 0);
779
780 lt->curBlockNumber = ltsGetBlock(lts, lt);
/*
 * NOTE(review): listing line 781 is absent here; presumably the first block
 * number of the tape is recorded at this point -- confirm against the
 * upstream file.
 */
782
/* The first block of a tape has no predecessor. */
783 TapeBlockGetTrailer(lt->buffer)->prev = -1L;
784 }
785
786 Assert(lt->buffer_size == BLCKSZ);
/* Copy the caller's data into the block buffer, one payload's worth at a time. */
787 while (size > 0)
788 {
789 if (lt->pos >= (int) TapeBlockPayloadSize)
790 {
791 /* Buffer full, dump it out */
792 int64 nextBlockNumber;
793
794 if (!lt->dirty)
795 {
796 /* Hmm, went directly from reading to writing? */
797 elog(ERROR, "invalid logtape state: should be dirty");
798 }
799
800 /*
801 * First allocate the next block, so that we can store it in the
802 * 'next' pointer of this block.
803 */
804 nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
805
806 /* set the next-pointer and dump the current block. */
807 TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
/*
 * NOTE(review): listing line 808 is absent; the statement that actually
 * writes the full block out is not visible in this excerpt.
 */
809
810 /* initialize the prev-pointer of the next block */
/*
 * NOTE(review): listing line 811 is absent; the prev-pointer assignment
 * announced by the comment above is not visible in this excerpt.
 */
812 lt->curBlockNumber = nextBlockNumber;
813 lt->pos = 0;
814 lt->nbytes = 0;
815 }
816
/* Copy as much as fits into the remaining payload space of this block. */
817 nthistime = TapeBlockPayloadSize - lt->pos;
818 if (nthistime > size)
819 nthistime = size;
820 Assert(nthistime > 0);
821
822 memcpy(lt->buffer + lt->pos, ptr, nthistime);
823
824 lt->dirty = true;
825 lt->pos += nthistime;
826 if (lt->nbytes < lt->pos)
827 lt->nbytes = lt->pos;
828 ptr = (const char *) ptr + nthistime;
829 size -= nthistime;
830 }
831}
832
833/*
834 * Rewind logical tape and switch from writing to reading.
835 *
836 * The tape must currently be in writing state, or "frozen" in read state.
837 *
838 * 'buffer_size' specifies how much memory to use for the read buffer.
839 * Regardless of the argument, the actual amount of memory used is between
840 * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
841 * rounded down and truncated to fit those constraints, if necessary. If the
842 * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
843 * byte buffer is used.
844 */
845void
846LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
847{
848 LogicalTapeSet *lts = lt->tapeSet;
849
850 /*
851 * Round and cap buffer_size if needed.
852 */
853 if (lt->frozen)
854 buffer_size = BLCKSZ;
855 else
856 {
857 /* need at least one block */
858 if (buffer_size < BLCKSZ)
859 buffer_size = BLCKSZ;
860
861 /* palloc() larger than max_size is unlikely to be helpful */
862 if (buffer_size > lt->max_size)
863 buffer_size = lt->max_size;
864
865 /* round down to BLCKSZ boundary */
866 buffer_size -= buffer_size % BLCKSZ;
867 }
868
869 if (lt->writing)
870 {
871 /*
872 * Completion of a write phase. Flush last partial data block, and
873 * rewind for normal (destructive) read.
874 */
875 if (lt->dirty)
876 {
877 /*
878 * As long as we've filled the buffer at least once, its contents
879 * are entirely defined from valgrind's point of view, even though
880 * contents beyond the current end point may be stale. But it's
881 * possible - at least in the case of a parallel sort - to sort
882 * such a small amount of data that we do not fill the buffer even
883 * once. Tell valgrind that its contents are defined, so it
884 * doesn't bleat.
885 */
/*
 * NOTE(review): listing line 886 is absent, so the line below is only the
 * tail of a call whose start is not visible in this excerpt.
 */
887 lt->buffer_size - lt->nbytes);
888
/*
 * NOTE(review): listing lines 889-890 are absent; the flush of the last
 * partial block described above is not visible in this excerpt -- confirm
 * against the upstream file.
 */
891 }
892 lt->writing = false;
893 }
894 else
895 {
896 /*
897 * This is only OK if tape is frozen; we rewind for (another) read
898 * pass.
899 */
900 Assert(lt->frozen);
901 }
902
/* Drop the current buffer; a read buffer of the new size is made on demand. */
903 if (lt->buffer)
904 pfree(lt->buffer);
905
906 /* the buffer is lazily allocated, but set the size here */
907 lt->buffer = NULL;
908 lt->buffer_size = buffer_size;
909
910 /* free the preallocation list, and return unused block numbers */
911 if (lt->prealloc != NULL)
912 {
913 for (int i = lt->nprealloc; i > 0; i--)
914 ltsReleaseBlock(lts, lt->prealloc[i - 1]);
915 pfree(lt->prealloc);
916 lt->prealloc = NULL;
917 lt->nprealloc = 0;
918 lt->prealloc_size = 0;
919 }
920}
921
922/*
923 * Read from a logical tape.
924 *
925 * Early EOF is indicated by return value less than #bytes requested.
926 */
927size_t
928LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
929{
930 size_t nread = 0;
931 size_t nthistime;
932
933 Assert(!lt->writing);
934
935 if (lt->buffer == NULL)
937
938 while (size > 0)
939 {
940 if (lt->pos >= lt->nbytes)
941 {
942 /* Try to load more data into buffer. */
943 if (!ltsReadFillBuffer(lt))
944 break; /* EOF */
945 }
946
947 nthistime = lt->nbytes - lt->pos;
948 if (nthistime > size)
949 nthistime = size;
950 Assert(nthistime > 0);
951
952 memcpy(ptr, lt->buffer + lt->pos, nthistime);
953
954 lt->pos += nthistime;
955 ptr = (char *) ptr + nthistime;
956 size -= nthistime;
957 nread += nthistime;
958 }
959
960 return nread;
961}
962
963/*
964 * "Freeze" the contents of a tape so that it can be read multiple times
965 * and/or read backwards. Once a tape is frozen, its contents will not
966 * be released until the LogicalTapeSet is destroyed. This is expected
967 * to be used only for the final output pass of a merge.
968 *
969 * This *must* be called just at the end of a write pass, before the
970 * tape is rewound (after rewind is too late!). It performs a rewind
971 * and switch to read mode "for free". An immediately following rewind-
972 * for-read call is OK but not necessary.
973 *
974 * share output argument is set with details of storage used for tape after
975 * freezing, which may be passed to LogicalTapeSetCreate within leader
976 * process later. This metadata is only of interest to worker callers
977 * freezing their final output for leader (single materialized tape).
978 * Serial sorts should set share to NULL.
979 */
/*
 * NOTE(review): the listing jumps from line 980 to 982, so the declarator
 * line (function name and parameter list) is absent from this excerpt.  The
 * body uses 'lt' and 'share' -- confirm the signature against the upstream
 * file.
 */
980void
982{
983 LogicalTapeSet *lts = lt->tapeSet;
984
985 Assert(lt->writing);
986 Assert(lt->offsetBlockNumber == 0L);
987
988 /*
989 * Completion of a write phase. Flush last partial data block, and rewind
990 * for nondestructive read.
991 */
992 if (lt->dirty)
993 {
994 /*
995 * As long as we've filled the buffer at least once, its contents are
996 * entirely defined from valgrind's point of view, even though
997 * contents beyond the current end point may be stale. But it's
998 * possible - at least in the case of a parallel sort - to sort such
999 * a small amount of data that we do not fill the buffer even once. Tell
1000 * valgrind that its contents are defined, so it doesn't bleat.
1001 */
/*
 * NOTE(review): listing line 1002 is absent, so the line below is only the
 * tail of a call whose start is not visible in this excerpt.
 */
1003 lt->buffer_size - lt->nbytes);
1004
/*
 * NOTE(review): listing lines 1005-1006 are absent; the flush of the last
 * partial block described above is not visible in this excerpt.
 */
1007 }
1008 lt->writing = false;
1009 lt->frozen = true;
1010
1011 /*
1012 * The seek and backspace functions assume a single block read buffer.
1013 * That's OK with current usage. A larger buffer is helpful to make the
1014 * read pattern of the backing file look more sequential to the OS, when
1015 * we're reading from multiple tapes. But at the end of a sort, when a
1016 * tape is frozen, we only read from a single tape anyway.
1017 */
1018 if (!lt->buffer || lt->buffer_size != BLCKSZ)
1019 {
1020 if (lt->buffer)
1021 pfree(lt->buffer);
1022 lt->buffer = palloc(BLCKSZ);
1023 lt->buffer_size = BLCKSZ;
1024 }
1025
1026 /* Read the first block, or reset if tape is empty */
/*
 * NOTE(review): listing lines 1027, 1033 and 1037 are absent.  As shown, the
 * if/else chain below is incomplete (the read of the first block and one
 * assignment are not visible) -- confirm against the upstream file.
 */
1028 lt->pos = 0;
1029 lt->nbytes = 0;
1030
1031 if (lt->firstBlockNumber == -1L)
1032 lt->nextBlockNumber = -1L;
1034 if (TapeBlockIsLast(lt->buffer))
1035 lt->nextBlockNumber = -1L;
1036 else
1038 lt->nbytes = TapeBlockGetNBytes(lt->buffer);
1039
1040 /* Handle extra steps when caller is to share its tapeset */
1041 if (share)
1042 {
/*
 * NOTE(review): listing lines 1043-1044 are absent; the statements that fill
 * in the shared-tape details are not visible in this excerpt.
 */
1045 }
1046}
1047
1048/*
1049 * Backspace the tape a given number of bytes. (We also support a more
1050 * general seek interface, see below.)
1051 *
1052 * *Only* a frozen-for-read tape can be backed up; we don't support
1053 * random access during write, and an unfrozen read tape may have
1054 * already discarded the desired data!
1055 *
1056 * Returns the number of bytes backed up. It can be less than the
1057 * requested amount, if there isn't that much data before the current
1058 * position. The tape is positioned to the beginning of the tape in
1059 * that case.
1060 */
/*
 * NOTE(review): the listing jumps from line 1061 to 1063, so the declarator
 * line is absent from this excerpt.  The index at the end of this listing
 * gives it as LogicalTapeBackspace(LogicalTape *lt, size_t size).
 */
1061size_t
1063{
1064 size_t seekpos = 0;
1065
/* Backing up needs a frozen tape with a single-block read buffer. */
1066 Assert(lt->frozen);
1067 Assert(lt->buffer_size == BLCKSZ);
1068
/*
 * NOTE(review): listing line 1070 is absent, so the statement controlled by
 * this 'if' is not visible; as shown it would govern the next statement --
 * confirm against the upstream file.
 */
1069 if (lt->buffer == NULL)
1071
1072 /*
1073 * Easy case for seek within current block.
1074 */
1075 if (size <= (size_t) lt->pos)
1076 {
1077 lt->pos -= (int) size;
1078 return size;
1079 }
1080
1081 /*
1082 * Not-so-easy case, have to walk back the chain of blocks. This
1083 * implementation would be pretty inefficient for long seeks, but we
1084 * really aren't doing that (a seek over one tuple is typical).
1085 */
1086 seekpos = (size_t) lt->pos; /* part within this block */
1087 while (size > seekpos)
1088 {
1089 int64 prev = TapeBlockGetTrailer(lt->buffer)->prev;
1090
1091 if (prev == -1L)
1092 {
1093 /* Tried to back up beyond the beginning of tape. */
1094 if (lt->curBlockNumber != lt->firstBlockNumber)
1095 elog(ERROR, "unexpected end of tape");
1096 lt->pos = 0;
1097 return seekpos;
1098 }
1099
1100 ltsReadBlock(lt->tapeSet, prev, lt->buffer);
1101
/* Sanity check: the block just read must link forward to where we came from. */
1102 if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1103 elog(ERROR, "broken tape, next of block %" PRId64 " is %" PRId64 ", expected %" PRId64,
1104 prev,
1105 TapeBlockGetTrailer(lt->buffer)->next,
1106 lt->curBlockNumber);
1107
/*
 * NOTE(review): listing lines 1108 and 1110 are absent; further bookkeeping
 * for the newly loaded block may have been dropped from this excerpt.
 */
1109 lt->curBlockNumber = prev;
1111
/* Every block stepped back over contributes a full payload. */
1112 seekpos += TapeBlockPayloadSize;
1113 }
1114
1115 /*
1116 * 'seekpos' can now be greater than 'size', because it points to the
1117 * beginning of the target block. The difference is the position within
1118 * the page.
1119 */
1120 lt->pos = seekpos - size;
1121 return size;
1122}
1123
1124/*
1125 * Seek to an arbitrary position in a logical tape.
1126 *
1127 * *Only* a frozen-for-read tape can be seeked.
1128 *
1129 * Must be called with a block/offset previously returned by
1130 * LogicalTapeTell().
1131 */
1132void
1133LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
1134{
/* Seeking needs a frozen tape with a single-block read buffer. */
1135 Assert(lt->frozen);
1136 Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1137 Assert(lt->buffer_size == BLCKSZ);
1138
/*
 * NOTE(review): listing line 1140 is absent, so the statement controlled by
 * this 'if' is not visible; as shown it would govern the next statement --
 * confirm against the upstream file.
 */
1139 if (lt->buffer == NULL)
1141
/* Reload the buffer only when the target is not the block already held. */
1142 if (blocknum != lt->curBlockNumber)
1143 {
1144 ltsReadBlock(lt->tapeSet, blocknum, lt->buffer);
1145 lt->curBlockNumber = blocknum;
/*
 * NOTE(review): listing lines 1146-1147 are absent; further bookkeeping for
 * the newly loaded block may have been dropped from this excerpt.
 */
1148 }
1149
/* The offset must lie within the valid bytes of the block. */
1150 if (offset > lt->nbytes)
1151 elog(ERROR, "invalid tape seek position");
1152 lt->pos = offset;
1153}
1154
1155/*
1156 * Obtain current position in a form suitable for a later LogicalTapeSeek.
1157 *
1158 * NOTE: it'd be OK to do this during write phase with intention of using
1159 * the position for a seek after freezing. Not clear if anyone needs that.
1160 */
1161void
1162LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
1163{
1164 if (lt->buffer == NULL)
1166
1167 Assert(lt->offsetBlockNumber == 0L);
1168
1169 /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1170 Assert(lt->buffer_size == BLCKSZ);
1171
1172 *blocknum = lt->curBlockNumber;
1173 *offset = lt->pos;
1174}
1175
1176/*
1177 * Obtain total disk space currently used by a LogicalTapeSet, in blocks. Does
1178 * not account for open write buffer, if any.
1179 */
1180int64
1182{
1183 return lts->nBlocksWritten - lts->nHoleBlocks;
1184}
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
int BufFileSeekBlock(BufFile *file, int64 blknum)
Definition: buffile.c:851
void BufFileExportFileSet(BufFile *file)
Definition: buffile.c:394
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:193
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267
int64 BufFileSize(BufFile *file)
Definition: buffile.c:866
void BufFileClose(BufFile *file)
Definition: buffile.c:412
int64 BufFileAppend(BufFile *target, BufFile *source)
Definition: buffile.c:902
#define Min(x, y)
Definition: c.h:975
int64_t int64
Definition: c.h:499
uint64_t uint64
Definition: c.h:503
#define MemSet(start, val, len)
Definition: c.h:991
size_t Size
Definition: c.h:576
int errcode_for_file_access(void)
Definition: elog.c:877
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:149
#define MaxAllocSize
Definition: fe_memutils.h:22
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
Definition: logtape.c:846
#define TapeBlockGetTrailer(buf)
Definition: logtape.c:104
static LogicalTape * ltsCreateTape(LogicalTapeSet *lts)
Definition: logtape.c:696
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition: logtape.c:750
size_t LogicalTapeBackspace(LogicalTape *lt, size_t size)
Definition: logtape.c:1062
static int64 ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:431
size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
Definition: logtape.c:928
static uint64 parent_offset(uint64 i)
Definition: logtape.c:349
int64 LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1181
void LogicalTapeClose(LogicalTape *lt)
Definition: logtape.c:733
#define TapeBlockPayloadSize
Definition: logtape.c:103
struct TapeBlockTrailer TapeBlockTrailer
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:667
static bool ltsReadFillBuffer(LogicalTape *lt)
Definition: logtape.c:298
static void ltsInitReadBuffer(LogicalTape *lt)
Definition: logtape.c:522
#define TAPE_WRITE_PREALLOC_MIN
Definition: logtape.c:124
static uint64 left_offset(uint64 i)
Definition: logtape.c:337
static void ltsWriteBlock(LogicalTapeSet *lts, int64 blocknum, const void *buffer)
Definition: logtape.c:238
static int64 ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:358
#define TapeBlockIsLast(buf)
Definition: logtape.c:107
#define TAPE_WRITE_PREALLOC_MAX
Definition: logtape.c:125
void LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
Definition: logtape.c:1133
static void ltsReadBlock(LogicalTapeSet *lts, int64 blocknum, void *buffer)
Definition: logtape.c:282
LogicalTapeSet * LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
Definition: logtape.c:556
void LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
Definition: logtape.c:1162
void LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
Definition: logtape.c:761
#define TapeBlockGetNBytes(buf)
Definition: logtape.c:108
static void ltsReleaseBlock(LogicalTapeSet *lts, int64 blocknum)
Definition: logtape.c:469
#define TapeBlockSetNBytes(buf, nbytes)
Definition: logtape.c:111
static int64 ltsGetFreeBlock(LogicalTapeSet *lts)
Definition: logtape.c:371
static uint64 right_offset(uint64 i)
Definition: logtape.c:343
LogicalTape * LogicalTapeCreate(LogicalTapeSet *lts)
Definition: logtape.c:680
void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
Definition: logtape.c:981
LogicalTape * LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
Definition: logtape.c:609
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:2170
void pfree(void *pointer)
Definition: mcxt.c:2150
void * palloc(Size size)
Definition: mcxt.c:1943
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
int pg_itoa(int16 i, char *a)
Definition: numutils.c:1042
#define MAXPGPATH
while(p+4<=pend)
static char * filename
Definition: pg_dumpall.c:123
BufFile * pfile
Definition: logtape.c:189
bool forgetFreeSpace
Definition: logtape.c:215
SharedFileSet * fileset
Definition: logtape.c:190
int64 nHoleBlocks
Definition: logtape.c:205
int64 * freeBlocks
Definition: logtape.c:216
Size freeBlocksLen
Definition: logtape.c:218
int64 nFreeBlocks
Definition: logtape.c:217
bool enable_prealloc
Definition: logtape.c:219
int64 nBlocksAllocated
Definition: logtape.c:203
int64 nBlocksWritten
Definition: logtape.c:204
int64 curBlockNumber
Definition: logtape.c:157
int64 firstBlockNumber
Definition: logtape.c:156
int64 * prealloc
Definition: logtape.c:175
int max_size
Definition: logtape.c:166
int nprealloc
Definition: logtape.c:176
char * buffer
Definition: logtape.c:164
int64 nextBlockNumber
Definition: logtape.c:158
bool frozen
Definition: logtape.c:142
int prealloc_size
Definition: logtape.c:177
int64 offsetBlockNumber
Definition: logtape.c:159
int buffer_size
Definition: logtape.c:165
bool writing
Definition: logtape.c:141
LogicalTapeSet * tapeSet
Definition: logtape.c:139
int nbytes
Definition: logtape.c:168
bool dirty
Definition: logtape.c:143
int64 firstblocknumber
Definition: logtape.h:54
char data[BLCKSZ]
Definition: c.h:1108