PostgreSQL Source Code  git master
logtape.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * logtape.c
4  * Management of "logical tapes" within temporary files.
5  *
6  * This module exists to support sorting via multiple merge passes (see
7  * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8  * we implement it on disk by creating a separate file for each "tape",
9  * there is an annoying problem: the peak space usage is at least twice
10  * the volume of actual data to be sorted. (This must be so because each
11  * datum will appear in both the input and output tapes of the final
12  * merge pass. For seven-tape polyphase merge, which is otherwise a
13  * pretty good algorithm, peak usage is more like 4x actual data volume.)
14  *
15  * We can work around this problem by recognizing that any one tape
16  * dataset (with the possible exception of the final output) is written
17  * and read exactly once in a perfectly sequential manner. Therefore,
18  * a datum once read will not be required again, and we can recycle its
19  * space for use by the new tape dataset(s) being generated. In this way,
20  * the total space usage is essentially just the actual data volume, plus
21  * insignificant bookkeeping and start/stop overhead.
22  *
23  * Few OSes allow arbitrary parts of a file to be released back to the OS,
24  * so we have to implement this space-recycling ourselves within a single
25  * logical file. logtape.c exists to perform this bookkeeping and provide
26  * the illusion of N independent tape devices to tuplesort.c. Note that
27  * logtape.c itself depends on buffile.c to provide a "logical file" of
28  * larger size than the underlying OS may support.
29  *
30  * For simplicity, we allocate and release space in the underlying file
31  * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32  * of which blocks in the underlying file belong to which logical tape,
33  * plus any blocks that are free (recycled and not yet reused).
34  * The blocks in each logical tape form a chain, with a prev- and next-
35  * pointer in each block.
36  *
37  * The initial write pass is guaranteed to fill the underlying file
38  * perfectly sequentially, no matter how data is divided into logical tapes.
39  * Once we begin merge passes, the access pattern becomes considerably
40  * less predictable --- but the seeking involved should be comparable to
41  * what would happen if we kept each logical tape in a separate file,
42  * so there's no serious performance penalty paid to obtain the space
43  * savings of recycling. We try to localize the write accesses by always
44  * writing to the lowest-numbered free block when we have a choice; it's
45  * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
46  * policy for free blocks would be better?)
47  *
48  * To further make the I/Os more sequential, we can use a larger buffer
49  * when reading, and read multiple blocks from the same tape in one go,
50  * whenever the buffer becomes empty.
51  *
52  * To support the above policy of writing to the lowest free block,
53  * ltsGetFreeBlock sorts the list of free block numbers into decreasing
54  * order each time it is asked for a block and the list isn't currently
55  * sorted. This is an efficient way to handle it because we expect cycles
56  * of releasing many blocks followed by re-using many blocks, due to
57  * the larger read buffer.
58  *
59  * Since all the bookkeeping and buffer memory is allocated with palloc(),
60  * and the underlying file(s) are made with OpenTemporaryFile, all resources
61  * for a logical tape set are certain to be cleaned up even if processing
62  * is aborted by ereport(ERROR). To avoid confusion, the caller should take
63  * care that all calls for a single LogicalTapeSet are made in the same
64  * palloc context.
65  *
66  * To support parallel sort operations involving coordinated callers to
67  * tuplesort.c routines across multiple workers, it is necessary to
68  * concatenate each worker BufFile/tapeset into one single logical tapeset
69  * managed by the leader. Workers should have produced one final
70  * materialized tape (their entire output) when this happens in leader.
71  * There will always be the same number of runs as input tapes, and the same
72  * number of input tapes as participants (worker Tuplesortstates).
73  *
74  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
75  * Portions Copyright (c) 1994, Regents of the University of California
76  *
77  * IDENTIFICATION
78  * src/backend/utils/sort/logtape.c
79  *
80  *-------------------------------------------------------------------------
81  */
82 
83 #include "postgres.h"
84 
85 #include "storage/buffile.h"
86 #include "utils/builtins.h"
87 #include "utils/logtape.h"
88 #include "utils/memdebug.h"
89 #include "utils/memutils.h"
90 
/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, negated, in 'next'.
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	long		prev;			/* previous block on this tape, or -1 on first
								 * block */
	long		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;

/* Number of payload bytes in a block: everything ahead of the trailer. */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))

/* Address of the trailer occupying the final bytes of a BLCKSZ block. */
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

/* A negative 'next' marks the last block of a tape. */
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)

/* Valid payload bytes: -next on the last block, a full payload otherwise. */
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)

