PostgreSQL Source Code  git master
logtape.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * logtape.c
4  * Management of "logical tapes" within temporary files.
5  *
6  * This module exists to support sorting via multiple merge passes (see
7  * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8  * we implement it on disk by creating a separate file for each "tape",
9  * there is an annoying problem: the peak space usage is at least twice
10  * the volume of actual data to be sorted. (This must be so because each
11  * datum will appear in both the input and output tapes of the final
12  * merge pass. For seven-tape polyphase merge, which is otherwise a
13  * pretty good algorithm, peak usage is more like 4x actual data volume.)
14  *
15  * We can work around this problem by recognizing that any one tape
16  * dataset (with the possible exception of the final output) is written
17  * and read exactly once in a perfectly sequential manner. Therefore,
18  * a datum once read will not be required again, and we can recycle its
19  * space for use by the new tape dataset(s) being generated. In this way,
20  * the total space usage is essentially just the actual data volume, plus
21  * insignificant bookkeeping and start/stop overhead.
22  *
23  * Few OSes allow arbitrary parts of a file to be released back to the OS,
24  * so we have to implement this space-recycling ourselves within a single
25  * logical file. logtape.c exists to perform this bookkeeping and provide
26  * the illusion of N independent tape devices to tuplesort.c. Note that
27  * logtape.c itself depends on buffile.c to provide a "logical file" of
28  * larger size than the underlying OS may support.
29  *
30  * For simplicity, we allocate and release space in the underlying file
31  * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32  * of which blocks in the underlying file belong to which logical tape,
33  * plus any blocks that are free (recycled and not yet reused).
34  * The blocks in each logical tape form a chain, with a prev- and next-
35  * pointer in each block.
36  *
37  * The initial write pass is guaranteed to fill the underlying file
38  * perfectly sequentially, no matter how data is divided into logical tapes.
39  * Once we begin merge passes, the access pattern becomes considerably
40  * less predictable --- but the seeking involved should be comparable to
41  * what would happen if we kept each logical tape in a separate file,
42  * so there's no serious performance penalty paid to obtain the space
43  * savings of recycling. We try to localize the write accesses by always
44  * writing to the lowest-numbered free block when we have a choice; it's
45  * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
46  * policy for free blocks would be better?)
47  *
48  * To further make the I/Os more sequential, we can use a larger buffer
49  * when reading, and read multiple blocks from the same tape in one go,
50  * whenever the buffer becomes empty.
51  *
52  * To support the above policy of writing to the lowest free block, the
53  * freelist is a min heap.
54  *
55  * Since all the bookkeeping and buffer memory is allocated with palloc(),
56  * and the underlying file(s) are made with OpenTemporaryFile, all resources
57  * for a logical tape set are certain to be cleaned up even if processing
58  * is aborted by ereport(ERROR). To avoid confusion, the caller should take
59  * care that all calls for a single LogicalTapeSet are made in the same
60  * palloc context.
61  *
62  * To support parallel sort operations involving coordinated callers to
63  * tuplesort.c routines across multiple workers, it is necessary to
64  * concatenate each worker BufFile/tapeset into one single logical tapeset
65  * managed by the leader. Workers should have produced one final
66  * materialized tape (their entire output) when this happens in leader.
67  * There will always be the same number of runs as input tapes, and the same
68  * number of input tapes as participants (worker Tuplesortstates).
69  *
70  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
71  * Portions Copyright (c) 1994, Regents of the University of California
72  *
73  * IDENTIFICATION
74  * src/backend/utils/sort/logtape.c
75  *
76  *-------------------------------------------------------------------------
77  */
78 
79 #include "postgres.h"
80 
81 #include "storage/buffile.h"
82 #include "utils/builtins.h"
83 #include "utils/logtape.h"
84 #include "utils/memdebug.h"
85 #include "utils/memutils.h"
86 
/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, inverted, in 'next'.
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	long		prev;			/* previous block on this tape, or -1 on first
								 * block */
	long		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;

/* Usable bytes per block: everything in front of the trailer. */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))

/* Trailer of the block whose first byte is at 'buf'. */
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)

/*
 * Mark the block as the last one, holding 'nbytes' valid bytes.  nbytes must
 * be > 0: a stored value of 0 would not satisfy TapeBlockIsLast().
 */
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
112 
113 
/*
 * This data structure represents a single "logical tape" within the set
 * of logical tapes stored in the same file.
 *
 * While writing, we hold the current partially-written data block in the
 * buffer.  While reading, we can hold multiple blocks in the buffer.  Note
 * that we don't retain the trailers of a block when it's read into the
 * buffer.  The buffer therefore contains one large contiguous chunk of data
 * from the tape.
 */
