PostgreSQL Source Code  git master
logtape.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * logtape.c
4  * Management of "logical tapes" within temporary files.
5  *
6  * This module exists to support sorting via multiple merge passes (see
7  * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8  * we implement it on disk by creating a separate file for each "tape",
9  * there is an annoying problem: the peak space usage is at least twice
10  * the volume of actual data to be sorted. (This must be so because each
11  * datum will appear in both the input and output tapes of the final
12  * merge pass. For seven-tape polyphase merge, which is otherwise a
13  * pretty good algorithm, peak usage is more like 4x actual data volume.)
14  *
15  * We can work around this problem by recognizing that any one tape
16  * dataset (with the possible exception of the final output) is written
17  * and read exactly once in a perfectly sequential manner. Therefore,
18  * a datum once read will not be required again, and we can recycle its
19  * space for use by the new tape dataset(s) being generated. In this way,
20  * the total space usage is essentially just the actual data volume, plus
21  * insignificant bookkeeping and start/stop overhead.
22  *
23  * Few OSes allow arbitrary parts of a file to be released back to the OS,
24  * so we have to implement this space-recycling ourselves within a single
25  * logical file. logtape.c exists to perform this bookkeeping and provide
26  * the illusion of N independent tape devices to tuplesort.c. Note that
27  * logtape.c itself depends on buffile.c to provide a "logical file" of
28  * larger size than the underlying OS may support.
29  *
30  * For simplicity, we allocate and release space in the underlying file
31  * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32  * of which blocks in the underlying file belong to which logical tape,
33  * plus any blocks that are free (recycled and not yet reused).
34  * The blocks in each logical tape form a chain, with a prev- and next-
35  * pointer in each block.
36  *
37  * The initial write pass is guaranteed to fill the underlying file
38  * perfectly sequentially, no matter how data is divided into logical tapes.
39  * Once we begin merge passes, the access pattern becomes considerably
40  * less predictable --- but the seeking involved should be comparable to
41  * what would happen if we kept each logical tape in a separate file,
42  * so there's no serious performance penalty paid to obtain the space
43  * savings of recycling. We try to localize the write accesses by always
44  * writing to the lowest-numbered free block when we have a choice; it's
45  * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
46  * policy for free blocks would be better?)
47  *
48  * To further make the I/Os more sequential, we can use a larger buffer
49  * when reading, and read multiple blocks from the same tape in one go,
50  * whenever the buffer becomes empty.
51  *
52  * To support the above policy of writing to the lowest free block,
53  * ltsGetFreeBlock sorts the list of free block numbers into decreasing
54  * order each time it is asked for a block and the list isn't currently
55  * sorted. This is an efficient way to handle it because we expect cycles
56  * of releasing many blocks followed by re-using many blocks, due to
57  * the larger read buffer.
58  *
59  * Since all the bookkeeping and buffer memory is allocated with palloc(),
60  * and the underlying file(s) are made with OpenTemporaryFile, all resources
61  * for a logical tape set are certain to be cleaned up even if processing
62  * is aborted by ereport(ERROR). To avoid confusion, the caller should take
63  * care that all calls for a single LogicalTapeSet are made in the same
64  * palloc context.
65  *
66  * To support parallel sort operations involving coordinated callers to
67  * tuplesort.c routines across multiple workers, it is necessary to
68  * concatenate each worker BufFile/tapeset into one single logical tapeset
69  * managed by the leader. Workers should have produced one final
70  * materialized tape (their entire output) when this happens in leader.
71  * There will always be the same number of runs as input tapes, and the same
72  * number of input tapes as participants (worker Tuplesortstates).
73  *
74  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
75  * Portions Copyright (c) 1994, Regents of the University of California
76  *
77  * IDENTIFICATION
78  * src/backend/utils/sort/logtape.c
79  *
80  *-------------------------------------------------------------------------
81  */
82 
83 #include "postgres.h"
84 
85 #include "storage/buffile.h"
86 #include "utils/builtins.h"
87 #include "utils/logtape.h"
88 #include "utils/memdebug.h"
89 #include "utils/memutils.h"
90 
/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, negated, in 'next'.
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
    long        prev;           /* previous block on this tape, or -1 on first
                                 * block */
    long        next;           /* next block on this tape, or # of valid
                                 * bytes on last block (if < 0) */
} TapeBlockTrailer;

/* Number of payload bytes in a block, i.e. everything ahead of the trailer */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))

/*
 * Locate the trailer of the block that starts at 'buf'.  The argument is
 * parenthesized so that pointer expressions of any type are converted to
 * a byte address before the payload size is added.
 */
