PostgreSQL Source Code  git master
logtape.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * logtape.c
4  * Management of "logical tapes" within temporary files.
5  *
6  * This module exists to support sorting via multiple merge passes (see
7  * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8  * we implement it on disk by creating a separate file for each "tape",
9  * there is an annoying problem: the peak space usage is at least twice
10  * the volume of actual data to be sorted. (This must be so because each
11  * datum will appear in both the input and output tapes of the final
12  * merge pass. For seven-tape polyphase merge, which is otherwise a
13  * pretty good algorithm, peak usage is more like 4x actual data volume.)
14  *
15  * We can work around this problem by recognizing that any one tape
16  * dataset (with the possible exception of the final output) is written
17  * and read exactly once in a perfectly sequential manner. Therefore,
18  * a datum once read will not be required again, and we can recycle its
19  * space for use by the new tape dataset(s) being generated. In this way,
20  * the total space usage is essentially just the actual data volume, plus
21  * insignificant bookkeeping and start/stop overhead.
22  *
23  * Few OSes allow arbitrary parts of a file to be released back to the OS,
24  * so we have to implement this space-recycling ourselves within a single
25  * logical file. logtape.c exists to perform this bookkeeping and provide
26  * the illusion of N independent tape devices to tuplesort.c. Note that
27  * logtape.c itself depends on buffile.c to provide a "logical file" of
28  * larger size than the underlying OS may support.
29  *
30  * For simplicity, we allocate and release space in the underlying file
31  * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32  * of which blocks in the underlying file belong to which logical tape,
33  * plus any blocks that are free (recycled and not yet reused).
34  * The blocks in each logical tape form a chain, with a prev- and next-
35  * pointer in each block.
36  *
37  * The initial write pass is guaranteed to fill the underlying file
38  * perfectly sequentially, no matter how data is divided into logical tapes.
39  * Once we begin merge passes, the access pattern becomes considerably
40  * less predictable --- but the seeking involved should be comparable to
41  * what would happen if we kept each logical tape in a separate file,
42  * so there's no serious performance penalty paid to obtain the space
43  * savings of recycling. We try to localize the write accesses by always
44  * writing to the lowest-numbered free block when we have a choice; it's
45  * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
46  * policy for free blocks would be better?)
47  *
48  * To further make the I/Os more sequential, we can use a larger buffer
49  * when reading, and read multiple blocks from the same tape in one go,
50  * whenever the buffer becomes empty.
51  *
52  * To support the above policy of writing to the lowest free block, the
53  * freelist is a min heap.
54  *
55  * Since all the bookkeeping and buffer memory is allocated with palloc(),
56  * and the underlying file(s) are made with OpenTemporaryFile, all resources
57  * for a logical tape set are certain to be cleaned up even if processing
58  * is aborted by ereport(ERROR). To avoid confusion, the caller should take
59  * care that all calls for a single LogicalTapeSet are made in the same
60  * palloc context.
61  *
62  * To support parallel sort operations involving coordinated callers to
63  * tuplesort.c routines across multiple workers, it is necessary to
64  * concatenate each worker BufFile/tapeset into one single logical tapeset
65  * managed by the leader. Workers should have produced one final
66  * materialized tape (their entire output) when this happens in leader.
67  * There will always be the same number of runs as input tapes, and the same
68  * number of input tapes as participants (worker Tuplesortstates).
69  *
70  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
71  * Portions Copyright (c) 1994, Regents of the University of California
72  *
73  * IDENTIFICATION
74  * src/backend/utils/sort/logtape.c
75  *
76  *-------------------------------------------------------------------------
77  */
78 
79 #include "postgres.h"
80 
81 #include "storage/buffile.h"
82 #include "utils/builtins.h"
83 #include "utils/logtape.h"
84 #include "utils/memdebug.h"
85 #include "utils/memutils.h"
86 
87 /*
88  * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
89  *
90  * The first block of a tape has prev == -1. The last block of a tape
91  * stores the number of valid bytes on the block, inverted, in 'next'
92  * Therefore next < 0 indicates the last block.
93  */
94 typedef struct TapeBlockTrailer
95 {
96  long prev; /* previous block on this tape, or -1 on first
97  * block */
98  long next; /* next block on this tape, or # of valid
99  * bytes on last block (if < 0) */
101 
102 #define TapeBlockPayloadSize (BLCKSZ - sizeof(TapeBlockTrailer))
103 #define TapeBlockGetTrailer(buf) \
104  ((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))
105 
106 #define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
107 #define TapeBlockGetNBytes(buf) \
108  (TapeBlockIsLast(buf) ? \
109  (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
110 #define TapeBlockSetNBytes(buf, nbytes) \
111  (TapeBlockGetTrailer(buf)->next = -(nbytes))
112 
113 
114 /*
115  * This data structure represents a single "logical tape" within the set
116  * of logical tapes stored in the same file.
117  *
118  * While writing, we hold the current partially-written data block in the
119  * buffer. While reading, we can hold multiple blocks in the buffer. Note
120  * that we don't retain the trailers of a block when it's read into the
121  * buffer. The buffer therefore contains one large contiguous chunk of data
122  * from the tape.
123  */
124 typedef struct LogicalTape
125 {
126  bool writing; /* T while in write phase */
127  bool frozen; /* T if blocks should not be freed when read */
128  bool dirty; /* does buffer need to be written? */
129 
130  /*
131  * Block numbers of the first, current, and next block of the tape.
132  *
133  * The "current" block number is only valid when writing, or reading from
134  * a frozen tape. (When reading from an unfrozen tape, we use a larger
135  * read buffer that holds multiple blocks, so the "current" block is
136  * ambiguous.)
137  *
138  * When concatenation of worker tape BufFiles is performed, an offset to
139  * the first block in the unified BufFile space is applied during reads.
140  */
145 
146  /*
147  * Buffer for current data block(s).
148  */
149  char *buffer; /* physical buffer (separately palloc'd) */
150  int buffer_size; /* allocated size of the buffer */
151  int max_size; /* highest useful, safe buffer_size */
152  int pos; /* next read/write position in buffer */
153  int nbytes; /* total # of valid bytes in buffer */
154 } LogicalTape;
155 
156 /*
157  * This data structure represents a set of related "logical tapes" sharing
158  * space in a single underlying file. (But that "file" may be multiple files
159  * if needed to escape OS limits on file size; buffile.c handles that for us.)
160  * The number of tapes is fixed at creation.
161  */
163 {
164  BufFile *pfile; /* underlying file for whole tape set */
165 
166  /*
167  * File size tracking. nBlocksWritten is the size of the underlying file,
168  * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
169  * by ltsReleaseBlock(), and it is always greater than or equal to
170  * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
171  * blocks that have been allocated for a tape, but have not been written
172  * to the underlying file yet. nHoleBlocks tracks the total number of
173  * blocks that are in unused holes between worker spaces following BufFile
174  * concatenation.
175  */
176  long nBlocksAllocated; /* # of blocks allocated */
177  long nBlocksWritten; /* # of blocks used in underlying file */
178  long nHoleBlocks; /* # of "hole" blocks left */
179 
180  /*
181  * We store the numbers of recycled-and-available blocks in freeBlocks[].
182  * When there are no such blocks, we extend the underlying file.
183  *
184  * If forgetFreeSpace is true then any freed blocks are simply forgotten
185  * rather than being remembered in freeBlocks[]. See notes for
186  * LogicalTapeSetForgetFreeSpace().
187  */
188  bool forgetFreeSpace; /* are we remembering free blocks? */
189  long *freeBlocks; /* resizable array holding minheap */
190  long nFreeBlocks; /* # of currently free blocks */
191  Size freeBlocksLen; /* current allocated length of freeBlocks[] */
192 
193  /* The array of logical tapes. */
194  int nTapes; /* # of logical tapes in set */
195  LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */
196 };
197 
198 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
199 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
200 static long ltsGetFreeBlock(LogicalTapeSet *lts);
201 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
202 static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
203  SharedFileSet *fileset);
204 static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
205 
206 
207 /*
208  * Write a block-sized buffer to the specified block of the underlying file.
209  *
210  * No need for an error return convention; we ereport() on any error.
211  */
212 static void
213 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
214 {
215  /*
216  * BufFile does not support "holes", so if we're about to write a block
217  * that's past the current end of file, fill the space between the current
218  * end of file and the target block with zeros.
219  *
220  * This should happen rarely, otherwise you are not writing very
221  * sequentially. In current use, this only happens when the sort ends
222  * writing a run, and switches to another tape. The last block of the
223  * previous tape isn't flushed to disk until the end of the sort, so you
224  * get one-block hole, where the last block of the previous tape will
225  * later go.
226  *
227  * Note that BufFile concatenation can leave "holes" in BufFile between
228  * worker-owned block ranges. These are tracked for reporting purposes
229  * only. We never read from nor write to these hole blocks, and so they
230  * are not considered here.
231  */
232  while (blocknum > lts->nBlocksWritten)
233  {
234  PGAlignedBlock zerobuf;
235 
236  MemSet(zerobuf.data, 0, sizeof(zerobuf));
237 
238  ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
239  }
240 
241  /* Write the requested block */
242  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
243  BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
244  ereport(ERROR,
246  errmsg("could not write block %ld of temporary file: %m",
247  blocknum)));
248 
249  /* Update nBlocksWritten, if we extended the file */
250  if (blocknum == lts->nBlocksWritten)
251  lts->nBlocksWritten++;
252 }
253 
254 /*
255  * Read a block-sized buffer from the specified block of the underlying file.
256  *
257  * No need for an error return convention; we ereport() on any error. This
258  * module should never attempt to read a block it doesn't know is there.
259  */
260 static void
261 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
262 {
263  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
264  BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
265  ereport(ERROR,
267  errmsg("could not read block %ld of temporary file: %m",
268  blocknum)));
269 }
270 
271 /*
272  * Read as many blocks as we can into the per-tape buffer.
273  *
274  * Returns true if anything was read, 'false' on EOF.
275  */
276 static bool
278 {
279  lt->pos = 0;
280  lt->nbytes = 0;
281 
282  do
283  {
284  char *thisbuf = lt->buffer + lt->nbytes;
285  long datablocknum = lt->nextBlockNumber;
286 
287  /* Fetch next block number */
288  if (datablocknum == -1L)
289  break; /* EOF */
290  /* Apply worker offset, needed for leader tapesets */
291  datablocknum += lt->offsetBlockNumber;
292 
293  /* Read the block */
294  ltsReadBlock(lts, datablocknum, (void *) thisbuf);
295  if (!lt->frozen)
296  ltsReleaseBlock(lts, datablocknum);
298 
299  lt->nbytes += TapeBlockGetNBytes(thisbuf);
300  if (TapeBlockIsLast(thisbuf))
301  {
302  lt->nextBlockNumber = -1L;
303  /* EOF */
304  break;
305  }
306  else
307  lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
308 
309  /* Advance to next block, if we have buffer space left */
310  } while (lt->buffer_size - lt->nbytes > BLCKSZ);
311 
312  return (lt->nbytes > 0);
313 }
314 
315 static inline void
316 swap_nodes(long *heap, unsigned long a, unsigned long b)
317 {
318  unsigned long swap;
319 
320  swap = heap[a];
321  heap[a] = heap[b];
322  heap[b] = swap;
323 }
324 
325 static inline unsigned long
326 left_offset(unsigned long i)
327 {
328  return 2 * i + 1;
329 }
330 
331 static inline unsigned long
332 right_offset(unsigned i)
333 {
334  return 2 * i + 2;
335 }
336 
337 static inline unsigned long
338 parent_offset(unsigned long i)
339 {
340  return (i - 1) / 2;
341 }
342 
343 /*
344  * Select the lowest currently unused block by taking the first element from
345  * the freelist min heap.
346  */
347 static long
349 {
350  long *heap = lts->freeBlocks;
351  long blocknum;
352  int heapsize;
353  unsigned long pos;
354 
355  /* freelist empty; allocate a new block */
356  if (lts->nFreeBlocks == 0)
357  return lts->nBlocksAllocated++;
358 
359  if (lts->nFreeBlocks == 1)
360  {
361  lts->nFreeBlocks--;
362  return lts->freeBlocks[0];
363  }
364 
365  /* take top of minheap */
366  blocknum = heap[0];
367 
368  /* replace with end of minheap array */
369  heap[0] = heap[--lts->nFreeBlocks];
370 
371  /* sift down */
372  pos = 0;
373  heapsize = lts->nFreeBlocks;
374  while (true)
375  {
376  unsigned long left = left_offset(pos);
377  unsigned long right = right_offset(pos);
378  unsigned long min_child;
379 
380  if (left < heapsize && right < heapsize)
381  min_child = (heap[left] < heap[right]) ? left : right;
382  else if (left < heapsize)
383  min_child = left;
384  else if (right < heapsize)
385  min_child = right;
386  else
387  break;
388 
389  if (heap[min_child] >= heap[pos])
390  break;
391 
392  swap_nodes(heap, min_child, pos);
393  pos = min_child;
394  }
395 
396  return blocknum;
397 }
398 
399 /*
400  * Return a block# to the freelist.
401  */
402 static void
403 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
404 {
405  long *heap;
406  unsigned long pos;
407 
408  /*
409  * Do nothing if we're no longer interested in remembering free space.
410  */
411  if (lts->forgetFreeSpace)
412  return;
413 
414  /*
415  * Enlarge freeBlocks array if full.
416  */
417  if (lts->nFreeBlocks >= lts->freeBlocksLen)
418  {
419  /*
420  * If the freelist becomes very large, just return and leak this free
421  * block.
422  */
423  if (lts->freeBlocksLen * 2 > MaxAllocSize)
424  return;
425 
426  lts->freeBlocksLen *= 2;
427  lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
428  lts->freeBlocksLen * sizeof(long));
429  }
430 
431  heap = lts->freeBlocks;
432  pos = lts->nFreeBlocks;
433 
434  /* place entry at end of minheap array */
435  heap[pos] = blocknum;
436  lts->nFreeBlocks++;
437 
438  /* sift up */
439  while (pos != 0)
440  {
441  unsigned long parent = parent_offset(pos);
442  if (heap[parent] < heap[pos])
443  break;
444 
445  swap_nodes(heap, parent, pos);
446  pos = parent;
447  }
448 }
449 
450 /*
451  * Claim ownership of a set of logical tapes from existing shared BufFiles.
452  *
453  * Caller should be leader process. Though tapes are marked as frozen in
454  * workers, they are not frozen when opened within leader, since unfrozen tapes
455  * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
456  * for random access.)
457  */
458 static void
460  SharedFileSet *fileset)
461 {
462  LogicalTape *lt = NULL;
463  long tapeblocks = 0L;
464  long nphysicalblocks = 0L;
465  int i;
466 
467  /* Should have at least one worker tape, plus leader's tape */
468  Assert(lts->nTapes >= 2);
469 
470  /*
471  * Build concatenated view of all BufFiles, remembering the block number
472  * where each source file begins. No changes are needed for leader/last
473  * tape.
474  */
475  for (i = 0; i < lts->nTapes - 1; i++)
476  {
477  char filename[MAXPGPATH];
478  BufFile *file;
479  int64 filesize;
480 
481  lt = &lts->tapes[i];
482 
483  pg_itoa(i, filename);
484  file = BufFileOpenShared(fileset, filename);
485  filesize = BufFileSize(file);
486 
487  /*
488  * Stash first BufFile, and concatenate subsequent BufFiles to that.
489  * Store block offset into each tape as we go.
490  */
491  lt->firstBlockNumber = shared[i].firstblocknumber;
492  if (i == 0)
493  {
494  lts->pfile = file;
495  lt->offsetBlockNumber = 0L;
496  }
497  else
498  {
499  lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
500  }
501  /* Don't allocate more for read buffer than could possibly help */
502  lt->max_size = Min(MaxAllocSize, filesize);
503  tapeblocks = filesize / BLCKSZ;
504  nphysicalblocks += tapeblocks;
505  }
506 
507  /*
508  * Set # of allocated blocks, as well as # blocks written. Use extent of
509  * new BufFile space (from 0 to end of last worker's tape space) for this.
510  * Allocated/written blocks should include space used by holes left
511  * between concatenated BufFiles.
512  */
513  lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
514  lts->nBlocksWritten = lts->nBlocksAllocated;
515 
516  /*
517  * Compute number of hole blocks so that we can later work backwards, and
518  * instrument number of physical blocks. We don't simply use physical
519  * blocks directly for instrumentation because this would break if we ever
520  * subsequently wrote to the leader tape.
521  *
522  * Working backwards like this keeps our options open. If shared BufFiles
523  * ever support being written to post-export, logtape.c can automatically
524  * take advantage of that. We'd then support writing to the leader tape
525  * while recycling space from worker tapes, because the leader tape has a
526  * zero offset (write routines won't need to have extra logic to apply an
527  * offset).
528  *
529  * The only thing that currently prevents writing to the leader tape from
530  * working is the fact that BufFiles opened using BufFileOpenShared() are
531  * read-only by definition, but that could be changed if it seemed
532  * worthwhile. For now, writing to the leader tape will raise a "Bad file
533  * descriptor" error, so tuplesort must avoid writing to the leader tape
534  * altogether.
535  */
536  lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
537 }
538 
539 /*
540  * Lazily allocate and initialize the read buffer. This avoids waste when many
541  * tapes are open at once, but not all are active between rewinding and
542  * reading.
543  */
544 static void
546 {
547  Assert(lt->buffer_size > 0);
548  lt->buffer = palloc(lt->buffer_size);
549 
550  /* Read the first block, or reset if tape is empty */
552  lt->pos = 0;
553  lt->nbytes = 0;
554  ltsReadFillBuffer(lts, lt);
555 }
556 
557 /*
558  * Create a set of logical tapes in a temporary underlying file.
559  *
560  * Each tape is initialized in write state. Serial callers pass ntapes,
561  * NULL argument for shared, and -1 for worker. Parallel worker callers
562  * pass ntapes, a shared file handle, NULL shared argument, and their own
563  * worker number. Leader callers, which claim shared worker tapes here,
564  * must supply non-sentinel values for all arguments except worker number,
565  * which should be -1.
566  *
567  * Leader caller is passing back an array of metadata each worker captured
568  * when LogicalTapeFreeze() was called for their final result tapes. Passed
569  * tapes array is actually sized ntapes - 1, because it includes only
570  * worker tapes, whereas leader requires its own leader tape. Note that we
571  * rely on the assumption that reclaimed worker tapes will only be read
572  * from once by leader, and never written to again (tapes are initialized
573  * for writing, but that's only to be consistent). Leader may not write to
574  * its own tape purely due to a restriction in the shared buffile
575  * infrastructure that may be lifted in the future.
576  */
578 LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
579  int worker)
580 {
581  LogicalTapeSet *lts;
582  LogicalTape *lt;
583  int i;
584 
585  /*
586  * Create top-level struct including per-tape LogicalTape structs.
587  */
588  Assert(ntapes > 0);
589  lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
590  ntapes * sizeof(LogicalTape));
591  lts->nBlocksAllocated = 0L;
592  lts->nBlocksWritten = 0L;
593  lts->nHoleBlocks = 0L;
594  lts->forgetFreeSpace = false;
595  lts->freeBlocksLen = 32; /* reasonable initial guess */
596  lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
597  lts->nFreeBlocks = 0;
598  lts->nTapes = ntapes;
599 
600  /*
601  * Initialize per-tape structs. Note we allocate the I/O buffer and the
602  * first block for a tape only when it is first actually written to. This
603  * avoids wasting memory space when tuplesort.c overestimates the number
604  * of tapes needed.
605  */
606  for (i = 0; i < ntapes; i++)
607  {
608  lt = &lts->tapes[i];
609  lt->writing = true;
610  lt->frozen = false;
611  lt->dirty = false;
612  lt->firstBlockNumber = -1L;
613  lt->curBlockNumber = -1L;
614  lt->nextBlockNumber = -1L;
615  lt->offsetBlockNumber = 0L;
616  lt->buffer = NULL;
617  lt->buffer_size = 0;
618  /* palloc() larger than MaxAllocSize would fail */
619  lt->max_size = MaxAllocSize;
620  lt->pos = 0;
621  lt->nbytes = 0;
622  }
623 
624  /*
625  * Create temp BufFile storage as required.
626  *
627  * Leader concatenates worker tapes, which requires special adjustment to
628  * final tapeset data. Things are simpler for the worker case and the
629  * serial case, though. They are generally very similar -- workers use a
630  * shared fileset, whereas serial sorts use a conventional serial BufFile.
631  */
632  if (shared)
633  ltsConcatWorkerTapes(lts, shared, fileset);
634  else if (fileset)
635  {
636  char filename[MAXPGPATH];
637 
638  pg_itoa(worker, filename);
639  lts->pfile = BufFileCreateShared(fileset, filename);
640  }
641  else
642  lts->pfile = BufFileCreateTemp(false);
643 
644  return lts;
645 }
646 
647 /*
648  * Close a logical tape set and release all resources.
649  */
650 void
652 {
653  LogicalTape *lt;
654  int i;
655 
656  BufFileClose(lts->pfile);
657  for (i = 0; i < lts->nTapes; i++)
658  {
659  lt = &lts->tapes[i];
660  if (lt->buffer)
661  pfree(lt->buffer);
662  }
663  pfree(lts->freeBlocks);
664  pfree(lts);
665 }
666 
667 /*
668  * Mark a logical tape set as not needing management of free space anymore.
669  *
670  * This should be called if the caller does not intend to write any more data
671  * into the tape set, but is reading from un-frozen tapes. Since no more
672  * writes are planned, remembering free blocks is no longer useful. Setting
673  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
674  * is not designed to handle large numbers of free blocks.
675  */
676 void
678 {
679  lts->forgetFreeSpace = true;
680 }
681 
682 /*
683  * Write to a logical tape.
684  *
685  * There are no error returns; we ereport() on failure.
686  */
687 void
689  void *ptr, size_t size)
690 {
691  LogicalTape *lt;
692  size_t nthistime;
693 
694  Assert(tapenum >= 0 && tapenum < lts->nTapes);
695  lt = &lts->tapes[tapenum];
696  Assert(lt->writing);
697  Assert(lt->offsetBlockNumber == 0L);
698 
699  /* Allocate data buffer and first block on first write */
700  if (lt->buffer == NULL)
701  {
702  lt->buffer = (char *) palloc(BLCKSZ);
703  lt->buffer_size = BLCKSZ;
704  }
705  if (lt->curBlockNumber == -1)
706  {
707  Assert(lt->firstBlockNumber == -1);
708  Assert(lt->pos == 0);
709 
710  lt->curBlockNumber = ltsGetFreeBlock(lts);
712 
713  TapeBlockGetTrailer(lt->buffer)->prev = -1L;
714  }
715 
716  Assert(lt->buffer_size == BLCKSZ);
717  while (size > 0)
718  {
719  if (lt->pos >= TapeBlockPayloadSize)
720  {
721  /* Buffer full, dump it out */
722  long nextBlockNumber;
723 
724  if (!lt->dirty)
725  {
726  /* Hmm, went directly from reading to writing? */
727  elog(ERROR, "invalid logtape state: should be dirty");
728  }
729 
730  /*
731  * First allocate the next block, so that we can store it in the
732  * 'next' pointer of this block.
733  */
734  nextBlockNumber = ltsGetFreeBlock(lts);
735 
736  /* set the next-pointer and dump the current block. */
737  TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
738  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
739 
740  /* initialize the prev-pointer of the next block */
741  TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
742  lt->curBlockNumber = nextBlockNumber;
743  lt->pos = 0;
744  lt->nbytes = 0;
745  }
746 
747  nthistime = TapeBlockPayloadSize - lt->pos;
748  if (nthistime > size)
749  nthistime = size;
750  Assert(nthistime > 0);
751 
752  memcpy(lt->buffer + lt->pos, ptr, nthistime);
753 
754  lt->dirty = true;
755  lt->pos += nthistime;
756  if (lt->nbytes < lt->pos)
757  lt->nbytes = lt->pos;
758  ptr = (void *) ((char *) ptr + nthistime);
759  size -= nthistime;
760  }
761 }
762 
763 /*
764  * Rewind logical tape and switch from writing to reading.
765  *
766  * The tape must currently be in writing state, or "frozen" in read state.
767  *
768  * 'buffer_size' specifies how much memory to use for the read buffer.
769  * Regardless of the argument, the actual amount of memory used is between
770  * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
771  * rounded down and truncated to fit those constraints, if necessary. If the
772  * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
773  * byte buffer is used.
774  */
775 void
776 LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
777 {
778  LogicalTape *lt;
779 
780  Assert(tapenum >= 0 && tapenum < lts->nTapes);
781  lt = &lts->tapes[tapenum];
782 
783  /*
784  * Round and cap buffer_size if needed.
785  */
786  if (lt->frozen)
787  buffer_size = BLCKSZ;
788  else
789  {
790  /* need at least one block */
791  if (buffer_size < BLCKSZ)
792  buffer_size = BLCKSZ;
793 
794  /* palloc() larger than max_size is unlikely to be helpful */
795  if (buffer_size > lt->max_size)
796  buffer_size = lt->max_size;
797 
798  /* round down to BLCKSZ boundary */
799  buffer_size -= buffer_size % BLCKSZ;
800  }
801 
802  if (lt->writing)
803  {
804  /*
805  * Completion of a write phase. Flush last partial data block, and
806  * rewind for normal (destructive) read.
807  */
808  if (lt->dirty)
809  {
810  /*
811  * As long as we've filled the buffer at least once, its contents
812  * are entirely defined from valgrind's point of view, even though
813  * contents beyond the current end point may be stale. But it's
814  * possible - at least in the case of a parallel sort - to sort
815  * such small amount of data that we do not fill the buffer even
816  * once. Tell valgrind that its contents are defined, so it
817  * doesn't bleat.
818  */
820  lt->buffer_size - lt->nbytes);
821 
822  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
823  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
824  }
825  lt->writing = false;
826  }
827  else
828  {
829  /*
830  * This is only OK if tape is frozen; we rewind for (another) read
831  * pass.
832  */
833  Assert(lt->frozen);
834  }
835 
836  /* Allocate a read buffer (unless the tape is empty) */
837  if (lt->buffer)
838  pfree(lt->buffer);
839 
840  /* the buffer is lazily allocated, but set the size here */
841  lt->buffer = NULL;
842  lt->buffer_size = buffer_size;
843 }
844 
845 /*
846  * Rewind logical tape and switch from reading to writing.
847  *
848  * NOTE: we assume the caller has read the tape to the end; otherwise
849  * untouched data will not have been freed. We could add more code to free
850  * any unread blocks, but in current usage of this module it'd be useless
851  * code.
852  */
853 void
855 {
856  LogicalTape *lt;
857 
858  Assert(tapenum >= 0 && tapenum < lts->nTapes);
859  lt = &lts->tapes[tapenum];
860 
861  Assert(!lt->writing && !lt->frozen);
862  lt->writing = true;
863  lt->dirty = false;
864  lt->firstBlockNumber = -1L;
865  lt->curBlockNumber = -1L;
866  lt->pos = 0;
867  lt->nbytes = 0;
868  if (lt->buffer)
869  pfree(lt->buffer);
870  lt->buffer = NULL;
871  lt->buffer_size = 0;
872 }
873 
874 /*
875  * Read from a logical tape.
876  *
877  * Early EOF is indicated by return value less than #bytes requested.
878  */
879 size_t
880 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
881  void *ptr, size_t size)
882 {
883  LogicalTape *lt;
884  size_t nread = 0;
885  size_t nthistime;
886 
887  Assert(tapenum >= 0 && tapenum < lts->nTapes);
888  lt = &lts->tapes[tapenum];
889  Assert(!lt->writing);
890 
891  if (lt->buffer == NULL)
892  ltsInitReadBuffer(lts, lt);
893 
894  while (size > 0)
895  {
896  if (lt->pos >= lt->nbytes)
897  {
898  /* Try to load more data into buffer. */
899  if (!ltsReadFillBuffer(lts, lt))
900  break; /* EOF */
901  }
902 
903  nthistime = lt->nbytes - lt->pos;
904  if (nthistime > size)
905  nthistime = size;
906  Assert(nthistime > 0);
907 
908  memcpy(ptr, lt->buffer + lt->pos, nthistime);
909 
910  lt->pos += nthistime;
911  ptr = (void *) ((char *) ptr + nthistime);
912  size -= nthistime;
913  nread += nthistime;
914  }
915 
916  return nread;
917 }
918 
919 /*
920  * "Freeze" the contents of a tape so that it can be read multiple times
921  * and/or read backwards. Once a tape is frozen, its contents will not
922  * be released until the LogicalTapeSet is destroyed. This is expected
923  * to be used only for the final output pass of a merge.
924  *
925  * This *must* be called just at the end of a write pass, before the
926  * tape is rewound (after rewind is too late!). It performs a rewind
927  * and switch to read mode "for free". An immediately following rewind-
928  * for-read call is OK but not necessary.
929  *
930  * share output argument is set with details of storage used for tape after
931  * freezing, which may be passed to LogicalTapeSetCreate within leader
932  * process later. This metadata is only of interest to worker callers
933  * freezing their final output for leader (single materialized tape).
934  * Serial sorts should set share to NULL.
935  */
936 void
937 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
938 {
939  LogicalTape *lt;
940 
941  Assert(tapenum >= 0 && tapenum < lts->nTapes);
942  lt = &lts->tapes[tapenum];
943  Assert(lt->writing);
944  Assert(lt->offsetBlockNumber == 0L);
945 
946  /*
947  * Completion of a write phase. Flush last partial data block, and rewind
948  * for nondestructive read.
949  */
950  if (lt->dirty)
951  {
952  /*
953  * As long as we've filled the buffer at least once, its contents are
954  * entirely defined from valgrind's point of view, even though
955  * contents beyond the current end point may be stale. But it's
956  * possible - at least in the case of a parallel sort - to sort such
957  * small amount of data that we do not fill the buffer even once. Tell
958  * valgrind that its contents are defined, so it doesn't bleat.
959  */
961  lt->buffer_size - lt->nbytes);
962 
963  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
964  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
965  lt->writing = false;
966  }
967  lt->writing = false;
968  lt->frozen = true;
969 
970  /*
971  * The seek and backspace functions assume a single block read buffer.
972  * That's OK with current usage. A larger buffer is helpful to make the
973  * read pattern of the backing file look more sequential to the OS, when
974  * we're reading from multiple tapes. But at the end of a sort, when a
975  * tape is frozen, we only read from a single tape anyway.
976  */
977  if (!lt->buffer || lt->buffer_size != BLCKSZ)
978  {
979  if (lt->buffer)
980  pfree(lt->buffer);
981  lt->buffer = palloc(BLCKSZ);
982  lt->buffer_size = BLCKSZ;
983  }
984 
985  /* Read the first block, or reset if tape is empty */
987  lt->pos = 0;
988  lt->nbytes = 0;
989 
990  if (lt->firstBlockNumber == -1L)
991  lt->nextBlockNumber = -1L;
992  ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
993  if (TapeBlockIsLast(lt->buffer))
994  lt->nextBlockNumber = -1L;
995  else
996  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
997  lt->nbytes = TapeBlockGetNBytes(lt->buffer);
998 
999  /* Handle extra steps when caller is to share its tapeset */
1000  if (share)
1001  {
1002  BufFileExportShared(lts->pfile);
1003  share->firstblocknumber = lt->firstBlockNumber;
1004  }
1005 }
1006 
1007 /*
1008  * Backspace the tape a given number of bytes. (We also support a more
1009  * general seek interface, see below.)
1010  *
1011  * *Only* a frozen-for-read tape can be backed up; we don't support
1012  * random access during write, and an unfrozen read tape may have
1013  * already discarded the desired data!
1014  *
1015  * Returns the number of bytes backed up. It can be less than the
1016  * requested amount, if there isn't that much data before the current
1017  * position. The tape is positioned to the beginning of the tape in
1018  * that case.
1019  */
1020 size_t
1021 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
1022 {
1023  LogicalTape *lt;
1024  size_t seekpos = 0;
1025 
1026  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1027  lt = &lts->tapes[tapenum];
1028  Assert(lt->frozen);
1029  Assert(lt->buffer_size == BLCKSZ);
1030 
1031  if (lt->buffer == NULL)
1032  ltsInitReadBuffer(lts, lt);
1033 
1034  /*
1035  * Easy case for seek within current block.
1036  */
1037  if (size <= (size_t) lt->pos)
1038  {
1039  lt->pos -= (int) size;
1040  return size;
1041  }
1042 
1043  /*
1044  * Not-so-easy case, have to walk back the chain of blocks. This
1045  * implementation would be pretty inefficient for long seeks, but we
1046  * really aren't doing that (a seek over one tuple is typical).
1047  */
1048  seekpos = (size_t) lt->pos; /* part within this block */
1049  while (size > seekpos)
1050  {
1051  long prev = TapeBlockGetTrailer(lt->buffer)->prev;
1052 
1053  if (prev == -1L)
1054  {
1055  /* Tried to back up beyond the beginning of tape. */
1056  if (lt->curBlockNumber != lt->firstBlockNumber)
1057  elog(ERROR, "unexpected end of tape");
1058  lt->pos = 0;
1059  return seekpos;
1060  }
1061 
1062  ltsReadBlock(lts, prev, (void *) lt->buffer);
1063 
1064  if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1065  elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1066  prev,
1067  TapeBlockGetTrailer(lt->buffer)->next,
1068  lt->curBlockNumber);
1069 
1071  lt->curBlockNumber = prev;
1072  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1073 
1074  seekpos += TapeBlockPayloadSize;
1075  }
1076 
1077  /*
1078  * 'seekpos' can now be greater than 'size', because it points to the
1079  * beginning the target block. The difference is the position within the
1080  * page.
1081  */
1082  lt->pos = seekpos - size;
1083  return size;
1084 }
1085 
1086 /*
1087  * Seek to an arbitrary position in a logical tape.
1088  *
1089  * *Only* a frozen-for-read tape can be seeked.
1090  *
1091  * Must be called with a block/offset previously returned by
1092  * LogicalTapeTell().
1093  */
1094 void
1096  long blocknum, int offset)
1097 {
1098  LogicalTape *lt;
1099 
1100  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1101  lt = &lts->tapes[tapenum];
1102  Assert(lt->frozen);
1103  Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1104  Assert(lt->buffer_size == BLCKSZ);
1105 
1106  if (lt->buffer == NULL)
1107  ltsInitReadBuffer(lts, lt);
1108 
1109  if (blocknum != lt->curBlockNumber)
1110  {
1111  ltsReadBlock(lts, blocknum, (void *) lt->buffer);
1112  lt->curBlockNumber = blocknum;
1114  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1115  }
1116 
1117  if (offset > lt->nbytes)
1118  elog(ERROR, "invalid tape seek position");
1119  lt->pos = offset;
1120 }
1121 
1122 /*
1123  * Obtain current position in a form suitable for a later LogicalTapeSeek.
1124  *
1125  * NOTE: it'd be OK to do this during write phase with intention of using
1126  * the position for a seek after freezing. Not clear if anyone needs that.
1127  */
1128 void
1130  long *blocknum, int *offset)
1131 {
1132  LogicalTape *lt;
1133 
1134  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1135  lt = &lts->tapes[tapenum];
1136 
1137  if (lt->buffer == NULL)
1138  ltsInitReadBuffer(lts, lt);
1139 
1140  Assert(lt->offsetBlockNumber == 0L);
1141 
1142  /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1143  Assert(lt->buffer_size == BLCKSZ);
1144 
1145  *blocknum = lt->curBlockNumber;
1146  *offset = lt->pos;
1147 }
1148 
1149 /*
1150  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1151  */
1152 long
1154 {
1155  return lts->nBlocksAllocated - lts->nHoleBlocks;
1156 }
int max_size
Definition: logtape.c:151
size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:880
LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]
Definition: logtape.c:195
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
#define swap(a, b)
Definition: qsort.c:94
#define TapeBlockIsLast(buf)
Definition: logtape.c:106
#define TapeBlockPayloadSize
Definition: logtape.c:102
long offsetBlockNumber
Definition: logtape.c:144
long nBlocksWritten
Definition: logtape.c:177
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:280
int64 BufFileSize(BufFile *file)
Definition: buffile.c:785
static bool ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:277
#define Min(x, y)
Definition: c.h:920
static long ltsGetFreeBlock(LogicalTapeSet *lts)
Definition: logtape.c:348
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:276
BufFile * pfile
Definition: logtape.c:164
#define MemSet(start, val, len)
Definition: c.h:971
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
Definition: logtape.c:403
long firstblocknumber
Definition: logtape.h:50
bool frozen
Definition: logtape.c:127
long nextBlockNumber
Definition: logtape.c:143
void BufFileClose(BufFile *file)
Definition: buffile.c:391
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:854
static void swap_nodes(long *heap, unsigned long a, unsigned long b)
Definition: logtape.c:316
bool writing
Definition: logtape.c:126
long * freeBlocks
Definition: logtape.c:189
bool dirty
Definition: logtape.c:128
char data[BLCKSZ]
Definition: c.h:1100
void pfree(void *pointer)
Definition: mcxt.c:1056
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:688
#define ERROR
Definition: elog.h:43
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:184
#define MAXPGPATH
LogicalTapeSet * LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, int worker)
Definition: logtape.c:578
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:258
size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
Definition: logtape.c:1021
int errcode_for_file_access(void)
Definition: elog.c:631
long nHoleBlocks
Definition: logtape.c:178
int nbytes
Definition: logtape.c:153
static unsigned long right_offset(unsigned i)
Definition: logtape.c:332
void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset)
Definition: logtape.c:1129
#define ereport(elevel, rest)
Definition: elog.h:141
void pg_itoa(int16 i, char *a)
Definition: numutils.c:335
static unsigned long left_offset(unsigned long i)
Definition: logtape.c:326
#define MaxAllocSize
Definition: memutils.h:40
long firstBlockNumber
Definition: logtape.c:141
long nFreeBlocks
Definition: logtape.c:190
int BufFileSeekBlock(BufFile *file, long blknum)
Definition: buffile.c:752
#define TapeBlockSetNBytes(buf, nbytes)
Definition: logtape.c:110
static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:545
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, SharedFileSet *fileset)
Definition: logtape.c:459
long curBlockNumber
Definition: logtape.c:142
#define Assert(condition)
Definition: c.h:738
struct TapeBlockTrailer TapeBlockTrailer
size_t Size
Definition: c.h:466
void BufFileExportShared(BufFile *file)
Definition: buffile.c:373
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
Definition: logtape.c:776
#define TapeBlockGetNBytes(buf)
Definition: logtape.c:107
void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
Definition: logtape.c:937
char * buffer
Definition: logtape.c:149
static char * filename
Definition: pg_dumpall.c:90
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
bool forgetFreeSpace
Definition: logtape.c:188
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:528
size_t BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:575
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:651
void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, long blocknum, int offset)
Definition: logtape.c:1095
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition: logtape.c:677
Size freeBlocksLen
Definition: logtape.c:191
long LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1153
struct LogicalTape LogicalTape
static unsigned long parent_offset(unsigned long i)
Definition: logtape.c:338
long BufFileAppend(BufFile *target, BufFile *source)
Definition: buffile.c:824
long nBlocksAllocated
Definition: logtape.c:176
int buffer_size
Definition: logtape.c:150
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:213
#define TapeBlockGetTrailer(buf)
Definition: logtape.c:103
#define offsetof(type, field)
Definition: c.h:661
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:261