typedef struct LogicalTape
{
	bool		writing;		/* T while in write phase */
	bool		frozen;			/* T if blocks should not be freed when read */
	bool		dirty;			/* does buffer need to be written? */

	/*
	 * Block numbers of the first, current, and next block of the tape.
	 *
	 * The "current" block number is only valid when writing, or reading from
	 * a frozen tape.  (When reading from an unfrozen tape, we use a larger
	 * read buffer that holds multiple blocks, so the "current" block is
	 * ambiguous.)
	 *
	 * When concatenation of worker tape BufFiles is performed, an offset to
	 * the first block in the unified BufFile space is applied during reads.
	 */
	long		firstBlockNumber;	/* -1 until the tape has a block */
	long		curBlockNumber; /* see above; -1 if none */
	long		nextBlockNumber;	/* next block to read, -1 at EOF */
	long		offsetBlockNumber;	/* 0 except for concatenated worker
									 * tapes */

	/*
	 * Buffer for current data block(s).
	 */
	char	   *buffer;			/* physical buffer (separately palloc'd) */
	int			buffer_size;	/* allocated size of the buffer */
	int			max_size;		/* highest useful, safe buffer_size */
	int			pos;			/* next read/write position in buffer */
	int			nbytes;			/* total # of valid bytes in buffer */
} LogicalTape;
155 
156 /*
157  * This data structure represents a set of related "logical tapes" sharing
158  * space in a single underlying file. (But that "file" may be multiple files
159  * if needed to escape OS limits on file size; buffile.c handles that for us.)
160  * The number of tapes is fixed at creation.
161  */
163 {
164  BufFile *pfile; /* underlying file for whole tape set */
165 
166  /*
167  * File size tracking. nBlocksWritten is the size of the underlying file,
168  * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
169  * by ltsReleaseBlock(), and it is always greater than or equal to
170  * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
171  * blocks that have been allocated for a tape, but have not been written
172  * to the underlying file yet. nHoleBlocks tracks the total number of
173  * blocks that are in unused holes between worker spaces following BufFile
174  * concatenation.
175  */
176  long nBlocksAllocated; /* # of blocks allocated */
177  long nBlocksWritten; /* # of blocks used in underlying file */
178  long nHoleBlocks; /* # of "hole" blocks left */
179 
180  /*
181  * We store the numbers of recycled-and-available blocks in freeBlocks[].
182  * When there are no such blocks, we extend the underlying file.
183  *
184  * If forgetFreeSpace is true then any freed blocks are simply forgotten
185  * rather than being remembered in freeBlocks[]. See notes for
186  * LogicalTapeSetForgetFreeSpace().
187  */
188  bool forgetFreeSpace; /* are we remembering free blocks? */
189  long *freeBlocks; /* resizable array holding minheap */
190  long nFreeBlocks; /* # of currently free blocks */
191  Size freeBlocksLen; /* current allocated length of freeBlocks[] */
192 
193  /* The array of logical tapes. */
194  int nTapes; /* # of logical tapes in set */
195  LogicalTape *tapes; /* has nTapes nentries */
196 };
197 
198 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
199 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
200 static long ltsGetFreeBlock(LogicalTapeSet *lts);
201 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
202 static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
203  SharedFileSet *fileset);
204 static void ltsInitTape(LogicalTape *lt);
205 static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
206 
207 
208 /*
209  * Write a block-sized buffer to the specified block of the underlying file.
210  *
211  * No need for an error return convention; we ereport() on any error.
212  */
213 static void
214 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
215 {
216  /*
217  * BufFile does not support "holes", so if we're about to write a block
218  * that's past the current end of file, fill the space between the current
219  * end of file and the target block with zeros.
220  *
221  * This should happen rarely, otherwise you are not writing very
222  * sequentially. In current use, this only happens when the sort ends
223  * writing a run, and switches to another tape. The last block of the
224  * previous tape isn't flushed to disk until the end of the sort, so you
225  * get one-block hole, where the last block of the previous tape will
226  * later go.
227  *
228  * Note that BufFile concatenation can leave "holes" in BufFile between
229  * worker-owned block ranges. These are tracked for reporting purposes
230  * only. We never read from nor write to these hole blocks, and so they
231  * are not considered here.
232  */
233  while (blocknum > lts->nBlocksWritten)
234  {
235  PGAlignedBlock zerobuf;
236 
237  MemSet(zerobuf.data, 0, sizeof(zerobuf));
238 
239  ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
240  }
241 
242  /* Write the requested block */
243  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
244  BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
245  ereport(ERROR,
247  errmsg("could not write block %ld of temporary file: %m",
248  blocknum)));
249 
250  /* Update nBlocksWritten, if we extended the file */
251  if (blocknum == lts->nBlocksWritten)
252  lts->nBlocksWritten++;
253 }
254 
255 /*
256  * Read a block-sized buffer from the specified block of the underlying file.
257  *
258  * No need for an error return convention; we ereport() on any error. This
259  * module should never attempt to read a block it doesn't know is there.
260  */
261 static void
262 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
263 {
264  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
265  BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
266  ereport(ERROR,
268  errmsg("could not read block %ld of temporary file: %m",
269  blocknum)));
270 }
271 
272 /*
273  * Read as many blocks as we can into the per-tape buffer.
274  *
275  * Returns true if anything was read, 'false' on EOF.
276  */
277 static bool
279 {
280  lt->pos = 0;
281  lt->nbytes = 0;
282 
283  do
284  {
285  char *thisbuf = lt->buffer + lt->nbytes;
286  long datablocknum = lt->nextBlockNumber;
287 
288  /* Fetch next block number */
289  if (datablocknum == -1L)
290  break; /* EOF */
291  /* Apply worker offset, needed for leader tapesets */
292  datablocknum += lt->offsetBlockNumber;
293 
294  /* Read the block */
295  ltsReadBlock(lts, datablocknum, (void *) thisbuf);
296  if (!lt->frozen)
297  ltsReleaseBlock(lts, datablocknum);
299 
300  lt->nbytes += TapeBlockGetNBytes(thisbuf);
301  if (TapeBlockIsLast(thisbuf))
302  {
303  lt->nextBlockNumber = -1L;
304  /* EOF */
305  break;
306  }
307  else
308  lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
309 
310  /* Advance to next block, if we have buffer space left */
311  } while (lt->buffer_size - lt->nbytes > BLCKSZ);
312 
313  return (lt->nbytes > 0);
314 }
315 
/*
 * Exchange entries 'a' and 'b' of the freelist min-heap.
 *
 * The temporary has the element type (long), so no signed/unsigned
 * conversion round trip takes place.
 */