#define TapeBlockGetTrailer(buf) \
    ((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

/* A negative 'next' marks the last block of a tape */
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)

/* Valid payload bytes: the negated 'next' on the last block, else a full payload */
#define TapeBlockGetNBytes(buf) \
    (TapeBlockIsLast(buf) ? \
     (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)

/* Mark 'buf' as the last block, holding 'nbytes' valid bytes */
#define TapeBlockSetNBytes(buf, nbytes) \
    (TapeBlockGetTrailer(buf)->next = -(nbytes))
116 
117 
/*
 * This data structure represents a single "logical tape" within the set
 * of logical tapes stored in the same file.
 *
 * While writing, we hold the current partially-written data block in the
 * buffer.  While reading, we can hold multiple blocks in the buffer.  Note
 * that we don't retain the trailers of a block when it's read into the
 * buffer.  The buffer therefore contains one large contiguous chunk of data
 * from the tape.
 */
typedef struct LogicalTape
{
    bool        writing;        /* T while in write phase */
    bool        frozen;         /* T if blocks should not be freed when read */
    bool        dirty;          /* does buffer need to be written? */

    /*
     * Block numbers of the first, current, and next block of the tape.
     *
     * The "current" block number is only valid when writing, or reading from
     * a frozen tape.  (When reading from an unfrozen tape, we use a larger
     * read buffer that holds multiple blocks, so the "current" block is
     * ambiguous.)
     *
     * When concatenation of worker tape BufFiles is performed, an offset to
     * the first block in the unified BufFile space is applied during reads.
     */
    long        firstBlockNumber;   /* first block of tape, or -1 if empty */
    long        curBlockNumber;     /* block currently held in the buffer */
    long        nextBlockNumber;    /* next block to read, or -1 at EOF */
    long        offsetBlockNumber;  /* added to block numbers when reading */

    /*
     * Buffer for current data block(s).
     */
    char       *buffer;         /* physical buffer (separately palloc'd) */
    int         buffer_size;    /* allocated size of the buffer */
    int         max_size;       /* highest useful, safe buffer_size */
    int         pos;            /* next read/write position in buffer */
    int         nbytes;         /* total # of valid bytes in buffer */
} LogicalTape;
159 
160 /*
161  * This data structure represents a set of related "logical tapes" sharing
162  * space in a single underlying file. (But that "file" may be multiple files
163  * if needed to escape OS limits on file size; buffile.c handles that for us.)
164  * The number of tapes is fixed at creation.
165  */
167 {
168  BufFile *pfile; /* underlying file for whole tape set */
169 
170  /*
171  * File size tracking. nBlocksWritten is the size of the underlying file,
172  * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
173  * by ltsGetFreeBlock(), and it is always greater than or equal to
174  * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
175  * blocks that have been allocated for a tape, but have not been written
176  * to the underlying file yet. nHoleBlocks tracks the total number of
177  * blocks that are in unused holes between worker spaces following BufFile
178  * concatenation.
179  */
180  long nBlocksAllocated; /* # of blocks allocated */
181  long nBlocksWritten; /* # of blocks used in underlying file */
182  long nHoleBlocks; /* # of "hole" blocks left */
183 
184  /*
185  * We store the numbers of recycled-and-available blocks in freeBlocks[].
186  * When there are no such blocks, we extend the underlying file.
187  *
188  * If forgetFreeSpace is true then any freed blocks are simply forgotten
189  * rather than being remembered in freeBlocks[]. See notes for
190  * LogicalTapeSetForgetFreeSpace().
191  *
192  * If blocksSorted is true then the block numbers in freeBlocks are in
193  * *decreasing* order, so that removing the last entry gives us the lowest
194  * free block. We re-sort the blocks whenever a block is demanded; this
195  * should be reasonably efficient given the expected usage pattern.
196  */
197  bool forgetFreeSpace; /* are we remembering free blocks? */
198  bool blocksSorted; /* is freeBlocks[] currently in order? */
199  long *freeBlocks; /* resizable array */
200  int nFreeBlocks; /* # of currently free blocks */
201  int freeBlocksLen; /* current allocated length of freeBlocks[] */
202 
203  /* The array of logical tapes. */
204  int nTapes; /* # of logical tapes in set */
205  LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */
206 };
207 
208 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
209 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
210 static long ltsGetFreeBlock(LogicalTapeSet *lts);
211 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
212 static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
213  SharedFileSet *fileset);
214 
215 
216 /*
217  * Write a block-sized buffer to the specified block of the underlying file.
218  *
219  * No need for an error return convention; we ereport() on any error.
220  */
221 static void
222 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
223 {
224  /*
225  * BufFile does not support "holes", so if we're about to write a block
226  * that's past the current end of file, fill the space between the current
227  * end of file and the target block with zeros.
228  *
229  * This should happen rarely, otherwise you are not writing very
230  * sequentially. In current use, this only happens when the sort ends
231  * writing a run, and switches to another tape. The last block of the
232  * previous tape isn't flushed to disk until the end of the sort, so you
233  * get one-block hole, where the last block of the previous tape will
234  * later go.
235  *
236  * Note that BufFile concatenation can leave "holes" in BufFile between
237  * worker-owned block ranges. These are tracked for reporting purposes
238  * only. We never read from nor write to these hole blocks, and so they
239  * are not considered here.
240  */
241  while (blocknum > lts->nBlocksWritten)
242  {
243  char zerobuf[BLCKSZ];
244 
245  MemSet(zerobuf, 0, sizeof(zerobuf));
246 
247  ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf);
248  }
249 
250  /* Write the requested block */
251  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
252  BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
253  ereport(ERROR,
255  errmsg("could not write block %ld of temporary file: %m",
256  blocknum)));
257 
258  /* Update nBlocksWritten, if we extended the file */
259  if (blocknum == lts->nBlocksWritten)
260  lts->nBlocksWritten++;
261 }
262 
263 /*
264  * Read a block-sized buffer from the specified block of the underlying file.
265  *
266  * No need for an error return convention; we ereport() on any error. This
267  * module should never attempt to read a block it doesn't know is there.
268  */
269 static void
270 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
271 {
272  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
273  BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
274  ereport(ERROR,
276  errmsg("could not read block %ld of temporary file: %m",
277  blocknum)));
278 }
279 
280 /*
281  * Read as many blocks as we can into the per-tape buffer.
282  *
283  * Returns true if anything was read, 'false' on EOF.
284  */
285 static bool
287 {
288  lt->pos = 0;
289  lt->nbytes = 0;
290 
291  do
292  {
293  char *thisbuf = lt->buffer + lt->nbytes;
294  long datablocknum = lt->nextBlockNumber;
295 
296  /* Fetch next block number */
297  if (datablocknum == -1L)
298  break; /* EOF */
299  /* Apply worker offset, needed for leader tapesets */
300  datablocknum += lt->offsetBlockNumber;
301 
302  /* Read the block */
303  ltsReadBlock(lts, datablocknum, (void *) thisbuf);
304  if (!lt->frozen)
305  ltsReleaseBlock(lts, datablocknum);
307 
308  lt->nbytes += TapeBlockGetNBytes(thisbuf);
309  if (TapeBlockIsLast(thisbuf))
310  {
311  lt->nextBlockNumber = -1L;
312  /* EOF */
313  break;
314  }
315  else
316  lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
317 
318  /* Advance to next block, if we have buffer space left */
319  } while (lt->buffer_size - lt->nbytes > BLCKSZ);
320 
321  return (lt->nbytes > 0);
322 }
323 
/*
 * qsort comparator that orders freeBlocks[] entries from highest block
 * number to lowest: returns 1 when *a is smaller than *b, -1 when it is
 * larger, and 0 when they are equal.
 */
