PostgreSQL Source Code  git master
logtape.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * logtape.c
4  * Management of "logical tapes" within temporary files.
5  *
6  * This module exists to support sorting via multiple merge passes (see
7  * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8  * we implement it on disk by creating a separate file for each "tape",
9  * there is an annoying problem: the peak space usage is at least twice
10  * the volume of actual data to be sorted. (This must be so because each
11  * datum will appear in both the input and output tapes of the final
12  * merge pass.)
13  *
14  * We can work around this problem by recognizing that any one tape
15  * dataset (with the possible exception of the final output) is written
16  * and read exactly once in a perfectly sequential manner. Therefore,
17  * a datum once read will not be required again, and we can recycle its
18  * space for use by the new tape dataset(s) being generated. In this way,
19  * the total space usage is essentially just the actual data volume, plus
20  * insignificant bookkeeping and start/stop overhead.
21  *
22  * Few OSes allow arbitrary parts of a file to be released back to the OS,
23  * so we have to implement this space-recycling ourselves within a single
24  * logical file. logtape.c exists to perform this bookkeeping and provide
25  * the illusion of N independent tape devices to tuplesort.c. Note that
26  * logtape.c itself depends on buffile.c to provide a "logical file" of
27  * larger size than the underlying OS may support.
28  *
29  * For simplicity, we allocate and release space in the underlying file
30  * in BLCKSZ-size blocks. Space allocation boils down to keeping track
31  * of which blocks in the underlying file belong to which logical tape,
32  * plus any blocks that are free (recycled and not yet reused).
33  * The blocks in each logical tape form a chain, with a prev- and next-
34  * pointer in each block.
35  *
36  * The initial write pass is guaranteed to fill the underlying file
37  * perfectly sequentially, no matter how data is divided into logical tapes.
38  * Once we begin merge passes, the access pattern becomes considerably
39  * less predictable --- but the seeking involved should be comparable to
40  * what would happen if we kept each logical tape in a separate file,
41  * so there's no serious performance penalty paid to obtain the space
42  * savings of recycling. We try to localize the write accesses by always
43  * writing to the lowest-numbered free block when we have a choice; it's
44  * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
45  * policy for free blocks would be better?)
46  *
47  * To further make the I/Os more sequential, we can use a larger buffer
48  * when reading, and read multiple blocks from the same tape in one go,
49  * whenever the buffer becomes empty.
50  *
51  * To support the above policy of writing to the lowest free block, the
52  * freelist is a min heap.
53  *
54  * Since all the bookkeeping and buffer memory is allocated with palloc(),
55  * and the underlying file(s) are made with OpenTemporaryFile, all resources
56  * for a logical tape set are certain to be cleaned up even if processing
57  * is aborted by ereport(ERROR). To avoid confusion, the caller should take
58  * care that all calls for a single LogicalTapeSet are made in the same
59  * palloc context.
60  *
61  * To support parallel sort operations involving coordinated callers to
62  * tuplesort.c routines across multiple workers, it is necessary to
63  * concatenate each worker BufFile/tapeset into one single logical tapeset
64  * managed by the leader. Workers should have produced one final
65  * materialized tape (their entire output) when this happens in leader.
66  * There will always be the same number of runs as input tapes, and the same
67  * number of input tapes as participants (worker Tuplesortstates).
68  *
69  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
70  * Portions Copyright (c) 1994, Regents of the University of California
71  *
72  * IDENTIFICATION
73  * src/backend/utils/sort/logtape.c
74  *
75  *-------------------------------------------------------------------------
76  */
77 
78 #include "postgres.h"
79 
80 #include <fcntl.h>
81 
82 #include "storage/buffile.h"
83 #include "utils/builtins.h"
84 #include "utils/logtape.h"
85 #include "utils/memdebug.h"
86 #include "utils/memutils.h"
87 
88 /*
89  * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
90  *
91  * The first block of a tape has prev == -1. The last block of a tape
92  * stores the number of valid bytes on the block, inverted, in 'next'
93  * Therefore next < 0 indicates the last block.
94  */
95 typedef struct TapeBlockTrailer
96 {
97  int64 prev; /* previous block on this tape, or -1 on first
98  * block */
99  int64 next; /* next block on this tape, or # of valid
100  * bytes on last block (if < 0) */
102 
103 #define TapeBlockPayloadSize (BLCKSZ - sizeof(TapeBlockTrailer))
104 #define TapeBlockGetTrailer(buf) \
105  ((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))
106 
107 #define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
108 #define TapeBlockGetNBytes(buf) \
109  (TapeBlockIsLast(buf) ? \
110  (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
111 #define TapeBlockSetNBytes(buf, nbytes) \
112  (TapeBlockGetTrailer(buf)->next = -(nbytes))
113 
114 /*
115  * When multiple tapes are being written to concurrently (as in HashAgg),
116  * avoid excessive fragmentation by preallocating block numbers to individual
117  * tapes. Each preallocation doubles in size starting at
118  * TAPE_WRITE_PREALLOC_MIN blocks up to TAPE_WRITE_PREALLOC_MAX blocks.
119  *
120  * No filesystem operations are performed for preallocation; only the block
121  * numbers are reserved. This may lead to sparse writes, which will cause
122  * ltsWriteBlock() to fill in holes with zeros.
123  */
124 #define TAPE_WRITE_PREALLOC_MIN 8
125 #define TAPE_WRITE_PREALLOC_MAX 128
126 
127 /*
128  * This data structure represents a single "logical tape" within the set
129  * of logical tapes stored in the same file.
130  *
131  * While writing, we hold the current partially-written data block in the
132  * buffer. While reading, we can hold multiple blocks in the buffer. Note
133  * that we don't retain the trailers of a block when it's read into the
134  * buffer. The buffer therefore contains one large contiguous chunk of data
135  * from the tape.
136  */
138 {
139  LogicalTapeSet *tapeSet; /* tape set this tape is part of */
140 
141  bool writing; /* T while in write phase */
142  bool frozen; /* T if blocks should not be freed when read */
143  bool dirty; /* does buffer need to be written? */
144 
145  /*
146  * Block numbers of the first, current, and next block of the tape.
147  *
148  * The "current" block number is only valid when writing, or reading from
149  * a frozen tape. (When reading from an unfrozen tape, we use a larger
150  * read buffer that holds multiple blocks, so the "current" block is
151  * ambiguous.)
152  *
153  * When concatenation of worker tape BufFiles is performed, an offset to
154  * the first block in the unified BufFile space is applied during reads.
155  */
160 
161  /*
162  * Buffer for current data block(s).
163  */
164  char *buffer; /* physical buffer (separately palloc'd) */
165  int buffer_size; /* allocated size of the buffer */
166  int max_size; /* highest useful, safe buffer_size */
167  int pos; /* next read/write position in buffer */
168  int nbytes; /* total # of valid bytes in buffer */
169 
170  /*
171  * Preallocated block numbers are held in an array sorted in descending
172  * order; blocks are consumed from the end of the array (lowest block
173  * numbers first).
174  */
175  int64 *prealloc;
176  int nprealloc; /* number of elements in list */
177  int prealloc_size; /* number of elements list can hold */
178 };
179 
180 /*
181  * This data structure represents a set of related "logical tapes" sharing
182  * space in a single underlying file. (But that "file" may be multiple files
183  * if needed to escape OS limits on file size; buffile.c handles that for us.)
184  * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
185  * demand.
186  */
188 {
189  BufFile *pfile; /* underlying file for whole tape set */
191  int worker; /* worker # if shared, -1 for leader/serial */
192 
193  /*
194  * File size tracking. nBlocksWritten is the size of the underlying file,
195  * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
196  * by ltsReleaseBlock(), and it is always greater than or equal to
197  * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
198  * blocks that have been allocated for a tape, but have not been written
199  * to the underlying file yet. nHoleBlocks tracks the total number of
200  * blocks that are in unused holes between worker spaces following BufFile
201  * concatenation.
202  */
203  int64 nBlocksAllocated; /* # of blocks allocated */
204  int64 nBlocksWritten; /* # of blocks used in underlying file */
205  int64 nHoleBlocks; /* # of "hole" blocks left */
206 
207  /*
208  * We store the numbers of recycled-and-available blocks in freeBlocks[].
209  * When there are no such blocks, we extend the underlying file.
210  *
211  * If forgetFreeSpace is true then any freed blocks are simply forgotten
212  * rather than being remembered in freeBlocks[]. See notes for
213  * LogicalTapeSetForgetFreeSpace().
214  */
215  bool forgetFreeSpace; /* are we remembering free blocks? */
216  int64 *freeBlocks; /* resizable array holding minheap */
217  int64 nFreeBlocks; /* # of currently free blocks */
218  Size freeBlocksLen; /* current allocated length of freeBlocks[] */
219  bool enable_prealloc; /* preallocate write blocks? */
220 };
221 
223 static void ltsWriteBlock(LogicalTapeSet *lts, int64 blocknum, const void *buffer);
224 static void ltsReadBlock(LogicalTapeSet *lts, int64 blocknum, void *buffer);
225 static int64 ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
226 static int64 ltsGetFreeBlock(LogicalTapeSet *lts);
227 static int64 ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
228 static void ltsReleaseBlock(LogicalTapeSet *lts, int64 blocknum);
229 static void ltsInitReadBuffer(LogicalTape *lt);
230 
231 
232 /*
233  * Write a block-sized buffer to the specified block of the underlying file.
234  *
235  * No need for an error return convention; we ereport() on any error.
236  */
237 static void
238 ltsWriteBlock(LogicalTapeSet *lts, int64 blocknum, const void *buffer)
239 {
240  /*
241  * BufFile does not support "holes", so if we're about to write a block
242  * that's past the current end of file, fill the space between the current
243  * end of file and the target block with zeros.
244  *
245  * This can happen either when tapes preallocate blocks; or for the last
246  * block of a tape which might not have been flushed.
247  *
248  * Note that BufFile concatenation can leave "holes" in BufFile between
249  * worker-owned block ranges. These are tracked for reporting purposes
250  * only. We never read from nor write to these hole blocks, and so they
251  * are not considered here.
252  */
253  while (blocknum > lts->nBlocksWritten)
254  {
255  PGIOAlignedBlock zerobuf;
256 
257  MemSet(zerobuf.data, 0, sizeof(zerobuf));
258 
259  ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
260  }
261 
262  /* Write the requested block */
263  if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
264  ereport(ERROR,
266  errmsg("could not seek to block %lld of temporary file",
267  (long long) blocknum)));
268  BufFileWrite(lts->pfile, buffer, BLCKSZ);
269 
270  /* Update nBlocksWritten, if we extended the file */
271  if (blocknum == lts->nBlocksWritten)
272  lts->nBlocksWritten++;
273 }
274 
275 /*
276  * Read a block-sized buffer from the specified block of the underlying file.
277  *
278  * No need for an error return convention; we ereport() on any error. This
279  * module should never attempt to read a block it doesn't know is there.
280  */
281 static void
282 ltsReadBlock(LogicalTapeSet *lts, int64 blocknum, void *buffer)
283 {
284  if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
285  ereport(ERROR,
287  errmsg("could not seek to block %lld of temporary file",
288  (long long) blocknum)));
289  BufFileReadExact(lts->pfile, buffer, BLCKSZ);
290 }
291 
292 /*
293  * Read as many blocks as we can into the per-tape buffer.
294  *
295  * Returns true if anything was read, 'false' on EOF.
296  */
297 static bool
299 {
300  lt->pos = 0;
301  lt->nbytes = 0;
302 
303  do
304  {
305  char *thisbuf = lt->buffer + lt->nbytes;
306  int64 datablocknum = lt->nextBlockNumber;
307 
308  /* Fetch next block number */
309  if (datablocknum == -1L)
310  break; /* EOF */
311  /* Apply worker offset, needed for leader tapesets */
312  datablocknum += lt->offsetBlockNumber;
313 
314  /* Read the block */
315  ltsReadBlock(lt->tapeSet, datablocknum, thisbuf);
316  if (!lt->frozen)
317  ltsReleaseBlock(lt->tapeSet, datablocknum);
319 
320  lt->nbytes += TapeBlockGetNBytes(thisbuf);
321  if (TapeBlockIsLast(thisbuf))
322  {
323  lt->nextBlockNumber = -1L;
324  /* EOF */
325  break;
326  }
327  else
328  lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
329 
330  /* Advance to next block, if we have buffer space left */
331  } while (lt->buffer_size - lt->nbytes > BLCKSZ);
332 
333  return (lt->nbytes > 0);
334 }
335 
336 static inline uint64
337 left_offset(uint64 i)
338 {
339  return 2 * i + 1;
340 }
341 
342 static inline uint64
344 {
345  return 2 * i + 2;
346 }
347 
348 static inline uint64
350 {
351  return (i - 1) / 2;
352 }
353 
354 /*
355  * Get the next block for writing.
356  */
357 static int64
359 {
360  if (lts->enable_prealloc)
361  return ltsGetPreallocBlock(lts, lt);
362  else
363  return ltsGetFreeBlock(lts);
364 }
365 
366 /*
367  * Select the lowest currently unused block from the tape set's global free
368  * list min heap.
369  */
370 static int64
372 {
373  int64 *heap = lts->freeBlocks;
374  int64 blocknum;
375  int64 heapsize;
376  int64 holeval;
377  uint64 holepos;
378 
379  /* freelist empty; allocate a new block */
380  if (lts->nFreeBlocks == 0)
381  return lts->nBlocksAllocated++;
382 
383  /* easy if heap contains one element */
384  if (lts->nFreeBlocks == 1)
385  {
386  lts->nFreeBlocks--;
387  return lts->freeBlocks[0];
388  }
389 
390  /* remove top of minheap */
391  blocknum = heap[0];
392 
393  /* we'll replace it with end of minheap array */
394  holeval = heap[--lts->nFreeBlocks];
395 
396  /* sift down */
397  holepos = 0; /* holepos is where the "hole" is */
398  heapsize = lts->nFreeBlocks;
399  while (true)
400  {
401  uint64 left = left_offset(holepos);
402  uint64 right = right_offset(holepos);
403  uint64 min_child;
404 
405  if (left < heapsize && right < heapsize)
406  min_child = (heap[left] < heap[right]) ? left : right;
407  else if (left < heapsize)
408  min_child = left;
409  else if (right < heapsize)
410  min_child = right;
411  else
412  break;
413 
414  if (heap[min_child] >= holeval)
415  break;
416 
417  heap[holepos] = heap[min_child];
418  holepos = min_child;
419  }
420  heap[holepos] = holeval;
421 
422  return blocknum;
423 }
424 
425 /*
426  * Return the lowest free block number from the tape's preallocation list.
427  * Refill the preallocation list with blocks from the tape set's free list if
428  * necessary.
429  */
430 static int64
432 {
433  /* sorted in descending order, so return the last element */
434  if (lt->nprealloc > 0)
435  return lt->prealloc[--lt->nprealloc];
436 
437  if (lt->prealloc == NULL)
438  {
440  lt->prealloc = (int64 *) palloc(sizeof(int64) * lt->prealloc_size);
441  }
442  else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX)
443  {
444  /* when the preallocation list runs out, double the size */
445  lt->prealloc_size *= 2;
448  lt->prealloc = (int64 *) repalloc(lt->prealloc,
449  sizeof(int64) * lt->prealloc_size);
450  }
451 
452  /* refill preallocation list */
453  lt->nprealloc = lt->prealloc_size;
454  for (int i = lt->nprealloc; i > 0; i--)
455  {
456  lt->prealloc[i - 1] = ltsGetFreeBlock(lts);
457 
458  /* verify descending order */
459  Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]);
460  }
461 
462  return lt->prealloc[--lt->nprealloc];
463 }
464 
465 /*
466  * Return a block# to the freelist.
467  */
468 static void
469 ltsReleaseBlock(LogicalTapeSet *lts, int64 blocknum)
470 {
471  int64 *heap;
472  uint64 holepos;
473 
474  /*
475  * Do nothing if we're no longer interested in remembering free space.
476  */
477  if (lts->forgetFreeSpace)
478  return;
479 
480  /*
481  * Enlarge freeBlocks array if full.
482  */
483  if (lts->nFreeBlocks >= lts->freeBlocksLen)
484  {
485  /*
486  * If the freelist becomes very large, just return and leak this free
487  * block.
488  */
489  if (lts->freeBlocksLen * 2 * sizeof(int64) > MaxAllocSize)
490  return;
491 
492  lts->freeBlocksLen *= 2;
493  lts->freeBlocks = (int64 *) repalloc(lts->freeBlocks,
494  lts->freeBlocksLen * sizeof(int64));
495  }
496 
497  /* create a "hole" at end of minheap array */
498  heap = lts->freeBlocks;
499  holepos = lts->nFreeBlocks;
500  lts->nFreeBlocks++;
501 
502  /* sift up to insert blocknum */
503  while (holepos != 0)
504  {
505  uint64 parent = parent_offset(holepos);
506 
507  if (heap[parent] < blocknum)
508  break;
509 
510  heap[holepos] = heap[parent];
511  holepos = parent;
512  }
513  heap[holepos] = blocknum;
514 }
515 
516 /*
517  * Lazily allocate and initialize the read buffer. This avoids waste when many
518  * tapes are open at once, but not all are active between rewinding and
519  * reading.
520  */
521 static void
523 {
524  Assert(lt->buffer_size > 0);
525  lt->buffer = palloc(lt->buffer_size);
526 
527  /* Read the first block, or reset if tape is empty */
529  lt->pos = 0;
530  lt->nbytes = 0;
531  ltsReadFillBuffer(lt);
532 }
533 
534 /*
535  * Create a tape set, backed by a temporary underlying file.
536  *
537  * The tape set is initially empty. Use LogicalTapeCreate() to create
538  * tapes in it.
539  *
540  * In a single-process sort, pass NULL argument for fileset, and -1 for
541  * worker.
542  *
543  * In a parallel sort, parallel workers pass the shared fileset handle and
544  * their own worker number. After the workers have finished, create the
545  * tape set in the leader, passing the shared fileset handle and -1 for
546  * worker, and use LogicalTapeImport() to import the worker tapes into it.
547  *
548  * Currently, the leader will only import worker tapes into the set, it does
549  * not create tapes of its own, although in principle that should work.
550  *
551  * If preallocate is true, blocks for each individual tape are allocated in
552  * batches. This avoids fragmentation when writing multiple tapes at the
553  * same time.
554  */
556 LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
557 {
558  LogicalTapeSet *lts;
559 
560  /*
561  * Create top-level struct including per-tape LogicalTape structs.
562  */
563  lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
564  lts->nBlocksAllocated = 0L;
565  lts->nBlocksWritten = 0L;
566  lts->nHoleBlocks = 0L;
567  lts->forgetFreeSpace = false;
568  lts->freeBlocksLen = 32; /* reasonable initial guess */
569  lts->freeBlocks = (int64 *) palloc(lts->freeBlocksLen * sizeof(int64));
570  lts->nFreeBlocks = 0;
571  lts->enable_prealloc = preallocate;
572 
573  lts->fileset = fileset;
574  lts->worker = worker;
575 
576  /*
577  * Create temp BufFile storage as required.
578  *
579  * In leader, we hijack the BufFile of the first tape that's imported, and
580  * concatenate the BufFiles of any subsequent tapes to that. Hence don't
581  * create a BufFile here. Things are simpler for the worker case and the
582  * serial case, though. They are generally very similar -- workers use a
583  * shared fileset, whereas serial sorts use a conventional serial BufFile.
584  */
585  if (fileset && worker == -1)
586  lts->pfile = NULL;
587  else if (fileset)
588  {
589  char filename[MAXPGPATH];
590 
591  pg_itoa(worker, filename);
592  lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
593  }
594  else
595  lts->pfile = BufFileCreateTemp(false);
596 
597  return lts;
598 }
599 
600 /*
601  * Claim ownership of a logical tape from an existing shared BufFile.
602  *
603  * Caller should be leader process. Though tapes are marked as frozen in
604  * workers, they are not frozen when opened within leader, since unfrozen tapes
605  * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
606  * for random access.)
607  */
608 LogicalTape *
609 LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
610 {
611  LogicalTape *lt;
612  int64 tapeblocks;
613  char filename[MAXPGPATH];
614  BufFile *file;
615  int64 filesize;
616 
617  lt = ltsCreateTape(lts);
618 
619  /*
620  * build concatenated view of all buffiles, remembering the block number
621  * where each source file begins.
622  */
623  pg_itoa(worker, filename);
624  file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
625  filesize = BufFileSize(file);
626 
627  /*
628  * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
629  * block offset into each tape as we go.
630  */
631  lt->firstBlockNumber = shared->firstblocknumber;
632  if (lts->pfile == NULL)
633  {
634  lts->pfile = file;
635  lt->offsetBlockNumber = 0L;
636  }
637  else
638  {
639  lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
640  }
641  /* Don't allocate more for read buffer than could possibly help */
642  lt->max_size = Min(MaxAllocSize, filesize);
643  tapeblocks = filesize / BLCKSZ;
644 
645  /*
646  * Update # of allocated blocks and # blocks written to reflect the
647  * imported BufFile. Allocated/written blocks include space used by holes
648  * left between concatenated BufFiles. Also track the number of hole
649  * blocks so that we can later work backwards to calculate the number of
650  * physical blocks for instrumentation.
651  */
653 
654  lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
655  lts->nBlocksWritten = lts->nBlocksAllocated;
656 
657  return lt;
658 }
659 
660 /*
661  * Close a logical tape set and release all resources.
662  *
663  * NOTE: This doesn't close any of the tapes! You must close them
664  * first, or you can let them be destroyed along with the memory context.
665  */
666 void
668 {
669  BufFileClose(lts->pfile);
670  pfree(lts->freeBlocks);
671  pfree(lts);
672 }
673 
674 /*
675  * Create a logical tape in the given tapeset.
676  *
677  * The tape is initialized in write state.
678  */
679 LogicalTape *
681 {
682  /*
683  * The only thing that currently prevents creating new tapes in leader is
684  * the fact that BufFiles opened using BufFileOpenShared() are read-only
685  * by definition, but that could be changed if it seemed worthwhile. For
686  * now, writing to the leader tape will raise a "Bad file descriptor"
687  * error, so tuplesort must avoid writing to the leader tape altogether.
688  */
689  if (lts->fileset && lts->worker == -1)
690  elog(ERROR, "cannot create new tapes in leader process");
691 
692  return ltsCreateTape(lts);
693 }
694 
695 static LogicalTape *
697 {
698  LogicalTape *lt;
699 
700  /*
701  * Create per-tape struct. Note we allocate the I/O buffer lazily.
702  */
703  lt = palloc(sizeof(LogicalTape));
704  lt->tapeSet = lts;
705  lt->writing = true;
706  lt->frozen = false;
707  lt->dirty = false;
708  lt->firstBlockNumber = -1L;
709  lt->curBlockNumber = -1L;
710  lt->nextBlockNumber = -1L;
711  lt->offsetBlockNumber = 0L;
712  lt->buffer = NULL;
713  lt->buffer_size = 0;
714  /* palloc() larger than MaxAllocSize would fail */
715  lt->max_size = MaxAllocSize;
716  lt->pos = 0;
717  lt->nbytes = 0;
718  lt->prealloc = NULL;
719  lt->nprealloc = 0;
720  lt->prealloc_size = 0;
721 
722  return lt;
723 }
724 
725 /*
726  * Close a logical tape.
727  *
728  * Note: This doesn't return any blocks to the free list! You must read
729  * the tape to the end first, to reuse the space. In current use, though,
730  * we only close tapes after fully reading them.
731  */
732 void
734 {
735  if (lt->buffer)
736  pfree(lt->buffer);
737  pfree(lt);
738 }
739 
740 /*
741  * Mark a logical tape set as not needing management of free space anymore.
742  *
743  * This should be called if the caller does not intend to write any more data
744  * into the tape set, but is reading from un-frozen tapes. Since no more
745  * writes are planned, remembering free blocks is no longer useful. Setting
746  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
747  * is not designed to handle large numbers of free blocks.
748  */
749 void
751 {
752  lts->forgetFreeSpace = true;
753 }
754 
755 /*
756  * Write to a logical tape.
757  *
758  * There are no error returns; we ereport() on failure.
759  */
760 void
761 LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
762 {
763  LogicalTapeSet *lts = lt->tapeSet;
764  size_t nthistime;
765 
766  Assert(lt->writing);
767  Assert(lt->offsetBlockNumber == 0L);
768 
769  /* Allocate data buffer and first block on first write */
770  if (lt->buffer == NULL)
771  {
772  lt->buffer = (char *) palloc(BLCKSZ);
773  lt->buffer_size = BLCKSZ;
774  }
775  if (lt->curBlockNumber == -1)
776  {
777  Assert(lt->firstBlockNumber == -1);
778  Assert(lt->pos == 0);
779 
780  lt->curBlockNumber = ltsGetBlock(lts, lt);
782 
783  TapeBlockGetTrailer(lt->buffer)->prev = -1L;
784  }
785 
786  Assert(lt->buffer_size == BLCKSZ);
787  while (size > 0)
788  {
789  if (lt->pos >= (int) TapeBlockPayloadSize)
790  {
791  /* Buffer full, dump it out */
792  int64 nextBlockNumber;
793 
794  if (!lt->dirty)
795  {
796  /* Hmm, went directly from reading to writing? */
797  elog(ERROR, "invalid logtape state: should be dirty");
798  }
799 
800  /*
801  * First allocate the next block, so that we can store it in the
802  * 'next' pointer of this block.
803  */
804  nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
805 
806  /* set the next-pointer and dump the current block. */
807  TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
809 
810  /* initialize the prev-pointer of the next block */
811  TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
812  lt->curBlockNumber = nextBlockNumber;
813  lt->pos = 0;
814  lt->nbytes = 0;
815  }
816 
817  nthistime = TapeBlockPayloadSize - lt->pos;
818  if (nthistime > size)
819  nthistime = size;
820  Assert(nthistime > 0);
821 
822  memcpy(lt->buffer + lt->pos, ptr, nthistime);
823 
824  lt->dirty = true;
825  lt->pos += nthistime;
826  if (lt->nbytes < lt->pos)
827  lt->nbytes = lt->pos;
828  ptr = (const char *) ptr + nthistime;
829  size -= nthistime;
830  }
831 }
832 
833 /*
834  * Rewind logical tape and switch from writing to reading.
835  *
836  * The tape must currently be in writing state, or "frozen" in read state.
837  *
838  * 'buffer_size' specifies how much memory to use for the read buffer.
839  * Regardless of the argument, the actual amount of memory used is between
840  * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
841  * rounded down and truncated to fit those constraints, if necessary. If the
842  * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
843  * byte buffer is used.
844  */
845 void
846 LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
847 {
848  LogicalTapeSet *lts = lt->tapeSet;
849 
850  /*
851  * Round and cap buffer_size if needed.
852  */
853  if (lt->frozen)
854  buffer_size = BLCKSZ;
855  else
856  {
857  /* need at least one block */
858  if (buffer_size < BLCKSZ)
859  buffer_size = BLCKSZ;
860 
861  /* palloc() larger than max_size is unlikely to be helpful */
862  if (buffer_size > lt->max_size)
863  buffer_size = lt->max_size;
864 
865  /* round down to BLCKSZ boundary */
866  buffer_size -= buffer_size % BLCKSZ;
867  }
868 
869  if (lt->writing)
870  {
871  /*
872  * Completion of a write phase. Flush last partial data block, and
873  * rewind for normal (destructive) read.
874  */
875  if (lt->dirty)
876  {
877  /*
878  * As long as we've filled the buffer at least once, its contents
879  * are entirely defined from valgrind's point of view, even though
880  * contents beyond the current end point may be stale. But it's
881  * possible - at least in the case of a parallel sort - to sort
882  * such small amount of data that we do not fill the buffer even
883  * once. Tell valgrind that its contents are defined, so it
884  * doesn't bleat.
885  */
887  lt->buffer_size - lt->nbytes);
888 
889  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
891  }
892  lt->writing = false;
893  }
894  else
895  {
896  /*
897  * This is only OK if tape is frozen; we rewind for (another) read
898  * pass.
899  */
900  Assert(lt->frozen);
901  }
902 
903  if (lt->buffer)
904  pfree(lt->buffer);
905 
906  /* the buffer is lazily allocated, but set the size here */
907  lt->buffer = NULL;
908  lt->buffer_size = buffer_size;
909 
910  /* free the preallocation list, and return unused block numbers */
911  if (lt->prealloc != NULL)
912  {
913  for (int i = lt->nprealloc; i > 0; i--)
914  ltsReleaseBlock(lts, lt->prealloc[i - 1]);
915  pfree(lt->prealloc);
916  lt->prealloc = NULL;
917  lt->nprealloc = 0;
918  lt->prealloc_size = 0;
919  }
920 }
921 
922 /*
923  * Read from a logical tape.
924  *
925  * Early EOF is indicated by return value less than #bytes requested.
926  */
927 size_t
928 LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
929 {
930  size_t nread = 0;
931  size_t nthistime;
932 
933  Assert(!lt->writing);
934 
935  if (lt->buffer == NULL)
936  ltsInitReadBuffer(lt);
937 
938  while (size > 0)
939  {
940  if (lt->pos >= lt->nbytes)
941  {
942  /* Try to load more data into buffer. */
943  if (!ltsReadFillBuffer(lt))
944  break; /* EOF */
945  }
946 
947  nthistime = lt->nbytes - lt->pos;
948  if (nthistime > size)
949  nthistime = size;
950  Assert(nthistime > 0);
951 
952  memcpy(ptr, lt->buffer + lt->pos, nthistime);
953 
954  lt->pos += nthistime;
955  ptr = (char *) ptr + nthistime;
956  size -= nthistime;
957  nread += nthistime;
958  }
959 
960  return nread;
961 }
962 
963 /*
964  * "Freeze" the contents of a tape so that it can be read multiple times
965  * and/or read backwards. Once a tape is frozen, its contents will not
966  * be released until the LogicalTapeSet is destroyed. This is expected
967  * to be used only for the final output pass of a merge.
968  *
969  * This *must* be called just at the end of a write pass, before the
970  * tape is rewound (after rewind is too late!). It performs a rewind
971  * and switch to read mode "for free". An immediately following rewind-
972  * for-read call is OK but not necessary.
973  *
974  * share output argument is set with details of storage used for tape after
975  * freezing, which may be passed to LogicalTapeSetCreate within leader
976  * process later. This metadata is only of interest to worker callers
977  * freezing their final output for leader (single materialized tape).
978  * Serial sorts should set share to NULL.
979  */
980 void
982 {
983  LogicalTapeSet *lts = lt->tapeSet;
984 
985  Assert(lt->writing);
986  Assert(lt->offsetBlockNumber == 0L);
987 
988  /*
989  * Completion of a write phase. Flush last partial data block, and rewind
990  * for nondestructive read.
991  */
992  if (lt->dirty)
993  {
994  /*
995  * As long as we've filled the buffer at least once, its contents are
996  * entirely defined from valgrind's point of view, even though
997  * contents beyond the current end point may be stale. But it's
998  * possible - at least in the case of a parallel sort - to sort such
999  * small amount of data that we do not fill the buffer even once. Tell
1000  * valgrind that its contents are defined, so it doesn't bleat.
1001  */
1003  lt->buffer_size - lt->nbytes);
1004 
1005  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
1006  ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
1007  }
1008  lt->writing = false;
1009  lt->frozen = true;
1010 
1011  /*
1012  * The seek and backspace functions assume a single block read buffer.
1013  * That's OK with current usage. A larger buffer is helpful to make the
1014  * read pattern of the backing file look more sequential to the OS, when
1015  * we're reading from multiple tapes. But at the end of a sort, when a
1016  * tape is frozen, we only read from a single tape anyway.
1017  */
1018  if (!lt->buffer || lt->buffer_size != BLCKSZ)
1019  {
1020  if (lt->buffer)
1021  pfree(lt->buffer);
1022  lt->buffer = palloc(BLCKSZ);
1023  lt->buffer_size = BLCKSZ;
1024  }
1025 
1026  /* Read the first block, or reset if tape is empty */
1027  lt->curBlockNumber = lt->firstBlockNumber;
1028  lt->pos = 0;
1029  lt->nbytes = 0;
1030 
1031  if (lt->firstBlockNumber == -1L)
1032  lt->nextBlockNumber = -1L;
1033  ltsReadBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
1034  if (TapeBlockIsLast(lt->buffer))
1035  lt->nextBlockNumber = -1L;
1036  else
1037  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1038  lt->nbytes = TapeBlockGetNBytes(lt->buffer);
1039 
1040  /* Handle extra steps when caller is to share its tapeset */
1041  if (share)
1042  {
1044  share->firstblocknumber = lt->firstBlockNumber;
1045  }
1046 }
1047 
1048 /*
1049  * Backspace the tape a given number of bytes. (We also support a more
1050  * general seek interface, see below.)
1051  *
1052  * *Only* a frozen-for-read tape can be backed up; we don't support
1053  * random access during write, and an unfrozen read tape may have
1054  * already discarded the desired data!
1055  *
1056  * Returns the number of bytes backed up. It can be less than the
1057  * requested amount, if there isn't that much data before the current
1058  * position. The tape is positioned to the beginning of the tape in
1059  * that case.
1060  */
1061 size_t
1063 {
1064  size_t seekpos = 0;
1065 
1066  Assert(lt->frozen);
1067  Assert(lt->buffer_size == BLCKSZ);
1068 
1069  if (lt->buffer == NULL)
1070  ltsInitReadBuffer(lt);
1071 
1072  /*
1073  * Easy case for seek within current block.
1074  */
1075  if (size <= (size_t) lt->pos)
1076  {
1077  lt->pos -= (int) size;
1078  return size;
1079  }
1080 
1081  /*
1082  * Not-so-easy case, have to walk back the chain of blocks. This
1083  * implementation would be pretty inefficient for long seeks, but we
1084  * really aren't doing that (a seek over one tuple is typical).
1085  */
1086  seekpos = (size_t) lt->pos; /* part within this block */
1087  while (size > seekpos)
1088  {
1089  int64 prev = TapeBlockGetTrailer(lt->buffer)->prev;
1090 
1091  if (prev == -1L)
1092  {
1093  /* Tried to back up beyond the beginning of tape. */
1094  if (lt->curBlockNumber != lt->firstBlockNumber)
1095  elog(ERROR, "unexpected end of tape");
1096  lt->pos = 0;
1097  return seekpos;
1098  }
1099 
1100  ltsReadBlock(lt->tapeSet, prev, lt->buffer);
1101 
1102  if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1103  elog(ERROR, "broken tape, next of block %lld is %lld, expected %lld",
1104  (long long) prev,
1105  (long long) (TapeBlockGetTrailer(lt->buffer)->next),
1106  (long long) lt->curBlockNumber);
1107 
1109  lt->curBlockNumber = prev;
1110  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1111 
1112  seekpos += TapeBlockPayloadSize;
1113  }
1114 
1115  /*
1116  * 'seekpos' can now be greater than 'size', because it points to the
1117  * beginning the target block. The difference is the position within the
1118  * page.
1119  */
1120  lt->pos = seekpos - size;
1121  return size;
1122 }
1123 
1124 /*
1125  * Seek to an arbitrary position in a logical tape.
1126  *
1127  * *Only* a frozen-for-read tape can be seeked.
1128  *
1129  * Must be called with a block/offset previously returned by
1130  * LogicalTapeTell().
1131  */
1132 void
1133 LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
1134 {
1135  Assert(lt->frozen);
1136  Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1137  Assert(lt->buffer_size == BLCKSZ);
1138 
1139  if (lt->buffer == NULL)
1140  ltsInitReadBuffer(lt);
1141 
1142  if (blocknum != lt->curBlockNumber)
1143  {
1144  ltsReadBlock(lt->tapeSet, blocknum, lt->buffer);
1145  lt->curBlockNumber = blocknum;
1147  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1148  }
1149 
1150  if (offset > lt->nbytes)
1151  elog(ERROR, "invalid tape seek position");
1152  lt->pos = offset;
1153 }
1154 
1155 /*
1156  * Obtain current position in a form suitable for a later LogicalTapeSeek.
1157  *
1158  * NOTE: it'd be OK to do this during write phase with intention of using
1159  * the position for a seek after freezing. Not clear if anyone needs that.
1160  */
1161 void
1162 LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
1163 {
1164  if (lt->buffer == NULL)
1165  ltsInitReadBuffer(lt);
1166 
1167  Assert(lt->offsetBlockNumber == 0L);
1168 
1169  /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1170  Assert(lt->buffer_size == BLCKSZ);
1171 
1172  *blocknum = lt->curBlockNumber;
1173  *offset = lt->pos;
1174 }
1175 
1176 /*
1177  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1178  *
1179  * This should not be called while there are open write buffers; otherwise it
1180  * may not account for buffered data.
1181  */
1182 int64
1184 {
1185  return lts->nBlocksWritten - lts->nHoleBlocks;
1186 }
int BufFileSeekBlock(BufFile *file, int64 blknum)
Definition: buffile.c:851
void BufFileExportFileSet(BufFile *file)
Definition: buffile.c:394
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:193
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676
int64 BufFileSize(BufFile *file)
Definition: buffile.c:866
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267
void BufFileClose(BufFile *file)
Definition: buffile.c:412
int64 BufFileAppend(BufFile *target, BufFile *source)
Definition: buffile.c:905
#define Min(x, y)
Definition: c.h:991
#define MemSet(start, val, len)
Definition: c.h:1007
size_t Size
Definition: c.h:592
int errcode_for_file_access(void)
Definition: elog.c:882
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
LogicalTape * LogicalTapeCreate(LogicalTapeSet *lts)
Definition: logtape.c:680
void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
Definition: logtape.c:846
#define TapeBlockGetTrailer(buf)
Definition: logtape.c:104
static LogicalTape * ltsCreateTape(LogicalTapeSet *lts)
Definition: logtape.c:696
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition: logtape.c:750
size_t LogicalTapeBackspace(LogicalTape *lt, size_t size)
Definition: logtape.c:1062
static int64 ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:431
size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
Definition: logtape.c:928
static uint64 parent_offset(uint64 i)
Definition: logtape.c:349
int64 LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1183
void LogicalTapeClose(LogicalTape *lt)
Definition: logtape.c:733
#define TapeBlockPayloadSize
Definition: logtape.c:103
struct TapeBlockTrailer TapeBlockTrailer
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:667
static bool ltsReadFillBuffer(LogicalTape *lt)
Definition: logtape.c:298
static void ltsInitReadBuffer(LogicalTape *lt)
Definition: logtape.c:522
#define TAPE_WRITE_PREALLOC_MIN
Definition: logtape.c:124
static uint64 left_offset(uint64 i)
Definition: logtape.c:337
static void ltsWriteBlock(LogicalTapeSet *lts, int64 blocknum, const void *buffer)
Definition: logtape.c:238
static int64 ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:358
#define TapeBlockIsLast(buf)
Definition: logtape.c:107
#define TAPE_WRITE_PREALLOC_MAX
Definition: logtape.c:125
void LogicalTapeSeek(LogicalTape *lt, int64 blocknum, int offset)
Definition: logtape.c:1133
static void ltsReadBlock(LogicalTapeSet *lts, int64 blocknum, void *buffer)
Definition: logtape.c:282
void LogicalTapeTell(LogicalTape *lt, int64 *blocknum, int *offset)
Definition: logtape.c:1162
void LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
Definition: logtape.c:761
#define TapeBlockGetNBytes(buf)
Definition: logtape.c:108
static void ltsReleaseBlock(LogicalTapeSet *lts, int64 blocknum)
Definition: logtape.c:469
LogicalTapeSet * LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
Definition: logtape.c:556
#define TapeBlockSetNBytes(buf, nbytes)
Definition: logtape.c:111
static int64 ltsGetFreeBlock(LogicalTapeSet *lts)
Definition: logtape.c:371
static uint64 right_offset(uint64 i)
Definition: logtape.c:343
void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
Definition: logtape.c:981
LogicalTape * LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
Definition: logtape.c:609
void pfree(void *pointer)
Definition: mcxt.c:1401
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1421
void * palloc(Size size)
Definition: mcxt.c:1197
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
#define MaxAllocSize
Definition: memutils.h:40
int pg_itoa(int16 i, char *a)
Definition: numutils.c:1044
#define MAXPGPATH
while(p+4<=pend)
static char * filename
Definition: pg_dumpall.c:121
static pg_noinline void Size size
Definition: slab.c:607
BufFile * pfile
Definition: logtape.c:189
bool forgetFreeSpace
Definition: logtape.c:215
SharedFileSet * fileset
Definition: logtape.c:190
int64 nHoleBlocks
Definition: logtape.c:205
int64 * freeBlocks
Definition: logtape.c:216
Size freeBlocksLen
Definition: logtape.c:218
int64 nFreeBlocks
Definition: logtape.c:217
bool enable_prealloc
Definition: logtape.c:219
int64 nBlocksAllocated
Definition: logtape.c:203
int64 nBlocksWritten
Definition: logtape.c:204
int64 curBlockNumber
Definition: logtape.c:157
int64 firstBlockNumber
Definition: logtape.c:156
int64 * prealloc
Definition: logtape.c:175
int max_size
Definition: logtape.c:166
int nprealloc
Definition: logtape.c:176
char * buffer
Definition: logtape.c:164
int64 nextBlockNumber
Definition: logtape.c:158
bool frozen
Definition: logtape.c:142
int prealloc_size
Definition: logtape.c:177
int64 offsetBlockNumber
Definition: logtape.c:159
int buffer_size
Definition: logtape.c:165
bool writing
Definition: logtape.c:141
LogicalTapeSet * tapeSet
Definition: logtape.c:139
int nbytes
Definition: logtape.c:168
bool dirty
Definition: logtape.c:143
int64 firstblocknumber
Definition: logtape.h:54
char data[BLCKSZ]
Definition: c.h:1124