/* Mark a block as last by storing its valid byte count, negated, in 'next'. */
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
116 
117 
118 /*
119  * This data structure represents a single "logical tape" within the set
120  * of logical tapes stored in the same file.
121  *
122  * While writing, we hold the current partially-written data block in the
123  * buffer. While reading, we can hold multiple blocks in the buffer. Note
124  * that we don't retain the trailers of a block when it's read into the
125  * buffer. The buffer therefore contains one large contiguous chunk of data
126  * from the tape.
127  */
typedef struct LogicalTape
{
	bool		writing;		/* T while in write phase */
	bool		frozen;			/* T if blocks should not be freed when read */
	bool		dirty;			/* does buffer need to be written? */

	/*
	 * Block numbers of the first, current, and next block of the tape.
	 *
	 * The "current" block number is only valid when writing, or reading from
	 * a frozen tape.  (When reading from an unfrozen tape, we use a larger
	 * read buffer that holds multiple blocks, so the "current" block is
	 * ambiguous.)
	 *
	 * When concatenation of worker tape BufFiles is performed, an offset to
	 * the first block in the unified BufFile space is applied during reads.
	 */
	long		firstBlockNumber;	/* first block of tape, or -1 if empty */
	long		curBlockNumber; /* block currently held in the buffer */
	long		nextBlockNumber;	/* next block to read, or -1 at EOF */
	long		offsetBlockNumber;	/* added to block numbers when reading */

	/*
	 * Buffer for current data block(s).
	 */
	char	   *buffer;			/* physical buffer (separately palloc'd) */
	int			buffer_size;	/* allocated size of the buffer */
	int			max_size;		/* highest useful, safe buffer_size */
	int			pos;			/* next read/write position in buffer */
	int			nbytes;			/* total # of valid bytes in buffer */
} LogicalTape;
159 
160 /*
161  * This data structure represents a set of related "logical tapes" sharing
162  * space in a single underlying file. (But that "file" may be multiple files
163  * if needed to escape OS limits on file size; buffile.c handles that for us.)
164  * The number of tapes is fixed at creation.
165  */
167 {
168  BufFile *pfile; /* underlying file for whole tape set */
169 
170  /*
171  * File size tracking. nBlocksWritten is the size of the underlying file,
172  * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
173  * by ltsGetFreeBlock(), and it is always greater than or equal to
174  * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
175  * blocks that have been allocated for a tape, but have not been written
176  * to the underlying file yet. nHoleBlocks tracks the total number of
177  * blocks that are in unused holes between worker spaces following BufFile
178  * concatenation.
179  */
180  long nBlocksAllocated; /* # of blocks allocated */
181  long nBlocksWritten; /* # of blocks used in underlying file */
182  long nHoleBlocks; /* # of "hole" blocks left */
183 
184  /*
185  * We store the numbers of recycled-and-available blocks in freeBlocks[].
186  * When there are no such blocks, we extend the underlying file.
187  *
188  * If forgetFreeSpace is true then any freed blocks are simply forgotten
189  * rather than being remembered in freeBlocks[]. See notes for
190  * LogicalTapeSetForgetFreeSpace().
191  *
192  * If blocksSorted is true then the block numbers in freeBlocks are in
193  * *decreasing* order, so that removing the last entry gives us the lowest
194  * free block. We re-sort the blocks whenever a block is demanded; this
195  * should be reasonably efficient given the expected usage pattern.
196  */
197  bool forgetFreeSpace; /* are we remembering free blocks? */
198  bool blocksSorted; /* is freeBlocks[] currently in order? */
199  long *freeBlocks; /* resizable array */
200  int nFreeBlocks; /* # of currently free blocks */
201  int freeBlocksLen; /* current allocated length of freeBlocks[] */
202 
203  /* The array of logical tapes. */
204  int nTapes; /* # of logical tapes in set */
205  LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */
206 };
207 
208 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
209 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
210 static long ltsGetFreeBlock(LogicalTapeSet *lts);
211 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
212 static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
213  SharedFileSet *fileset);
214 
215 
216 /*
217  * Write a block-sized buffer to the specified block of the underlying file.
218  *
219  * No need for an error return convention; we ereport() on any error.
220  */
221 static void
222 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
223 {
224  /*
225  * BufFile does not support "holes", so if we're about to write a block
226  * that's past the current end of file, fill the space between the current
227  * end of file and the target block with zeros.
228  *
229  * This should happen rarely, otherwise you are not writing very
230  * sequentially. In current use, this only happens when the sort ends
231  * writing a run, and switches to another tape. The last block of the
232  * previous tape isn't flushed to disk until the end of the sort, so you
233  * get one-block hole, where the last block of the previous tape will
234  * later go.
235  *
236  * Note that BufFile concatenation can leave "holes" in BufFile between
237  * worker-owned block ranges. These are tracked for reporting purposes
238  * only. We never read from nor write to these hole blocks, and so they
239  * are not considered here.
240  */
241  while (blocknum > lts->nBlocksWritten)
242  {
243  PGAlignedBlock zerobuf;
244 
245  MemSet(zerobuf.data, 0, sizeof(zerobuf));
246 
247  ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
248  }
249 
250  /* Write the requested block */
251  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
252  BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
253  ereport(ERROR,
255  errmsg("could not write block %ld of temporary file: %m",
256  blocknum)));
257 
258  /* Update nBlocksWritten, if we extended the file */
259  if (blocknum == lts->nBlocksWritten)
260  lts->nBlocksWritten++;
261 }
262 
263 /*
264  * Read a block-sized buffer from the specified block of the underlying file.
265  *
266  * No need for an error return convention; we ereport() on any error. This
267  * module should never attempt to read a block it doesn't know is there.
268  */
269 static void
270 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
271 {
272  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
273  BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
274  ereport(ERROR,
276  errmsg("could not read block %ld of temporary file: %m",
277  blocknum)));
278 }
279 
280 /*
281  * Read as many blocks as we can into the per-tape buffer.
282  *
283  * Returns true if anything was read, 'false' on EOF.
284  */
285 static bool
287 {
288  lt->pos = 0;
289  lt->nbytes = 0;
290 
291  do
292  {
293  char *thisbuf = lt->buffer + lt->nbytes;
294  long datablocknum = lt->nextBlockNumber;
295 
296  /* Fetch next block number */
297  if (datablocknum == -1L)
298  break; /* EOF */
299  /* Apply worker offset, needed for leader tapesets */
300  datablocknum += lt->offsetBlockNumber;
301 
302  /* Read the block */
303  ltsReadBlock(lts, datablocknum, (void *) thisbuf);
304  if (!lt->frozen)
305  ltsReleaseBlock(lts, datablocknum);
307 
308  lt->nbytes += TapeBlockGetNBytes(thisbuf);
309  if (TapeBlockIsLast(thisbuf))
310  {
311  lt->nextBlockNumber = -1L;
312  /* EOF */
313  break;
314  }
315  else
316  lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
317 
318  /* Advance to next block, if we have buffer space left */
319  } while (lt->buffer_size - lt->nbytes > BLCKSZ);
320 
321  return (lt->nbytes > 0);
322 }
323 
324 /*
325  * qsort comparator for sorting freeBlocks[] into decreasing order.
326  */
static int
freeBlocks_cmp(const void *a, const void *b)
{
	const long	lhs = *(const long *) a;
	const long	rhs = *(const long *) b;

	/*
	 * Descending order: a smaller left operand sorts later (+1), a larger
	 * one earlier (-1).  Compare rather than subtract, since the difference
	 * of two longs need not fit in an int.
	 */
	return (lhs < rhs) - (lhs > rhs);
}
340 
341 /*
342  * Select a currently unused block for writing to.
343  */
344 static long
346 {
347  /*
348  * If there are multiple free blocks, we select the one appearing last in
349  * freeBlocks[] (after sorting the array if needed). If there are none,
350  * assign the next block at the end of the file.
351  */
352  if (lts->nFreeBlocks > 0)
353  {
354  if (!lts->blocksSorted)
355  {
356  qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
357  sizeof(long), freeBlocks_cmp);
358  lts->blocksSorted = true;
359  }
360  return lts->freeBlocks[--lts->nFreeBlocks];
361  }
362  else
363  return lts->nBlocksAllocated++;
364 }
365 
366 /*
367  * Return a block# to the freelist.
368  */
369 static void
370 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
371 {
372  int ndx;
373 
374  /*
375  * Do nothing if we're no longer interested in remembering free space.
376  */
377  if (lts->forgetFreeSpace)
378  return;
379 
380  /*
381  * Enlarge freeBlocks array if full.
382  */
383  if (lts->nFreeBlocks >= lts->freeBlocksLen)
384  {
385  lts->freeBlocksLen *= 2;
386  lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
387  lts->freeBlocksLen * sizeof(long));
388  }
389 
390  /*
391  * Add blocknum to array, and mark the array unsorted if it's no longer in
392  * decreasing order.
393  */
394  ndx = lts->nFreeBlocks++;
395  lts->freeBlocks[ndx] = blocknum;
396  if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
397  lts->blocksSorted = false;
398 }
399 
400 /*
401  * Claim ownership of a set of logical tapes from existing shared BufFiles.
402  *
403  * Caller should be leader process. Though tapes are marked as frozen in
404  * workers, they are not frozen when opened within leader, since unfrozen tapes
405  * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
406  * for random access.)
407  */
408 static void
410  SharedFileSet *fileset)
411 {
412  LogicalTape *lt = NULL;
413  long tapeblocks = 0L;
414  long nphysicalblocks = 0L;
415  int i;
416 
417  /* Should have at least one worker tape, plus leader's tape */
418  Assert(lts->nTapes >= 2);
419 
420  /*
421  * Build concatenated view of all BufFiles, remembering the block number
422  * where each source file begins. No changes are needed for leader/last
423  * tape.
424  */
425  for (i = 0; i < lts->nTapes - 1; i++)
426  {
427  char filename[MAXPGPATH];
428  BufFile *file;
429  int64 filesize;
430 
431  lt = &lts->tapes[i];
432 
433  pg_itoa(i, filename);
434  file = BufFileOpenShared(fileset, filename);
435  filesize = BufFileSize(file);
436 
437  /*
438  * Stash first BufFile, and concatenate subsequent BufFiles to that.
439  * Store block offset into each tape as we go.
440  */
441  lt->firstBlockNumber = shared[i].firstblocknumber;
442  if (i == 0)
443  {
444  lts->pfile = file;
445  lt->offsetBlockNumber = 0L;
446  }
447  else
448  {
449  lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
450  }
451  /* Don't allocate more for read buffer than could possibly help */
452  lt->max_size = Min(MaxAllocSize, filesize);
453  tapeblocks = filesize / BLCKSZ;
454  nphysicalblocks += tapeblocks;
455  }
456 
457  /*
458  * Set # of allocated blocks, as well as # blocks written. Use extent of
459  * new BufFile space (from 0 to end of last worker's tape space) for this.
460  * Allocated/written blocks should include space used by holes left
461  * between concatenated BufFiles.
462  */
463  lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
464  lts->nBlocksWritten = lts->nBlocksAllocated;
465 
466  /*
467  * Compute number of hole blocks so that we can later work backwards, and
468  * instrument number of physical blocks. We don't simply use physical
469  * blocks directly for instrumentation because this would break if we ever
470  * subsequently wrote to the leader tape.
471  *
472  * Working backwards like this keeps our options open. If shared BufFiles
473  * ever support being written to post-export, logtape.c can automatically
474  * take advantage of that. We'd then support writing to the leader tape
475  * while recycling space from worker tapes, because the leader tape has a
476  * zero offset (write routines won't need to have extra logic to apply an
477  * offset).
478  *
479  * The only thing that currently prevents writing to the leader tape from
480  * working is the fact that BufFiles opened using BufFileOpenShared() are
481  * read-only by definition, but that could be changed if it seemed
482  * worthwhile. For now, writing to the leader tape will raise a "Bad file
483  * descriptor" error, so tuplesort must avoid writing to the leader tape
484  * altogether.
485  */
486  lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
487 }
488 
489 /*
490  * Create a set of logical tapes in a temporary underlying file.
491  *
492  * Each tape is initialized in write state. Serial callers pass ntapes,
493  * NULL argument for shared, and -1 for worker. Parallel worker callers
494  * pass ntapes, a shared file handle, NULL shared argument, and their own
495  * worker number. Leader callers, which claim shared worker tapes here,
496  * must supply non-sentinel values for all arguments except worker number,
497  * which should be -1.
498  *
499  * Leader caller is passing back an array of metadata each worker captured
500  * when LogicalTapeFreeze() was called for their final result tapes. Passed
501  * tapes array is actually sized ntapes - 1, because it includes only
502  * worker tapes, whereas leader requires its own leader tape. Note that we
503  * rely on the assumption that reclaimed worker tapes will only be read
504  * from once by leader, and never written to again (tapes are initialized
505  * for writing, but that's only to be consistent). Leader may not write to
506  * its own tape purely due to a restriction in the shared buffile
507  * infrastructure that may be lifted in the future.
508  */
510 LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
511  int worker)
512 {
513  LogicalTapeSet *lts;
514  LogicalTape *lt;
515  int i;
516 
517  /*
518  * Create top-level struct including per-tape LogicalTape structs.
519  */
520  Assert(ntapes > 0);
521  lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
522  ntapes * sizeof(LogicalTape));
523  lts->nBlocksAllocated = 0L;
524  lts->nBlocksWritten = 0L;
525  lts->nHoleBlocks = 0L;
526  lts->forgetFreeSpace = false;
527  lts->blocksSorted = true; /* a zero-length array is sorted ... */
528  lts->freeBlocksLen = 32; /* reasonable initial guess */
529  lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
530  lts->nFreeBlocks = 0;
531  lts->nTapes = ntapes;
532 
533  /*
534  * Initialize per-tape structs. Note we allocate the I/O buffer and the
535  * first block for a tape only when it is first actually written to. This
536  * avoids wasting memory space when tuplesort.c overestimates the number
537  * of tapes needed.
538  */
539  for (i = 0; i < ntapes; i++)
540  {
541  lt = &lts->tapes[i];
542  lt->writing = true;
543  lt->frozen = false;
544  lt->dirty = false;
545  lt->firstBlockNumber = -1L;
546  lt->curBlockNumber = -1L;
547  lt->nextBlockNumber = -1L;
548  lt->offsetBlockNumber = 0L;
549  lt->buffer = NULL;
550  lt->buffer_size = 0;
551  /* palloc() larger than MaxAllocSize would fail */
552  lt->max_size = MaxAllocSize;
553  lt->pos = 0;
554  lt->nbytes = 0;
555  }
556 
557  /*
558  * Create temp BufFile storage as required.
559  *
560  * Leader concatenates worker tapes, which requires special adjustment to
561  * final tapeset data. Things are simpler for the worker case and the
562  * serial case, though. They are generally very similar -- workers use a
563  * shared fileset, whereas serial sorts use a conventional serial BufFile.
564  */
565  if (shared)
566  ltsConcatWorkerTapes(lts, shared, fileset);
567  else if (fileset)
568  {
569  char filename[MAXPGPATH];
570 
571  pg_itoa(worker, filename);
572  lts->pfile = BufFileCreateShared(fileset, filename);
573  }
574  else
575  lts->pfile = BufFileCreateTemp(false);
576 
577  return lts;
578 }
579 
580 /*
581  * Close a logical tape set and release all resources.
582  */
583 void
585 {
586  LogicalTape *lt;
587  int i;
588 
589  BufFileClose(lts->pfile);
590  for (i = 0; i < lts->nTapes; i++)
591  {
592  lt = &lts->tapes[i];
593  if (lt->buffer)
594  pfree(lt->buffer);
595  }
596  pfree(lts->freeBlocks);
597  pfree(lts);
598 }
599 
600 /*
601  * Mark a logical tape set as not needing management of free space anymore.
602  *
603  * This should be called if the caller does not intend to write any more data
604  * into the tape set, but is reading from un-frozen tapes. Since no more
605  * writes are planned, remembering free blocks is no longer useful. Setting
606  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
607  * is not designed to handle large numbers of free blocks.
608  */
609 void
611 {
612  lts->forgetFreeSpace = true;
613 }
614 
615 /*
616  * Write to a logical tape.
617  *
618  * There are no error returns; we ereport() on failure.
619  */
620 void
622  void *ptr, size_t size)
623 {
624  LogicalTape *lt;
625  size_t nthistime;
626 
627  Assert(tapenum >= 0 && tapenum < lts->nTapes);
628  lt = &lts->tapes[tapenum];
629  Assert(lt->writing);
630  Assert(lt->offsetBlockNumber == 0L);
631 
632  /* Allocate data buffer and first block on first write */
633  if (lt->buffer == NULL)
634  {
635  lt->buffer = (char *) palloc(BLCKSZ);
636  lt->buffer_size = BLCKSZ;
637  }
638  if (lt->curBlockNumber == -1)
639  {
640  Assert(lt->firstBlockNumber == -1);
641  Assert(lt->pos == 0);
642 
643  lt->curBlockNumber = ltsGetFreeBlock(lts);
645 
646  TapeBlockGetTrailer(lt->buffer)->prev = -1L;
647  }
648 
649  Assert(lt->buffer_size == BLCKSZ);
650  while (size > 0)
651  {
652  if (lt->pos >= TapeBlockPayloadSize)
653  {
654  /* Buffer full, dump it out */
655  long nextBlockNumber;
656 
657  if (!lt->dirty)
658  {
659  /* Hmm, went directly from reading to writing? */
660  elog(ERROR, "invalid logtape state: should be dirty");
661  }
662 
663  /*
664  * First allocate the next block, so that we can store it in the
665  * 'next' pointer of this block.
666  */
667  nextBlockNumber = ltsGetFreeBlock(lts);
668 
669  /* set the next-pointer and dump the current block. */
670  TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
671  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
672 
673  /* initialize the prev-pointer of the next block */
674  TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
675  lt->curBlockNumber = nextBlockNumber;
676  lt->pos = 0;
677  lt->nbytes = 0;
678  }
679 
680  nthistime = TapeBlockPayloadSize - lt->pos;
681  if (nthistime > size)
682  nthistime = size;
683  Assert(nthistime > 0);
684 
685  memcpy(lt->buffer + lt->pos, ptr, nthistime);
686 
687  lt->dirty = true;
688  lt->pos += nthistime;
689  if (lt->nbytes < lt->pos)
690  lt->nbytes = lt->pos;
691  ptr = (void *) ((char *) ptr + nthistime);
692  size -= nthistime;
693  }
694 }
695 
696 /*
697  * Rewind logical tape and switch from writing to reading.
698  *
699  * The tape must currently be in writing state, or "frozen" in read state.
700  *
701  * 'buffer_size' specifies how much memory to use for the read buffer.
702  * Regardless of the argument, the actual amount of memory used is between
703  * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
704  * rounded down and truncated to fit those constraints, if necessary. If the
705  * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
706  * byte buffer is used.
707  */
708 void
709 LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
710 {
711  LogicalTape *lt;
712 
713  Assert(tapenum >= 0 && tapenum < lts->nTapes);
714  lt = &lts->tapes[tapenum];
715 
716  /*
717  * Round and cap buffer_size if needed.
718  */
719  if (lt->frozen)
720  buffer_size = BLCKSZ;
721  else
722  {
723  /* need at least one block */
724  if (buffer_size < BLCKSZ)
725  buffer_size = BLCKSZ;
726 
727  /* palloc() larger than max_size is unlikely to be helpful */
728  if (buffer_size > lt->max_size)
729  buffer_size = lt->max_size;
730 
731  /* round down to BLCKSZ boundary */
732  buffer_size -= buffer_size % BLCKSZ;
733  }
734 
735  if (lt->writing)
736  {
737  /*
738  * Completion of a write phase. Flush last partial data block, and
739  * rewind for normal (destructive) read.
740  */
741  if (lt->dirty)
742  {
743  /*
744  * As long as we've filled the buffer at least once, its contents
745  * are entirely defined from valgrind's point of view, even though
746  * contents beyond the current end point may be stale. But it's
747  * possible - at least in the case of a parallel sort - to sort
748  * such small amount of data that we do not fill the buffer even
749  * once. Tell valgrind that its contents are defined, so it
750  * doesn't bleat.
751  */
753  lt->buffer_size - lt->nbytes);
754 
755  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
756  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
757  }
758  lt->writing = false;
759  }
760  else
761  {
762  /*
763  * This is only OK if tape is frozen; we rewind for (another) read
764  * pass.
765  */
766  Assert(lt->frozen);
767  }
768 
769  /* Allocate a read buffer (unless the tape is empty) */
770  if (lt->buffer)
771  pfree(lt->buffer);
772  lt->buffer = NULL;
773  lt->buffer_size = 0;
774  if (lt->firstBlockNumber != -1L)
775  {
776  lt->buffer = palloc(buffer_size);
777  lt->buffer_size = buffer_size;
778  }
779 
780  /* Read the first block, or reset if tape is empty */
782  lt->pos = 0;
783  lt->nbytes = 0;
784  ltsReadFillBuffer(lts, lt);
785 }
786 
787 /*
788  * Rewind logical tape and switch from reading to writing.
789  *
790  * NOTE: we assume the caller has read the tape to the end; otherwise
791  * untouched data will not have been freed. We could add more code to free
792  * any unread blocks, but in current usage of this module it'd be useless
793  * code.
794  */
795 void
797 {
798  LogicalTape *lt;
799 
800  Assert(tapenum >= 0 && tapenum < lts->nTapes);
801  lt = &lts->tapes[tapenum];
802 
803  Assert(!lt->writing && !lt->frozen);
804  lt->writing = true;
805  lt->dirty = false;
806  lt->firstBlockNumber = -1L;
807  lt->curBlockNumber = -1L;
808  lt->pos = 0;
809  lt->nbytes = 0;
810  if (lt->buffer)
811  pfree(lt->buffer);
812  lt->buffer = NULL;
813  lt->buffer_size = 0;
814 }
815 
816 /*
817  * Read from a logical tape.
818  *
819  * Early EOF is indicated by return value less than #bytes requested.
820  */
821 size_t
822 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
823  void *ptr, size_t size)
824 {
825  LogicalTape *lt;
826  size_t nread = 0;
827  size_t nthistime;
828 
829  Assert(tapenum >= 0 && tapenum < lts->nTapes);
830  lt = &lts->tapes[tapenum];
831  Assert(!lt->writing);
832 
833  while (size > 0)
834  {
835  if (lt->pos >= lt->nbytes)
836  {
837  /* Try to load more data into buffer. */
838  if (!ltsReadFillBuffer(lts, lt))
839  break; /* EOF */
840  }
841 
842  nthistime = lt->nbytes - lt->pos;
843  if (nthistime > size)
844  nthistime = size;
845  Assert(nthistime > 0);
846 
847  memcpy(ptr, lt->buffer + lt->pos, nthistime);
848 
849  lt->pos += nthistime;
850  ptr = (void *) ((char *) ptr + nthistime);
851  size -= nthistime;
852  nread += nthistime;
853  }
854 
855  return nread;
856 }
857 
858 /*
859  * "Freeze" the contents of a tape so that it can be read multiple times
860  * and/or read backwards. Once a tape is frozen, its contents will not
861  * be released until the LogicalTapeSet is destroyed. This is expected
862  * to be used only for the final output pass of a merge.
863  *
864  * This *must* be called just at the end of a write pass, before the
865  * tape is rewound (after rewind is too late!). It performs a rewind
866  * and switch to read mode "for free". An immediately following rewind-
867  * for-read call is OK but not necessary.
868  *
869  * share output argument is set with details of storage used for tape after
870  * freezing, which may be passed to LogicalTapeSetCreate within leader
871  * process later. This metadata is only of interest to worker callers
872  * freezing their final output for leader (single materialized tape).
873  * Serial sorts should set share to NULL.
874  */
875 void
876 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
877 {
878  LogicalTape *lt;
879 
880  Assert(tapenum >= 0 && tapenum < lts->nTapes);
881  lt = &lts->tapes[tapenum];
882  Assert(lt->writing);
883  Assert(lt->offsetBlockNumber == 0L);
884 
885  /*
886  * Completion of a write phase. Flush last partial data block, and rewind
887  * for nondestructive read.
888  */
889  if (lt->dirty)
890  {
891  /*
892  * As long as we've filled the buffer at least once, its contents are
893  * entirely defined from valgrind's point of view, even though
894  * contents beyond the current end point may be stale. But it's
895  * possible - at least in the case of a parallel sort - to sort such
896  * small amount of data that we do not fill the buffer even once. Tell
897  * valgrind that its contents are defined, so it doesn't bleat.
898  */
900  lt->buffer_size - lt->nbytes);
901 
902  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
903  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
904  lt->writing = false;
905  }
906  lt->writing = false;
907  lt->frozen = true;
908 
909  /*
910  * The seek and backspace functions assume a single block read buffer.
911  * That's OK with current usage. A larger buffer is helpful to make the
912  * read pattern of the backing file look more sequential to the OS, when
913  * we're reading from multiple tapes. But at the end of a sort, when a
914  * tape is frozen, we only read from a single tape anyway.
915  */
916  if (!lt->buffer || lt->buffer_size != BLCKSZ)
917  {
918  if (lt->buffer)
919  pfree(lt->buffer);
920  lt->buffer = palloc(BLCKSZ);
921  lt->buffer_size = BLCKSZ;
922  }
923 
924  /* Read the first block, or reset if tape is empty */
926  lt->pos = 0;
927  lt->nbytes = 0;
928 
929  if (lt->firstBlockNumber == -1L)
930  lt->nextBlockNumber = -1L;
931  ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
932  if (TapeBlockIsLast(lt->buffer))
933  lt->nextBlockNumber = -1L;
934  else
935  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
936  lt->nbytes = TapeBlockGetNBytes(lt->buffer);
937 
938  /* Handle extra steps when caller is to share its tapeset */
939  if (share)
940  {
942  share->firstblocknumber = lt->firstBlockNumber;
943  }
944 }
945 
946 /*
947  * Backspace the tape a given number of bytes. (We also support a more
948  * general seek interface, see below.)
949  *
950  * *Only* a frozen-for-read tape can be backed up; we don't support
951  * random access during write, and an unfrozen read tape may have
952  * already discarded the desired data!
953  *
954  * Returns the number of bytes backed up. It can be less than the
955  * requested amount, if there isn't that much data before the current
956  * position. The tape is positioned to the beginning of the tape in
957  * that case.
958  */
959 size_t
960 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
961 {
962  LogicalTape *lt;
963  size_t seekpos = 0;
964 
965  Assert(tapenum >= 0 && tapenum < lts->nTapes);
966  lt = &lts->tapes[tapenum];
967  Assert(lt->frozen);
968  Assert(lt->buffer_size == BLCKSZ);
969 
970  /*
971  * Easy case for seek within current block.
972  */
973  if (size <= (size_t) lt->pos)
974  {
975  lt->pos -= (int) size;
976  return size;
977  }
978 
979  /*
980  * Not-so-easy case, have to walk back the chain of blocks. This
981  * implementation would be pretty inefficient for long seeks, but we
982  * really aren't doing that (a seek over one tuple is typical).
983  */
984  seekpos = (size_t) lt->pos; /* part within this block */
985  while (size > seekpos)
986  {
987  long prev = TapeBlockGetTrailer(lt->buffer)->prev;
988 
989  if (prev == -1L)
990  {
991  /* Tried to back up beyond the beginning of tape. */
992  if (lt->curBlockNumber != lt->firstBlockNumber)
993  elog(ERROR, "unexpected end of tape");
994  lt->pos = 0;
995  return seekpos;
996  }
997 
998  ltsReadBlock(lts, prev, (void *) lt->buffer);
999 
1000  if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1001  elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1002  prev,
1003  TapeBlockGetTrailer(lt->buffer)->next,
1004  lt->curBlockNumber);
1005 
1007  lt->curBlockNumber = prev;
1008  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1009 
1010  seekpos += TapeBlockPayloadSize;
1011  }
1012 
1013  /*
1014  * 'seekpos' can now be greater than 'size', because it points to the
1015  * beginning the target block. The difference is the position within the
1016  * page.
1017  */
1018  lt->pos = seekpos - size;
1019  return size;
1020 }
1021 
1022 /*
1023  * Seek to an arbitrary position in a logical tape.
1024  *
1025  * *Only* a frozen-for-read tape can be seeked.
1026  *
1027  * Must be called with a block/offset previously returned by
1028  * LogicalTapeTell().
1029  */
1030 void
1032  long blocknum, int offset)
1033 {
1034  LogicalTape *lt;
1035 
1036  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1037  lt = &lts->tapes[tapenum];
1038  Assert(lt->frozen);
1039  Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1040  Assert(lt->buffer_size == BLCKSZ);
1041 
1042  if (blocknum != lt->curBlockNumber)
1043  {
1044  ltsReadBlock(lts, blocknum, (void *) lt->buffer);
1045  lt->curBlockNumber = blocknum;
1047  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1048  }
1049 
1050  if (offset > lt->nbytes)
1051  elog(ERROR, "invalid tape seek position");
1052  lt->pos = offset;
1053 }
1054 
1055 /*
1056  * Obtain current position in a form suitable for a later LogicalTapeSeek.
1057  *
1058  * NOTE: it'd be OK to do this during write phase with intention of using
1059  * the position for a seek after freezing. Not clear if anyone needs that.
1060  */
1061 void
1063  long *blocknum, int *offset)
1064 {
1065  LogicalTape *lt;
1066 
1067  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1068  lt = &lts->tapes[tapenum];
1069  Assert(lt->offsetBlockNumber == 0L);
1070 
1071  /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1072  Assert(lt->buffer_size == BLCKSZ);
1073 
1074  *blocknum = lt->curBlockNumber;
1075  *offset = lt->pos;
1076 }
1077 
1078 /*
1079  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1080  */
1081 long
1083 {
1084  return lts->nBlocksAllocated - lts->nHoleBlocks;
1085 }
int max_size
Definition: logtape.c:155
size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:822
LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]
Definition: logtape.c:205
int nFreeBlocks
Definition: logtape.c:200
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
#define TapeBlockIsLast(buf)
Definition: logtape.c:110
#define TapeBlockPayloadSize
Definition: logtape.c:106
long offsetBlockNumber
Definition: logtape.c:148
long nBlocksWritten
Definition: logtape.c:181
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:280
int freeBlocksLen
Definition: logtape.c:201
int64 BufFileSize(BufFile *file)
Definition: buffile.c:785
static bool ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:286
#define Min(x, y)
Definition: c.h:911
static long ltsGetFreeBlock(LogicalTapeSet *lts)
Definition: logtape.c:345
BufFile * pfile
Definition: logtape.c:168
#define MemSet(start, val, len)
Definition: c.h:962
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
Definition: logtape.c:370
long firstblocknumber
Definition: logtape.h:50
bool frozen
Definition: logtape.c:131
long nextBlockNumber
Definition: logtape.c:147
void BufFileClose(BufFile *file)
Definition: buffile.c:391
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:796
bool writing
Definition: logtape.c:130
long * freeBlocks
Definition: logtape.c:199
bool dirty
Definition: logtape.c:132
char data[BLCKSZ]
Definition: c.h:1091
void pfree(void *pointer)
Definition: mcxt.c:1056
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:621
#define ERROR
Definition: elog.h:43
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:184
#define MAXPGPATH
LogicalTapeSet * LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, int worker)
Definition: logtape.c:510
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:258
size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
Definition: logtape.c:960
int errcode_for_file_access(void)
Definition: elog.c:631
long nHoleBlocks
Definition: logtape.c:182
int nbytes
Definition: logtape.c:157
void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset)
Definition: logtape.c:1062
static int freeBlocks_cmp(const void *a, const void *b)
Definition: logtape.c:328
#define ereport(elevel, rest)
Definition: elog.h:141
void pg_itoa(int16 i, char *a)
Definition: numutils.c:273
#define MaxAllocSize
Definition: memutils.h:40
long firstBlockNumber
Definition: logtape.c:145
int BufFileSeekBlock(BufFile *file, long blknum)
Definition: buffile.c:752
#define TapeBlockSetNBytes(buf, nbytes)
Definition: logtape.c:114
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, SharedFileSet *fileset)
Definition: logtape.c:409
long curBlockNumber
Definition: logtape.c:146
#define Assert(condition)
Definition: c.h:739
struct TapeBlockTrailer TapeBlockTrailer
void BufFileExportShared(BufFile *file)
Definition: buffile.c:373
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
Definition: logtape.c:709
#define TapeBlockGetNBytes(buf)
Definition: logtape.c:111
void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
Definition: logtape.c:876
char * buffer
Definition: logtape.c:153
static char * filename
Definition: pg_dumpall.c:90
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
bool forgetFreeSpace
Definition: logtape.c:197
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:528
size_t BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:575
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:584
void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, long blocknum, int offset)
Definition: logtape.c:1031
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition: logtape.c:610
long LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1082
#define qsort(a, b, c, d)
Definition: port.h:488
struct LogicalTape LogicalTape
long BufFileAppend(BufFile *target, BufFile *source)
Definition: buffile.c:824
bool blocksSorted
Definition: logtape.c:198
long nBlocksAllocated
Definition: logtape.c:180
int buffer_size
Definition: logtape.c:154
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:222
#define TapeBlockGetTrailer(buf)
Definition: logtape.c:107
#define offsetof(type, field)
Definition: c.h:662
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:270