static int
freeBlocks_cmp(const void *a, const void *b)
{
    const long  lhs = *(const long *) a;
    const long  rhs = *(const long *) b;

    /*
     * Compare rather than subtract: the difference of two longs need not fit
     * in an int.  Each comparison yields 0 or 1, so the result is -1, 0 or 1.
     */
    return (lhs < rhs) - (lhs > rhs);
}
340 
341 /*
342  * Select a currently unused block for writing to.
343  */
344 static long
346 {
347  /*
348  * If there are multiple free blocks, we select the one appearing last in
349  * freeBlocks[] (after sorting the array if needed). If there are none,
350  * assign the next block at the end of the file.
351  */
352  if (lts->nFreeBlocks > 0)
353  {
354  if (!lts->blocksSorted)
355  {
356  qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
357  sizeof(long), freeBlocks_cmp);
358  lts->blocksSorted = true;
359  }
360  return lts->freeBlocks[--lts->nFreeBlocks];
361  }
362  else
363  return lts->nBlocksAllocated++;
364 }
365 
366 /*
367  * Return a block# to the freelist.
368  */
369 static void
370 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
371 {
372  int ndx;
373 
374  /*
375  * Do nothing if we're no longer interested in remembering free space.
376  */
377  if (lts->forgetFreeSpace)
378  return;
379 
380  /*
381  * Enlarge freeBlocks array if full.
382  */
383  if (lts->nFreeBlocks >= lts->freeBlocksLen)
384  {
385  lts->freeBlocksLen *= 2;
386  lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
387  lts->freeBlocksLen * sizeof(long));
388  }
389 
390  /*
391  * Add blocknum to array, and mark the array unsorted if it's no longer in
392  * decreasing order.
393  */
394  ndx = lts->nFreeBlocks++;
395  lts->freeBlocks[ndx] = blocknum;
396  if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
397  lts->blocksSorted = false;
398 }
399 
400 /*
401  * Claim ownership of a set of logical tapes from existing shared BufFiles.
402  *
403  * Caller should be leader process. Though tapes are marked as frozen in
404  * workers, they are not frozen when opened within leader, since unfrozen tapes
405  * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
406  * for random access.)
407  */
408 static void
410  SharedFileSet *fileset)
411 {
412  LogicalTape *lt = NULL;
413  long tapeblocks = 0L;
414  long nphysicalblocks = 0L;
415  int i;
416 
417  /* Should have at least one worker tape, plus leader's tape */
418  Assert(lts->nTapes >= 2);
419 
420  /*
421  * Build concatenated view of all BufFiles, remembering the block number
422  * where each source file begins. No changes are needed for leader/last
423  * tape.
424  */
425  for (i = 0; i < lts->nTapes - 1; i++)
426  {
427  char filename[MAXPGPATH];
428  BufFile *file;
429  off_t filesize;
430 
431  lt = &lts->tapes[i];
432 
433  pg_itoa(i, filename);
434  file = BufFileOpenShared(fileset, filename);
435  filesize = BufFileSize(file);
436  if (filesize < 0)
437  ereport(ERROR,
439  errmsg("could not determine size of temporary file \"%s\"", filename)));
440 
441  /*
442  * Stash first BufFile, and concatenate subsequent BufFiles to that.
443  * Store block offset into each tape as we go.
444  */
445  lt->firstBlockNumber = shared[i].firstblocknumber;
446  if (i == 0)
447  {
448  lts->pfile = file;
449  lt->offsetBlockNumber = 0L;
450  }
451  else
452  {
453  lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
454  }
455  /* Don't allocate more for read buffer than could possibly help */
456  lt->max_size = Min(MaxAllocSize, filesize);
457  tapeblocks = filesize / BLCKSZ;
458  nphysicalblocks += tapeblocks;
459  }
460 
461  /*
462  * Set # of allocated blocks, as well as # blocks written. Use extent of
463  * new BufFile space (from 0 to end of last worker's tape space) for this.
464  * Allocated/written blocks should include space used by holes left
465  * between concatenated BufFiles.
466  */
467  lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
468  lts->nBlocksWritten = lts->nBlocksAllocated;
469 
470  /*
471  * Compute number of hole blocks so that we can later work backwards, and
472  * instrument number of physical blocks. We don't simply use physical
473  * blocks directly for instrumentation because this would break if we ever
474  * subsequently wrote to worker tape.
475  *
476  * Working backwards like this keeps our options open. If shared BufFiles
477  * ever support being written to post-export, logtape.c can automatically
478  * take advantage of that. We'd then support writing to the leader tape
479  * while recycling space from worker tapes, because the leader tape has a
480  * zero offset (write routines won't need to have extra logic to apply an
481  * offset).
482  *
483  * The only thing that currently prevents writing to the leader tape from
484  * working is the fact that BufFiles opened using BufFileOpenShared() are
485  * read-only by definition, but that could be changed if it seemed
486  * worthwhile. For now, writing to the leader tape will raise a "Bad file
487  * descriptor" error, so tuplesort must avoid writing to the leader tape
488  * altogether.
489  */
490  lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
491 }
492 
493 /*
494  * Create a set of logical tapes in a temporary underlying file.
495  *
496  * Each tape is initialized in write state. Serial callers pass ntapes,
497  * NULL argument for shared, and -1 for worker. Parallel worker callers
498  * pass ntapes, a shared file handle, NULL shared argument, and their own
499  * worker number. Leader callers, which claim shared worker tapes here,
500  * must supply non-sentinel values for all arguments except worker number,
501  * which should be -1.
502  *
503  * Leader caller is passing back an array of metadata each worker captured
504  * when LogicalTapeFreeze() was called for their final result tapes. Passed
505  * tapes array is actually sized ntapes - 1, because it includes only
506  * worker tapes, whereas leader requires its own leader tape. Note that we
507  * rely on the assumption that reclaimed worker tapes will only be read
508  * from once by leader, and never written to again (tapes are initialized
509  * for writing, but that's only to be consistent). Leader may not write to
510  * its own tape purely due to a restriction in the shared buffile
511  * infrastructure that may be lifted in the future.
512  */
514 LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
515  int worker)
516 {
517  LogicalTapeSet *lts;
518  LogicalTape *lt;
519  int i;
520 
521  /*
522  * Create top-level struct including per-tape LogicalTape structs.
523  */
524  Assert(ntapes > 0);
525  lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
526  ntapes * sizeof(LogicalTape));
527  lts->nBlocksAllocated = 0L;
528  lts->nBlocksWritten = 0L;
529  lts->nHoleBlocks = 0L;
530  lts->forgetFreeSpace = false;
531  lts->blocksSorted = true; /* a zero-length array is sorted ... */
532  lts->freeBlocksLen = 32; /* reasonable initial guess */
533  lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
534  lts->nFreeBlocks = 0;
535  lts->nTapes = ntapes;
536 
537  /*
538  * Initialize per-tape structs. Note we allocate the I/O buffer and the
539  * first block for a tape only when it is first actually written to. This
540  * avoids wasting memory space when tuplesort.c overestimates the number
541  * of tapes needed.
542  */
543  for (i = 0; i < ntapes; i++)
544  {
545  lt = &lts->tapes[i];
546  lt->writing = true;
547  lt->frozen = false;
548  lt->dirty = false;
549  lt->firstBlockNumber = -1L;
550  lt->curBlockNumber = -1L;
551  lt->nextBlockNumber = -1L;
552  lt->offsetBlockNumber = 0L;
553  lt->buffer = NULL;
554  lt->buffer_size = 0;
555  /* palloc() larger than MaxAllocSize would fail */
556  lt->max_size = MaxAllocSize;
557  lt->pos = 0;
558  lt->nbytes = 0;
559  }
560 
561  /*
562  * Create temp BufFile storage as required.
563  *
564  * Leader concatenates worker tapes, which requires special adjustment to
565  * final tapeset data. Things are simpler for the worker case and the
566  * serial case, though. They are generally very similar -- workers use a
567  * shared fileset, whereas serial sorts use a conventional serial BufFile.
568  */
569  if (shared)
570  ltsConcatWorkerTapes(lts, shared, fileset);
571  else if (fileset)
572  {
573  char filename[MAXPGPATH];
574 
575  pg_itoa(worker, filename);
576  lts->pfile = BufFileCreateShared(fileset, filename);
577  }
578  else
579  lts->pfile = BufFileCreateTemp(false);
580 
581  return lts;
582 }
583 
584 /*
585  * Close a logical tape set and release all resources.
586  */
587 void
589 {
590  LogicalTape *lt;
591  int i;
592 
593  BufFileClose(lts->pfile);
594  for (i = 0; i < lts->nTapes; i++)
595  {
596  lt = &lts->tapes[i];
597  if (lt->buffer)
598  pfree(lt->buffer);
599  }
600  pfree(lts->freeBlocks);
601  pfree(lts);
602 }
603 
604 /*
605  * Mark a logical tape set as not needing management of free space anymore.
606  *
607  * This should be called if the caller does not intend to write any more data
608  * into the tape set, but is reading from un-frozen tapes. Since no more
609  * writes are planned, remembering free blocks is no longer useful. Setting
610  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
611  * is not designed to handle large numbers of free blocks.
612  */
613 void
615 {
616  lts->forgetFreeSpace = true;
617 }
618 
619 /*
620  * Write to a logical tape.
621  *
622  * There are no error returns; we ereport() on failure.
623  */
624 void
626  void *ptr, size_t size)
627 {
628  LogicalTape *lt;
629  size_t nthistime;
630 
631  Assert(tapenum >= 0 && tapenum < lts->nTapes);
632  lt = &lts->tapes[tapenum];
633  Assert(lt->writing);
634  Assert(lt->offsetBlockNumber == 0L);
635 
636  /* Allocate data buffer and first block on first write */
637  if (lt->buffer == NULL)
638  {
639  lt->buffer = (char *) palloc(BLCKSZ);
640  lt->buffer_size = BLCKSZ;
641  }
642  if (lt->curBlockNumber == -1)
643  {
644  Assert(lt->firstBlockNumber == -1);
645  Assert(lt->pos == 0);
646 
647  lt->curBlockNumber = ltsGetFreeBlock(lts);
649 
650  TapeBlockGetTrailer(lt->buffer)->prev = -1L;
651  }
652 
653  Assert(lt->buffer_size == BLCKSZ);
654  while (size > 0)
655  {
656  if (lt->pos >= TapeBlockPayloadSize)
657  {
658  /* Buffer full, dump it out */
659  long nextBlockNumber;
660 
661  if (!lt->dirty)
662  {
663  /* Hmm, went directly from reading to writing? */
664  elog(ERROR, "invalid logtape state: should be dirty");
665  }
666 
667  /*
668  * First allocate the next block, so that we can store it in the
669  * 'next' pointer of this block.
670  */
671  nextBlockNumber = ltsGetFreeBlock(lts);
672 
673  /* set the next-pointer and dump the current block. */
674  TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
675  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
676 
677  /* initialize the prev-pointer of the next block */
678  TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
679  lt->curBlockNumber = nextBlockNumber;
680  lt->pos = 0;
681  lt->nbytes = 0;
682  }
683 
684  nthistime = TapeBlockPayloadSize - lt->pos;
685  if (nthistime > size)
686  nthistime = size;
687  Assert(nthistime > 0);
688 
689  memcpy(lt->buffer + lt->pos, ptr, nthistime);
690 
691  lt->dirty = true;
692  lt->pos += nthistime;
693  if (lt->nbytes < lt->pos)
694  lt->nbytes = lt->pos;
695  ptr = (void *) ((char *) ptr + nthistime);
696  size -= nthistime;
697  }
698 }
699 
700 /*
701  * Rewind logical tape and switch from writing to reading.
702  *
703  * The tape must currently be in writing state, or "frozen" in read state.
704  *
705  * 'buffer_size' specifies how much memory to use for the read buffer.
706  * Regardless of the argument, the actual amount of memory used is between
707  * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
708  * rounded down and truncated to fit those constraints, if necessary. If the
709  * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
710  * byte buffer is used.
711  */
712 void
713 LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
714 {
715  LogicalTape *lt;
716 
717  Assert(tapenum >= 0 && tapenum < lts->nTapes);
718  lt = &lts->tapes[tapenum];
719 
720  /*
721  * Round and cap buffer_size if needed.
722  */
723  if (lt->frozen)
724  buffer_size = BLCKSZ;
725  else
726  {
727  /* need at least one block */
728  if (buffer_size < BLCKSZ)
729  buffer_size = BLCKSZ;
730 
731  /* palloc() larger than max_size is unlikely to be helpful */
732  if (buffer_size > lt->max_size)
733  buffer_size = lt->max_size;
734 
735  /* round down to BLCKSZ boundary */
736  buffer_size -= buffer_size % BLCKSZ;
737  }
738 
739  if (lt->writing)
740  {
741  /*
742  * Completion of a write phase. Flush last partial data block, and
743  * rewind for normal (destructive) read.
744  */
745  if (lt->dirty)
746  {
747  /*
748  * As long as we've filled the buffer at least once, its contents
749  * are entirely defined from valgrind's point of view, even though
750  * contents beyond the current end point may be stale. But it's
751  * possible - at least in the case of a parallel sort - to sort
752  * such small amount of data that we do not fill the buffer even
753  * once. Tell valgrind that its contents are defined, so it
754  * doesn't bleat.
755  */
757  lt->buffer_size - lt->nbytes);
758 
759  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
760  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
761  }
762  lt->writing = false;
763  }
764  else
765  {
766  /*
767  * This is only OK if tape is frozen; we rewind for (another) read
768  * pass.
769  */
770  Assert(lt->frozen);
771  }
772 
773  /* Allocate a read buffer (unless the tape is empty) */
774  if (lt->buffer)
775  pfree(lt->buffer);
776  lt->buffer = NULL;
777  lt->buffer_size = 0;
778  if (lt->firstBlockNumber != -1L)
779  {
780  lt->buffer = palloc(buffer_size);
781  lt->buffer_size = buffer_size;
782  }
783 
784  /* Read the first block, or reset if tape is empty */
786  lt->pos = 0;
787  lt->nbytes = 0;
788  ltsReadFillBuffer(lts, lt);
789 }
790 
791 /*
792  * Rewind logical tape and switch from reading to writing.
793  *
794  * NOTE: we assume the caller has read the tape to the end; otherwise
795  * untouched data will not have been freed. We could add more code to free
796  * any unread blocks, but in current usage of this module it'd be useless
797  * code.
798  */
799 void
801 {
802  LogicalTape *lt;
803 
804  Assert(tapenum >= 0 && tapenum < lts->nTapes);
805  lt = &lts->tapes[tapenum];
806 
807  Assert(!lt->writing && !lt->frozen);
808  lt->writing = true;
809  lt->dirty = false;
810  lt->firstBlockNumber = -1L;
811  lt->curBlockNumber = -1L;
812  lt->pos = 0;
813  lt->nbytes = 0;
814  if (lt->buffer)
815  pfree(lt->buffer);
816  lt->buffer = NULL;
817  lt->buffer_size = 0;
818 }
819 
820 /*
821  * Read from a logical tape.
822  *
823  * Early EOF is indicated by return value less than #bytes requested.
824  */
825 size_t
826 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
827  void *ptr, size_t size)
828 {
829  LogicalTape *lt;
830  size_t nread = 0;
831  size_t nthistime;
832 
833  Assert(tapenum >= 0 && tapenum < lts->nTapes);
834  lt = &lts->tapes[tapenum];
835  Assert(!lt->writing);
836 
837  while (size > 0)
838  {
839  if (lt->pos >= lt->nbytes)
840  {
841  /* Try to load more data into buffer. */
842  if (!ltsReadFillBuffer(lts, lt))
843  break; /* EOF */
844  }
845 
846  nthistime = lt->nbytes - lt->pos;
847  if (nthistime > size)
848  nthistime = size;
849  Assert(nthistime > 0);
850 
851  memcpy(ptr, lt->buffer + lt->pos, nthistime);
852 
853  lt->pos += nthistime;
854  ptr = (void *) ((char *) ptr + nthistime);
855  size -= nthistime;
856  nread += nthistime;
857  }
858 
859  return nread;
860 }
861 
862 /*
863  * "Freeze" the contents of a tape so that it can be read multiple times
864  * and/or read backwards. Once a tape is frozen, its contents will not
865  * be released until the LogicalTapeSet is destroyed. This is expected
866  * to be used only for the final output pass of a merge.
867  *
868  * This *must* be called just at the end of a write pass, before the
869  * tape is rewound (after rewind is too late!). It performs a rewind
870  * and switch to read mode "for free". An immediately following rewind-
871  * for-read call is OK but not necessary.
872  *
873  * share output argument is set with details of storage used for tape after
874  * freezing, which may be passed to LogicalTapeSetCreate within leader
875  * process later. This metadata is only of interest to worker callers
876  * freezing their final output for leader (single materialized tape).
877  * Serial sorts should set share to NULL.
878  */
879 void
880 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
881 {
882  LogicalTape *lt;
883 
884  Assert(tapenum >= 0 && tapenum < lts->nTapes);
885  lt = &lts->tapes[tapenum];
886  Assert(lt->writing);
887  Assert(lt->offsetBlockNumber == 0L);
888 
889  /*
890  * Completion of a write phase. Flush last partial data block, and rewind
891  * for nondestructive read.
892  */
893  if (lt->dirty)
894  {
895  /*
896  * As long as we've filled the buffer at least once, its contents are
897  * entirely defined from valgrind's point of view, even though
898  * contents beyond the current end point may be stale. But it's
899  * possible - at least in the case of a parallel sort - to sort such
900  * small amount of data that we do not fill the buffer even once. Tell
901  * valgrind that its contents are defined, so it doesn't bleat.
902  */
904  lt->buffer_size - lt->nbytes);
905 
906  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
907  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
908  lt->writing = false;
909  }
910  lt->writing = false;
911  lt->frozen = true;
912 
913  /*
914  * The seek and backspace functions assume a single block read buffer.
915  * That's OK with current usage. A larger buffer is helpful to make the
916  * read pattern of the backing file look more sequential to the OS, when
917  * we're reading from multiple tapes. But at the end of a sort, when a
918  * tape is frozen, we only read from a single tape anyway.
919  */
920  if (!lt->buffer || lt->buffer_size != BLCKSZ)
921  {
922  if (lt->buffer)
923  pfree(lt->buffer);
924  lt->buffer = palloc(BLCKSZ);
925  lt->buffer_size = BLCKSZ;
926  }
927 
928  /* Read the first block, or reset if tape is empty */
930  lt->pos = 0;
931  lt->nbytes = 0;
932 
933  if (lt->firstBlockNumber == -1L)
934  lt->nextBlockNumber = -1L;
935  ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
936  if (TapeBlockIsLast(lt->buffer))
937  lt->nextBlockNumber = -1L;
938  else
939  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
940  lt->nbytes = TapeBlockGetNBytes(lt->buffer);
941 
942  /* Handle extra steps when caller is to share its tapeset */
943  if (share)
944  {
946  share->firstblocknumber = lt->firstBlockNumber;
947  }
948 }
949 
950 /*
951  * Backspace the tape a given number of bytes. (We also support a more
952  * general seek interface, see below.)
953  *
954  * *Only* a frozen-for-read tape can be backed up; we don't support
955  * random access during write, and an unfrozen read tape may have
956  * already discarded the desired data!
957  *
958  * Returns the number of bytes backed up. It can be less than the
959  * requested amount, if there isn't that much data before the current
960  * position. The tape is positioned to the beginning of the tape in
961  * that case.
962  */
963 size_t
964 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
965 {
966  LogicalTape *lt;
967  size_t seekpos = 0;
968 
969  Assert(tapenum >= 0 && tapenum < lts->nTapes);
970  lt = &lts->tapes[tapenum];
971  Assert(lt->frozen);
972  Assert(lt->buffer_size == BLCKSZ);
973 
974  /*
975  * Easy case for seek within current block.
976  */
977  if (size <= (size_t) lt->pos)
978  {
979  lt->pos -= (int) size;
980  return size;
981  }
982 
983  /*
984  * Not-so-easy case, have to walk back the chain of blocks. This
985  * implementation would be pretty inefficient for long seeks, but we
986  * really aren't doing that (a seek over one tuple is typical).
987  */
988  seekpos = (size_t) lt->pos; /* part within this block */
989  while (size > seekpos)
990  {
991  long prev = TapeBlockGetTrailer(lt->buffer)->prev;
992 
993  if (prev == -1L)
994  {
995  /* Tried to back up beyond the beginning of tape. */
996  if (lt->curBlockNumber != lt->firstBlockNumber)
997  elog(ERROR, "unexpected end of tape");
998  lt->pos = 0;
999  return seekpos;
1000  }
1001 
1002  ltsReadBlock(lts, prev, (void *) lt->buffer);
1003 
1004  if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1005  elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1006  prev,
1007  TapeBlockGetTrailer(lt->buffer)->next,
1008  lt->curBlockNumber);
1009 
1011  lt->curBlockNumber = prev;
1012  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1013 
1014  seekpos += TapeBlockPayloadSize;
1015  }
1016 
1017  /*
1018  * 'seekpos' can now be greater than 'size', because it points to the
1019  * beginning the target block. The difference is the position within the
1020  * page.
1021  */
1022  lt->pos = seekpos - size;
1023  return size;
1024 }
1025 
1026 /*
1027  * Seek to an arbitrary position in a logical tape.
1028  *
1029  * *Only* a frozen-for-read tape can be seeked.
1030  *
1031  * Must be called with a block/offset previously returned by
1032  * LogicalTapeTell().
1033  */
1034 void
1036  long blocknum, int offset)
1037 {
1038  LogicalTape *lt;
1039 
1040  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1041  lt = &lts->tapes[tapenum];
1042  Assert(lt->frozen);
1043  Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1044  Assert(lt->buffer_size == BLCKSZ);
1045 
1046  if (blocknum != lt->curBlockNumber)
1047  {
1048  ltsReadBlock(lts, blocknum, (void *) lt->buffer);
1049  lt->curBlockNumber = blocknum;
1051  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1052  }
1053 
1054  if (offset > lt->nbytes)
1055  elog(ERROR, "invalid tape seek position");
1056  lt->pos = offset;
1057 }
1058 
1059 /*
1060  * Obtain current position in a form suitable for a later LogicalTapeSeek.
1061  *
1062  * NOTE: it'd be OK to do this during write phase with intention of using
1063  * the position for a seek after freezing. Not clear if anyone needs that.
1064  */
1065 void
1067  long *blocknum, int *offset)
1068 {
1069  LogicalTape *lt;
1070 
1071  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1072  lt = &lts->tapes[tapenum];
1073  Assert(lt->offsetBlockNumber == 0L);
1074 
1075  /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1076  Assert(lt->buffer_size == BLCKSZ);
1077 
1078  *blocknum = lt->curBlockNumber;
1079  *offset = lt->pos;
1080 }
1081 
1082 /*
1083  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1084  */
1085 long
1087 {
1088  return lts->nBlocksAllocated - lts->nHoleBlocks;
1089 }
int max_size
Definition: logtape.c:155
size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:826
LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]
Definition: logtape.c:205
int nFreeBlocks
Definition: logtape.c:200
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
#define TapeBlockIsLast(buf)
Definition: logtape.c:110
#define TapeBlockPayloadSize
Definition: logtape.c:106
long offsetBlockNumber
Definition: logtape.c:148
long nBlocksWritten
Definition: logtape.c:181
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:278
int freeBlocksLen
Definition: logtape.c:201
static bool ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:286
#define Min(x, y)
Definition: c.h:857
static long ltsGetFreeBlock(LogicalTapeSet *lts)
Definition: logtape.c:345
BufFile * pfile
Definition: logtape.c:168
#define MemSet(start, val, len)
Definition: c.h:908
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
Definition: logtape.c:370
long firstblocknumber
Definition: logtape.h:50
bool frozen
Definition: logtape.c:131
long nextBlockNumber
Definition: logtape.c:147
void BufFileClose(BufFile *file)
Definition: buffile.c:388
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:800
bool writing
Definition: logtape.c:130
long * freeBlocks
Definition: logtape.c:199
bool dirty
Definition: logtape.c:132
void pfree(void *pointer)
Definition: mcxt.c:1031
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:625
#define ERROR
Definition: elog.h:43
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:193
#define MAXPGPATH
LogicalTapeSet * LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, int worker)
Definition: logtape.c:514
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:256
size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
Definition: logtape.c:964
int errcode_for_file_access(void)
Definition: elog.c:598
long nHoleBlocks
Definition: logtape.c:182
int nbytes
Definition: logtape.c:157
void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset)
Definition: logtape.c:1066
static int freeBlocks_cmp(const void *a, const void *b)
Definition: logtape.c:328
#define ereport(elevel, rest)
Definition: elog.h:122
void pg_itoa(int16 i, char *a)
Definition: numutils.c:120
#define MaxAllocSize
Definition: memutils.h:40
long firstBlockNumber
Definition: logtape.c:145
int BufFileSeekBlock(BufFile *file, long blknum)
Definition: buffile.c:769
#define TapeBlockSetNBytes(buf, nbytes)
Definition: logtape.c:114
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, SharedFileSet *fileset)
Definition: logtape.c:409
long curBlockNumber
Definition: logtape.c:146
#define Assert(condition)
Definition: c.h:699
struct TapeBlockTrailer TapeBlockTrailer
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:215
void BufFileExportShared(BufFile *file)
Definition: buffile.c:370
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1044
void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
Definition: logtape.c:713
#define TapeBlockGetNBytes(buf)
Definition: logtape.c:111
void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
Definition: logtape.c:880
char * buffer
Definition: logtape.c:153
static char * filename
Definition: pg_dumpall.c:87
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool forgetFreeSpace
Definition: logtape.c:197
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:545
size_t BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:592
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:588
void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, long blocknum, int offset)
Definition: logtape.c:1035
#define elog
Definition: elog.h:219
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition: logtape.c:614
long LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1086
#define qsort(a, b, c, d)
Definition: port.h:421
struct LogicalTape LogicalTape
long BufFileAppend(BufFile *target, BufFile *source)
Definition: buffile.c:836
bool blocksSorted
Definition: logtape.c:198
long nBlocksAllocated
Definition: logtape.c:180
off_t BufFileSize(BufFile *file)
Definition: buffile.c:802
int buffer_size
Definition: logtape.c:154
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:222
#define TapeBlockGetTrailer(buf)
Definition: logtape.c:107
#define offsetof(type, field)
Definition: c.h:622
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:270