static inline void
swap_nodes(long *heap, unsigned long a, unsigned long b)
{
	long		tmp = heap[a];

	heap[a] = heap[b];
	heap[b] = tmp;
}
325 
/*
 * Position of the left child of heap slot 'i' in a 0-based array heap.
 */
static inline unsigned long
left_offset(unsigned long i)
{
	return i + i + 1;
}
331 
/*
 * Position of the right child of heap slot 'i' in a 0-based array heap.
 *
 * Takes unsigned long, like left_offset() and parent_offset(), so that heap
 * positions are not truncated to unsigned int where long is wider than int.
 */
static inline unsigned long
right_offset(unsigned long i)
{
	return 2 * i + 2;
}
337 
/*
 * Position of the parent of heap slot 'i'.  Only meaningful for i > 0; the
 * root (slot 0) has no parent.
 */
static inline unsigned long
parent_offset(unsigned long i)
{
	unsigned long prev = i - 1;

	return prev >> 1;
}
343 
344 /*
345  * Select the lowest currently unused block by taking the first element from
346  * the freelist min heap.
347  */
348 static long
350 {
351  long *heap = lts->freeBlocks;
352  long blocknum;
353  int heapsize;
354  unsigned long pos;
355 
356  /* freelist empty; allocate a new block */
357  if (lts->nFreeBlocks == 0)
358  return lts->nBlocksAllocated++;
359 
360  if (lts->nFreeBlocks == 1)
361  {
362  lts->nFreeBlocks--;
363  return lts->freeBlocks[0];
364  }
365 
366  /* take top of minheap */
367  blocknum = heap[0];
368 
369  /* replace with end of minheap array */
370  heap[0] = heap[--lts->nFreeBlocks];
371 
372  /* sift down */
373  pos = 0;
374  heapsize = lts->nFreeBlocks;
375  while (true)
376  {
377  unsigned long left = left_offset(pos);
378  unsigned long right = right_offset(pos);
379  unsigned long min_child;
380 
381  if (left < heapsize && right < heapsize)
382  min_child = (heap[left] < heap[right]) ? left : right;
383  else if (left < heapsize)
384  min_child = left;
385  else if (right < heapsize)
386  min_child = right;
387  else
388  break;
389 
390  if (heap[min_child] >= heap[pos])
391  break;
392 
393  swap_nodes(heap, min_child, pos);
394  pos = min_child;
395  }
396 
397  return blocknum;
398 }
399 
400 /*
401  * Return a block# to the freelist.
402  */
403 static void
404 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
405 {
406  long *heap;
407  unsigned long pos;
408 
409  /*
410  * Do nothing if we're no longer interested in remembering free space.
411  */
412  if (lts->forgetFreeSpace)
413  return;
414 
415  /*
416  * Enlarge freeBlocks array if full.
417  */
418  if (lts->nFreeBlocks >= lts->freeBlocksLen)
419  {
420  /*
421  * If the freelist becomes very large, just return and leak this free
422  * block.
423  */
424  if (lts->freeBlocksLen * 2 > MaxAllocSize)
425  return;
426 
427  lts->freeBlocksLen *= 2;
428  lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
429  lts->freeBlocksLen * sizeof(long));
430  }
431 
432  heap = lts->freeBlocks;
433  pos = lts->nFreeBlocks;
434 
435  /* place entry at end of minheap array */
436  heap[pos] = blocknum;
437  lts->nFreeBlocks++;
438 
439  /* sift up */
440  while (pos != 0)
441  {
442  unsigned long parent = parent_offset(pos);
443 
444  if (heap[parent] < heap[pos])
445  break;
446 
447  swap_nodes(heap, parent, pos);
448  pos = parent;
449  }
450 }
451 
452 /*
453  * Claim ownership of a set of logical tapes from existing shared BufFiles.
454  *
455  * Caller should be leader process. Though tapes are marked as frozen in
456  * workers, they are not frozen when opened within leader, since unfrozen tapes
457  * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
458  * for random access.)
459  */
460 static void
462  SharedFileSet *fileset)
463 {
464  LogicalTape *lt = NULL;
465  long tapeblocks = 0L;
466  long nphysicalblocks = 0L;
467  int i;
468 
469  /* Should have at least one worker tape, plus leader's tape */
470  Assert(lts->nTapes >= 2);
471 
472  /*
473  * Build concatenated view of all BufFiles, remembering the block number
474  * where each source file begins. No changes are needed for leader/last
475  * tape.
476  */
477  for (i = 0; i < lts->nTapes - 1; i++)
478  {
479  char filename[MAXPGPATH];
480  BufFile *file;
481  int64 filesize;
482 
483  lt = &lts->tapes[i];
484 
485  pg_itoa(i, filename);
486  file = BufFileOpenShared(fileset, filename);
487  filesize = BufFileSize(file);
488 
489  /*
490  * Stash first BufFile, and concatenate subsequent BufFiles to that.
491  * Store block offset into each tape as we go.
492  */
493  lt->firstBlockNumber = shared[i].firstblocknumber;
494  if (i == 0)
495  {
496  lts->pfile = file;
497  lt->offsetBlockNumber = 0L;
498  }
499  else
500  {
501  lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
502  }
503  /* Don't allocate more for read buffer than could possibly help */
504  lt->max_size = Min(MaxAllocSize, filesize);
505  tapeblocks = filesize / BLCKSZ;
506  nphysicalblocks += tapeblocks;
507  }
508 
509  /*
510  * Set # of allocated blocks, as well as # blocks written. Use extent of
511  * new BufFile space (from 0 to end of last worker's tape space) for this.
512  * Allocated/written blocks should include space used by holes left
513  * between concatenated BufFiles.
514  */
515  lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
516  lts->nBlocksWritten = lts->nBlocksAllocated;
517 
518  /*
519  * Compute number of hole blocks so that we can later work backwards, and
520  * instrument number of physical blocks. We don't simply use physical
521  * blocks directly for instrumentation because this would break if we ever
522  * subsequently wrote to the leader tape.
523  *
524  * Working backwards like this keeps our options open. If shared BufFiles
525  * ever support being written to post-export, logtape.c can automatically
526  * take advantage of that. We'd then support writing to the leader tape
527  * while recycling space from worker tapes, because the leader tape has a
528  * zero offset (write routines won't need to have extra logic to apply an
529  * offset).
530  *
531  * The only thing that currently prevents writing to the leader tape from
532  * working is the fact that BufFiles opened using BufFileOpenShared() are
533  * read-only by definition, but that could be changed if it seemed
534  * worthwhile. For now, writing to the leader tape will raise a "Bad file
535  * descriptor" error, so tuplesort must avoid writing to the leader tape
536  * altogether.
537  */
538  lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
539 }
540 
541 /*
542  * Initialize per-tape struct. Note we allocate the I/O buffer lazily.
543  */
544 static void
546 {
547  lt->writing = true;
548  lt->frozen = false;
549  lt->dirty = false;
550  lt->firstBlockNumber = -1L;
551  lt->curBlockNumber = -1L;
552  lt->nextBlockNumber = -1L;
553  lt->offsetBlockNumber = 0L;
554  lt->buffer = NULL;
555  lt->buffer_size = 0;
556  /* palloc() larger than MaxAllocSize would fail */
557  lt->max_size = MaxAllocSize;
558  lt->pos = 0;
559  lt->nbytes = 0;
560 }
561 
562 /*
563  * Lazily allocate and initialize the read buffer. This avoids waste when many
564  * tapes are open at once, but not all are active between rewinding and
565  * reading.
566  */
567 static void
569 {
570  Assert(lt->buffer_size > 0);
571  lt->buffer = palloc(lt->buffer_size);
572 
573  /* Read the first block, or reset if tape is empty */
575  lt->pos = 0;
576  lt->nbytes = 0;
577  ltsReadFillBuffer(lts, lt);
578 }
579 
580 /*
581  * Create a set of logical tapes in a temporary underlying file.
582  *
583  * Each tape is initialized in write state. Serial callers pass ntapes,
584  * NULL argument for shared, and -1 for worker. Parallel worker callers
585  * pass ntapes, a shared file handle, NULL shared argument, and their own
586  * worker number. Leader callers, which claim shared worker tapes here,
587  * must supply non-sentinel values for all arguments except worker number,
588  * which should be -1.
589  *
590  * Leader caller is passing back an array of metadata each worker captured
591  * when LogicalTapeFreeze() was called for their final result tapes. Passed
592  * tapes array is actually sized ntapes - 1, because it includes only
593  * worker tapes, whereas leader requires its own leader tape. Note that we
594  * rely on the assumption that reclaimed worker tapes will only be read
595  * from once by leader, and never written to again (tapes are initialized
596  * for writing, but that's only to be consistent). Leader may not write to
597  * its own tape purely due to a restriction in the shared buffile
598  * infrastructure that may be lifted in the future.
599  */
601 LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
602  int worker)
603 {
604  LogicalTapeSet *lts;
605  int i;
606 
607  /*
608  * Create top-level struct including per-tape LogicalTape structs.
609  */
610  Assert(ntapes > 0);
611  lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
612  lts->nBlocksAllocated = 0L;
613  lts->nBlocksWritten = 0L;
614  lts->nHoleBlocks = 0L;
615  lts->forgetFreeSpace = false;
616  lts->freeBlocksLen = 32; /* reasonable initial guess */
617  lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
618  lts->nFreeBlocks = 0;
619  lts->nTapes = ntapes;
620  lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));
621 
622  for (i = 0; i < ntapes; i++)
623  ltsInitTape(&lts->tapes[i]);
624 
625  /*
626  * Create temp BufFile storage as required.
627  *
628  * Leader concatenates worker tapes, which requires special adjustment to
629  * final tapeset data. Things are simpler for the worker case and the
630  * serial case, though. They are generally very similar -- workers use a
631  * shared fileset, whereas serial sorts use a conventional serial BufFile.
632  */
633  if (shared)
634  ltsConcatWorkerTapes(lts, shared, fileset);
635  else if (fileset)
636  {
637  char filename[MAXPGPATH];
638 
639  pg_itoa(worker, filename);
640  lts->pfile = BufFileCreateShared(fileset, filename);
641  }
642  else
643  lts->pfile = BufFileCreateTemp(false);
644 
645  return lts;
646 }
647 
648 /*
649  * Close a logical tape set and release all resources.
650  */
651 void
653 {
654  LogicalTape *lt;
655  int i;
656 
657  BufFileClose(lts->pfile);
658  for (i = 0; i < lts->nTapes; i++)
659  {
660  lt = &lts->tapes[i];
661  if (lt->buffer)
662  pfree(lt->buffer);
663  }
664  pfree(lts->tapes);
665  pfree(lts->freeBlocks);
666  pfree(lts);
667 }
668 
669 /*
670  * Mark a logical tape set as not needing management of free space anymore.
671  *
672  * This should be called if the caller does not intend to write any more data
673  * into the tape set, but is reading from un-frozen tapes. Since no more
674  * writes are planned, remembering free blocks is no longer useful. Setting
675  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
676  * is not designed to handle large numbers of free blocks.
677  */
678 void
680 {
681  lts->forgetFreeSpace = true;
682 }
683 
684 /*
685  * Write to a logical tape.
686  *
687  * There are no error returns; we ereport() on failure.
688  */
689 void
691  void *ptr, size_t size)
692 {
693  LogicalTape *lt;
694  size_t nthistime;
695 
696  Assert(tapenum >= 0 && tapenum < lts->nTapes);
697  lt = &lts->tapes[tapenum];
698  Assert(lt->writing);
699  Assert(lt->offsetBlockNumber == 0L);
700 
701  /* Allocate data buffer and first block on first write */
702  if (lt->buffer == NULL)
703  {
704  lt->buffer = (char *) palloc(BLCKSZ);
705  lt->buffer_size = BLCKSZ;
706  }
707  if (lt->curBlockNumber == -1)
708  {
709  Assert(lt->firstBlockNumber == -1);
710  Assert(lt->pos == 0);
711 
712  lt->curBlockNumber = ltsGetFreeBlock(lts);
714 
715  TapeBlockGetTrailer(lt->buffer)->prev = -1L;
716  }
717 
718  Assert(lt->buffer_size == BLCKSZ);
719  while (size > 0)
720  {
721  if (lt->pos >= TapeBlockPayloadSize)
722  {
723  /* Buffer full, dump it out */
724  long nextBlockNumber;
725 
726  if (!lt->dirty)
727  {
728  /* Hmm, went directly from reading to writing? */
729  elog(ERROR, "invalid logtape state: should be dirty");
730  }
731 
732  /*
733  * First allocate the next block, so that we can store it in the
734  * 'next' pointer of this block.
735  */
736  nextBlockNumber = ltsGetFreeBlock(lts);
737 
738  /* set the next-pointer and dump the current block. */
739  TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
740  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
741 
742  /* initialize the prev-pointer of the next block */
743  TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
744  lt->curBlockNumber = nextBlockNumber;
745  lt->pos = 0;
746  lt->nbytes = 0;
747  }
748 
749  nthistime = TapeBlockPayloadSize - lt->pos;
750  if (nthistime > size)
751  nthistime = size;
752  Assert(nthistime > 0);
753 
754  memcpy(lt->buffer + lt->pos, ptr, nthistime);
755 
756  lt->dirty = true;
757  lt->pos += nthistime;
758  if (lt->nbytes < lt->pos)
759  lt->nbytes = lt->pos;
760  ptr = (void *) ((char *) ptr + nthistime);
761  size -= nthistime;
762  }
763 }
764 
765 /*
766  * Rewind logical tape and switch from writing to reading.
767  *
768  * The tape must currently be in writing state, or "frozen" in read state.
769  *
770  * 'buffer_size' specifies how much memory to use for the read buffer.
771  * Regardless of the argument, the actual amount of memory used is between
772  * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
773  * rounded down and truncated to fit those constraints, if necessary. If the
774  * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
775  * byte buffer is used.
776  */
777 void
778 LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
779 {
780  LogicalTape *lt;
781 
782  Assert(tapenum >= 0 && tapenum < lts->nTapes);
783  lt = &lts->tapes[tapenum];
784 
785  /*
786  * Round and cap buffer_size if needed.
787  */
788  if (lt->frozen)
789  buffer_size = BLCKSZ;
790  else
791  {
792  /* need at least one block */
793  if (buffer_size < BLCKSZ)
794  buffer_size = BLCKSZ;
795 
796  /* palloc() larger than max_size is unlikely to be helpful */
797  if (buffer_size > lt->max_size)
798  buffer_size = lt->max_size;
799 
800  /* round down to BLCKSZ boundary */
801  buffer_size -= buffer_size % BLCKSZ;
802  }
803 
804  if (lt->writing)
805  {
806  /*
807  * Completion of a write phase. Flush last partial data block, and
808  * rewind for normal (destructive) read.
809  */
810  if (lt->dirty)
811  {
812  /*
813  * As long as we've filled the buffer at least once, its contents
814  * are entirely defined from valgrind's point of view, even though
815  * contents beyond the current end point may be stale. But it's
816  * possible - at least in the case of a parallel sort - to sort
817  * such small amount of data that we do not fill the buffer even
818  * once. Tell valgrind that its contents are defined, so it
819  * doesn't bleat.
820  */
822  lt->buffer_size - lt->nbytes);
823 
824  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
825  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
826  }
827  lt->writing = false;
828  }
829  else
830  {
831  /*
832  * This is only OK if tape is frozen; we rewind for (another) read
833  * pass.
834  */
835  Assert(lt->frozen);
836  }
837 
838  /* Allocate a read buffer (unless the tape is empty) */
839  if (lt->buffer)
840  pfree(lt->buffer);
841 
842  /* the buffer is lazily allocated, but set the size here */
843  lt->buffer = NULL;
844  lt->buffer_size = buffer_size;
845 }
846 
847 /*
848  * Rewind logical tape and switch from reading to writing.
849  *
850  * NOTE: we assume the caller has read the tape to the end; otherwise
851  * untouched data will not have been freed. We could add more code to free
852  * any unread blocks, but in current usage of this module it'd be useless
853  * code.
854  */
855 void
857 {
858  LogicalTape *lt;
859 
860  Assert(tapenum >= 0 && tapenum < lts->nTapes);
861  lt = &lts->tapes[tapenum];
862 
863  Assert(!lt->writing && !lt->frozen);
864  lt->writing = true;
865  lt->dirty = false;
866  lt->firstBlockNumber = -1L;
867  lt->curBlockNumber = -1L;
868  lt->pos = 0;
869  lt->nbytes = 0;
870  if (lt->buffer)
871  pfree(lt->buffer);
872  lt->buffer = NULL;
873  lt->buffer_size = 0;
874 }
875 
876 /*
877  * Read from a logical tape.
878  *
879  * Early EOF is indicated by return value less than #bytes requested.
880  */
881 size_t
882 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
883  void *ptr, size_t size)
884 {
885  LogicalTape *lt;
886  size_t nread = 0;
887  size_t nthistime;
888 
889  Assert(tapenum >= 0 && tapenum < lts->nTapes);
890  lt = &lts->tapes[tapenum];
891  Assert(!lt->writing);
892 
893  if (lt->buffer == NULL)
894  ltsInitReadBuffer(lts, lt);
895 
896  while (size > 0)
897  {
898  if (lt->pos >= lt->nbytes)
899  {
900  /* Try to load more data into buffer. */
901  if (!ltsReadFillBuffer(lts, lt))
902  break; /* EOF */
903  }
904 
905  nthistime = lt->nbytes - lt->pos;
906  if (nthistime > size)
907  nthistime = size;
908  Assert(nthistime > 0);
909 
910  memcpy(ptr, lt->buffer + lt->pos, nthistime);
911 
912  lt->pos += nthistime;
913  ptr = (void *) ((char *) ptr + nthistime);
914  size -= nthistime;
915  nread += nthistime;
916  }
917 
918  return nread;
919 }
920 
921 /*
922  * "Freeze" the contents of a tape so that it can be read multiple times
923  * and/or read backwards. Once a tape is frozen, its contents will not
924  * be released until the LogicalTapeSet is destroyed. This is expected
925  * to be used only for the final output pass of a merge.
926  *
927  * This *must* be called just at the end of a write pass, before the
928  * tape is rewound (after rewind is too late!). It performs a rewind
929  * and switch to read mode "for free". An immediately following rewind-
930  * for-read call is OK but not necessary.
931  *
932  * share output argument is set with details of storage used for tape after
933  * freezing, which may be passed to LogicalTapeSetCreate within leader
934  * process later. This metadata is only of interest to worker callers
935  * freezing their final output for leader (single materialized tape).
936  * Serial sorts should set share to NULL.
937  */
938 void
939 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
940 {
941  LogicalTape *lt;
942 
943  Assert(tapenum >= 0 && tapenum < lts->nTapes);
944  lt = &lts->tapes[tapenum];
945  Assert(lt->writing);
946  Assert(lt->offsetBlockNumber == 0L);
947 
948  /*
949  * Completion of a write phase. Flush last partial data block, and rewind
950  * for nondestructive read.
951  */
952  if (lt->dirty)
953  {
954  /*
955  * As long as we've filled the buffer at least once, its contents are
956  * entirely defined from valgrind's point of view, even though
957  * contents beyond the current end point may be stale. But it's
958  * possible - at least in the case of a parallel sort - to sort such
959  * small amount of data that we do not fill the buffer even once. Tell
960  * valgrind that its contents are defined, so it doesn't bleat.
961  */
963  lt->buffer_size - lt->nbytes);
964 
965  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
966  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
967  lt->writing = false;
968  }
969  lt->writing = false;
970  lt->frozen = true;
971 
972  /*
973  * The seek and backspace functions assume a single block read buffer.
974  * That's OK with current usage. A larger buffer is helpful to make the
975  * read pattern of the backing file look more sequential to the OS, when
976  * we're reading from multiple tapes. But at the end of a sort, when a
977  * tape is frozen, we only read from a single tape anyway.
978  */
979  if (!lt->buffer || lt->buffer_size != BLCKSZ)
980  {
981  if (lt->buffer)
982  pfree(lt->buffer);
983  lt->buffer = palloc(BLCKSZ);
984  lt->buffer_size = BLCKSZ;
985  }
986 
987  /* Read the first block, or reset if tape is empty */
989  lt->pos = 0;
990  lt->nbytes = 0;
991 
992  if (lt->firstBlockNumber == -1L)
993  lt->nextBlockNumber = -1L;
994  ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
995  if (TapeBlockIsLast(lt->buffer))
996  lt->nextBlockNumber = -1L;
997  else
998  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
999  lt->nbytes = TapeBlockGetNBytes(lt->buffer);
1000 
1001  /* Handle extra steps when caller is to share its tapeset */
1002  if (share)
1003  {
1004  BufFileExportShared(lts->pfile);
1005  share->firstblocknumber = lt->firstBlockNumber;
1006  }
1007 }
1008 
1009 /*
1010  * Add additional tapes to this tape set. Not intended to be used when any
1011  * tapes are frozen.
1012  */
1013 void
1014 LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
1015 {
1016  int i;
1017  int nTapesOrig = lts->nTapes;
1018 
1019  lts->nTapes += nAdditional;
1020 
1021  lts->tapes = (LogicalTape *) repalloc(lts->tapes,
1022  lts->nTapes * sizeof(LogicalTape));
1023 
1024  for (i = nTapesOrig; i < lts->nTapes; i++)
1025  ltsInitTape(&lts->tapes[i]);
1026 }
1027 
1028 /*
1029  * Backspace the tape a given number of bytes. (We also support a more
1030  * general seek interface, see below.)
1031  *
1032  * *Only* a frozen-for-read tape can be backed up; we don't support
1033  * random access during write, and an unfrozen read tape may have
1034  * already discarded the desired data!
1035  *
1036  * Returns the number of bytes backed up. It can be less than the
1037  * requested amount, if there isn't that much data before the current
1038  * position. The tape is positioned to the beginning of the tape in
1039  * that case.
1040  */
1041 size_t
1042 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
1043 {
1044  LogicalTape *lt;
1045  size_t seekpos = 0;
1046 
1047  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1048  lt = &lts->tapes[tapenum];
1049  Assert(lt->frozen);
1050  Assert(lt->buffer_size == BLCKSZ);
1051 
1052  if (lt->buffer == NULL)
1053  ltsInitReadBuffer(lts, lt);
1054 
1055  /*
1056  * Easy case for seek within current block.
1057  */
1058  if (size <= (size_t) lt->pos)
1059  {
1060  lt->pos -= (int) size;
1061  return size;
1062  }
1063 
1064  /*
1065  * Not-so-easy case, have to walk back the chain of blocks. This
1066  * implementation would be pretty inefficient for long seeks, but we
1067  * really aren't doing that (a seek over one tuple is typical).
1068  */
1069  seekpos = (size_t) lt->pos; /* part within this block */
1070  while (size > seekpos)
1071  {
1072  long prev = TapeBlockGetTrailer(lt->buffer)->prev;
1073 
1074  if (prev == -1L)
1075  {
1076  /* Tried to back up beyond the beginning of tape. */
1077  if (lt->curBlockNumber != lt->firstBlockNumber)
1078  elog(ERROR, "unexpected end of tape");
1079  lt->pos = 0;
1080  return seekpos;
1081  }
1082 
1083  ltsReadBlock(lts, prev, (void *) lt->buffer);
1084 
1085  if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1086  elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1087  prev,
1088  TapeBlockGetTrailer(lt->buffer)->next,
1089  lt->curBlockNumber);
1090 
1092  lt->curBlockNumber = prev;
1093  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1094 
1095  seekpos += TapeBlockPayloadSize;
1096  }
1097 
1098  /*
1099  * 'seekpos' can now be greater than 'size', because it points to the
1100  * beginning the target block. The difference is the position within the
1101  * page.
1102  */
1103  lt->pos = seekpos - size;
1104  return size;
1105 }
1106 
1107 /*
1108  * Seek to an arbitrary position in a logical tape.
1109  *
1110  * *Only* a frozen-for-read tape can be seeked.
1111  *
1112  * Must be called with a block/offset previously returned by
1113  * LogicalTapeTell().
1114  */
1115 void
1117  long blocknum, int offset)
1118 {
1119  LogicalTape *lt;
1120 
1121  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1122  lt = &lts->tapes[tapenum];
1123  Assert(lt->frozen);
1124  Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1125  Assert(lt->buffer_size == BLCKSZ);
1126 
1127  if (lt->buffer == NULL)
1128  ltsInitReadBuffer(lts, lt);
1129 
1130  if (blocknum != lt->curBlockNumber)
1131  {
1132  ltsReadBlock(lts, blocknum, (void *) lt->buffer);
1133  lt->curBlockNumber = blocknum;
1135  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1136  }
1137 
1138  if (offset > lt->nbytes)
1139  elog(ERROR, "invalid tape seek position");
1140  lt->pos = offset;
1141 }
1142 
1143 /*
1144  * Obtain current position in a form suitable for a later LogicalTapeSeek.
1145  *
1146  * NOTE: it'd be OK to do this during write phase with intention of using
1147  * the position for a seek after freezing. Not clear if anyone needs that.
1148  */
1149 void
1151  long *blocknum, int *offset)
1152 {
1153  LogicalTape *lt;
1154 
1155  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1156  lt = &lts->tapes[tapenum];
1157 
1158  if (lt->buffer == NULL)
1159  ltsInitReadBuffer(lts, lt);
1160 
1161  Assert(lt->offsetBlockNumber == 0L);
1162 
1163  /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1164  Assert(lt->buffer_size == BLCKSZ);
1165 
1166  *blocknum = lt->curBlockNumber;
1167  *offset = lt->pos;
1168 }
1169 
1170 /*
1171  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1172  */
1173 long
1175 {
1176  return lts->nBlocksAllocated - lts->nHoleBlocks;
1177 }
int max_size
Definition: logtape.c:151
size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:882
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
#define swap(a, b)
Definition: qsort.c:94
#define TapeBlockIsLast(buf)
Definition: logtape.c:106
#define TapeBlockPayloadSize
Definition: logtape.c:102
long offsetBlockNumber
Definition: logtape.c:144
long nBlocksWritten
Definition: logtape.c:177
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:280
int64 BufFileSize(BufFile *file)
Definition: buffile.c:785
static bool ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:278
#define Min(x, y)
Definition: c.h:920
static long ltsGetFreeBlock(LogicalTapeSet *lts)
Definition: logtape.c:349
BufFile * pfile
Definition: logtape.c:164
#define MemSet(start, val, len)
Definition: c.h:971
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
Definition: logtape.c:404
long firstblocknumber
Definition: logtape.h:50
bool frozen
Definition: logtape.c:127
long nextBlockNumber
Definition: logtape.c:143
void BufFileClose(BufFile *file)
Definition: buffile.c:391
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:856
static void swap_nodes(long *heap, unsigned long a, unsigned long b)
Definition: logtape.c:317
bool writing
Definition: logtape.c:126
long * freeBlocks
Definition: logtape.c:189
bool dirty
Definition: logtape.c:128
char data[BLCKSZ]
Definition: c.h:1104
void pfree(void *pointer)
Definition: mcxt.c:1056
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:690
#define ERROR
Definition: elog.h:43
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:184
#define MAXPGPATH
LogicalTapeSet * LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, int worker)
Definition: logtape.c:601
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:258
size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
Definition: logtape.c:1042
int errcode_for_file_access(void)
Definition: elog.c:633
long nHoleBlocks
Definition: logtape.c:178
int nbytes
Definition: logtape.c:153
static unsigned long right_offset(unsigned i)
Definition: logtape.c:333
void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset)
Definition: logtape.c:1150
void pg_itoa(int16 i, char *a)
Definition: numutils.c:337
static unsigned long left_offset(unsigned long i)
Definition: logtape.c:327
#define MaxAllocSize
Definition: memutils.h:40
static void ltsInitTape(LogicalTape *lt)
Definition: logtape.c:545
long firstBlockNumber
Definition: logtape.c:141
long nFreeBlocks
Definition: logtape.c:190
int BufFileSeekBlock(BufFile *file, long blknum)
Definition: buffile.c:752
#define TapeBlockSetNBytes(buf, nbytes)
Definition: logtape.c:110
static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:568
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, SharedFileSet *fileset)
Definition: logtape.c:461
#define ereport(elevel,...)
Definition: elog.h:144
long curBlockNumber
Definition: logtape.c:142
#define Assert(condition)
Definition: c.h:738
struct TapeBlockTrailer TapeBlockTrailer
size_t Size
Definition: c.h:466
void BufFileExportShared(BufFile *file)
Definition: buffile.c:373
LogicalTape * tapes
Definition: logtape.c:195
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
Definition: logtape.c:778
#define TapeBlockGetNBytes(buf)
Definition: logtape.c:107
void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
Definition: logtape.c:939
char * buffer
Definition: logtape.c:149
static char * filename
Definition: pg_dumpall.c:90
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
bool forgetFreeSpace
Definition: logtape.c:188
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:528
size_t BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:575
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:652
void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, long blocknum, int offset)
Definition: logtape.c:1116
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition: logtape.c:679
Size freeBlocksLen
Definition: logtape.c:191
long LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1174
struct LogicalTape LogicalTape
static unsigned long parent_offset(unsigned long i)
Definition: logtape.c:339
long BufFileAppend(BufFile *target, BufFile *source)
Definition: buffile.c:824
long nBlocksAllocated
Definition: logtape.c:176
void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
Definition: logtape.c:1014
int buffer_size
Definition: logtape.c:150
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:214
#define TapeBlockGetTrailer(buf)
Definition: logtape.c:103
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:262