PostgreSQL Source Code  git master
logtape.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * logtape.c
4  * Management of "logical tapes" within temporary files.
5  *
6  * This module exists to support sorting via multiple merge passes (see
7  * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8  * we implement it on disk by creating a separate file for each "tape",
9  * there is an annoying problem: the peak space usage is at least twice
10  * the volume of actual data to be sorted. (This must be so because each
11  * datum will appear in both the input and output tapes of the final
12  * merge pass. For seven-tape polyphase merge, which is otherwise a
13  * pretty good algorithm, peak usage is more like 4x actual data volume.)
14  *
15  * We can work around this problem by recognizing that any one tape
16  * dataset (with the possible exception of the final output) is written
17  * and read exactly once in a perfectly sequential manner. Therefore,
18  * a datum once read will not be required again, and we can recycle its
19  * space for use by the new tape dataset(s) being generated. In this way,
20  * the total space usage is essentially just the actual data volume, plus
21  * insignificant bookkeeping and start/stop overhead.
22  *
23  * Few OSes allow arbitrary parts of a file to be released back to the OS,
24  * so we have to implement this space-recycling ourselves within a single
25  * logical file. logtape.c exists to perform this bookkeeping and provide
26  * the illusion of N independent tape devices to tuplesort.c. Note that
27  * logtape.c itself depends on buffile.c to provide a "logical file" of
28  * larger size than the underlying OS may support.
29  *
30  * For simplicity, we allocate and release space in the underlying file
31  * in BLCKSZ-size blocks. Space allocation boils down to keeping track
32  * of which blocks in the underlying file belong to which logical tape,
33  * plus any blocks that are free (recycled and not yet reused).
34  * The blocks in each logical tape form a chain, with a prev- and next-
35  * pointer in each block.
36  *
37  * The initial write pass is guaranteed to fill the underlying file
38  * perfectly sequentially, no matter how data is divided into logical tapes.
39  * Once we begin merge passes, the access pattern becomes considerably
40  * less predictable --- but the seeking involved should be comparable to
41  * what would happen if we kept each logical tape in a separate file,
42  * so there's no serious performance penalty paid to obtain the space
43  * savings of recycling. We try to localize the write accesses by always
44  * writing to the lowest-numbered free block when we have a choice; it's
45  * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
46  * policy for free blocks would be better?)
47  *
48  * To further make the I/Os more sequential, we can use a larger buffer
49  * when reading, and read multiple blocks from the same tape in one go,
50  * whenever the buffer becomes empty. LogicalTapeAssignReadBufferSize()
51  * can be used to set the size of the read buffer.
52  *
53  * To support the above policy of writing to the lowest free block,
54  * ltsGetFreeBlock sorts the list of free block numbers into decreasing
55  * order each time it is asked for a block and the list isn't currently
56  * sorted. This is an efficient way to handle it because we expect cycles
57  * of releasing many blocks followed by re-using many blocks, due to
58  * the larger read buffer.
59  *
60  * Since all the bookkeeping and buffer memory is allocated with palloc(),
61  * and the underlying file(s) are made with OpenTemporaryFile, all resources
62  * for a logical tape set are certain to be cleaned up even if processing
63  * is aborted by ereport(ERROR). To avoid confusion, the caller should take
64  * care that all calls for a single LogicalTapeSet are made in the same
65  * palloc context.
66  *
67  * To support parallel sort operations involving coordinated callers to
68  * tuplesort.c routines across multiple workers, it is necessary to
69  * concatenate each worker BufFile/tapeset into one single logical tapeset
70  * managed by the leader. Workers should have produced one final
71  * materialized tape (their entire output) when this happens in leader.
72  * There will always be the same number of runs as input tapes, and the same
73  * number of input tapes as participants (worker Tuplesortstates).
74  *
75  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
76  * Portions Copyright (c) 1994, Regents of the University of California
77  *
78  * IDENTIFICATION
79  * src/backend/utils/sort/logtape.c
80  *
81  *-------------------------------------------------------------------------
82  */
83 
84 #include "postgres.h"
85 
86 #include "storage/buffile.h"
87 #include "utils/builtins.h"
88 #include "utils/logtape.h"
89 #include "utils/memdebug.h"
90 #include "utils/memutils.h"
91 
/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, negated, in 'next'.
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	long		prev;			/* previous block on this tape, or -1 on first
								 * block */
	long		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;
106 
/*
 * Accessors for the trailer that occupies the last
 * sizeof(TapeBlockTrailer) bytes of a BLCKSZ-sized block.
 *
 * 'buf' may be any pointer expression addressing the start of a block; it is
 * parenthesized before the cast to char * so that pointer arithmetic inside
 * the argument is performed in the argument's own type.  Note that these
 * macros evaluate 'buf' more than once, so avoid arguments with side
 * effects.
 */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

/* The last block of a tape is the one whose 'next' field is negative. */
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)

/* # of valid payload bytes: stored negated in 'next' on the last block. */
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)

/* Mark the block as the last one, holding 'nbytes' valid payload bytes. */
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
117 
118 
/*
 * This data structure represents a single "logical tape" within the set
 * of logical tapes stored in the same file.
 *
 * While writing, we hold the current partially-written data block in the
 * buffer.  While reading, we can hold multiple blocks in the buffer.  Note
 * that we don't retain the trailers of a block when it's read into the
 * buffer.  The buffer therefore contains one large contiguous chunk of data
 * from the tape.
 */
typedef struct LogicalTape
{
	bool		writing;		/* T while in write phase */
	bool		frozen;			/* T if blocks should not be freed when read */
	bool		dirty;			/* does buffer need to be written? */

	/*
	 * Block numbers of the first, current, and next block of the tape.
	 *
	 * The "current" block number is only valid when writing, or reading from
	 * a frozen tape.  (When reading from an unfrozen tape, we use a larger
	 * read buffer that holds multiple blocks, so the "current" block is
	 * ambiguous.)
	 *
	 * When concatenation of worker tape BufFiles is performed, an offset to
	 * the first block in the unified BufFile space is applied during reads.
	 */
	long		firstBlockNumber;	/* first block of tape, or -1 if none */
	long		curBlockNumber; /* block currently held in the buffer */
	long		nextBlockNumber;	/* next block to read, or -1 at EOF */
	long		offsetBlockNumber;	/* added to block numbers when reading */

	/*
	 * Buffer for current data block(s).
	 */
	char	   *buffer;			/* physical buffer (separately palloc'd) */
	int			buffer_size;	/* allocated size of the buffer */
	int			max_size;		/* highest useful, safe buffer_size */
	int			pos;			/* next read/write position in buffer */
	int			nbytes;			/* total # of valid bytes in buffer */
} LogicalTape;
160 
161 /*
162  * This data structure represents a set of related "logical tapes" sharing
163  * space in a single underlying file. (But that "file" may be multiple files
164  * if needed to escape OS limits on file size; buffile.c handles that for us.)
165  * The number of tapes is fixed at creation.
166  */
168 {
169  BufFile *pfile; /* underlying file for whole tape set */
170 
171  /*
172  * File size tracking. nBlocksWritten is the size of the underlying file,
173  * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
174  * by ltsGetFreeBlock(), and it is always greater than or equal to
175  * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
176  * blocks that have been allocated for a tape, but have not been written
177  * to the underlying file yet. nHoleBlocks tracks the total number of
178  * blocks that are in unused holes between worker spaces following BufFile
179  * concatenation.
180  */
181  long nBlocksAllocated; /* # of blocks allocated */
182  long nBlocksWritten; /* # of blocks used in underlying file */
183  long nHoleBlocks; /* # of "hole" blocks left */
184 
185  /*
186  * We store the numbers of recycled-and-available blocks in freeBlocks[].
187  * When there are no such blocks, we extend the underlying file.
188  *
189  * If forgetFreeSpace is true then any freed blocks are simply forgotten
190  * rather than being remembered in freeBlocks[]. See notes for
191  * LogicalTapeSetForgetFreeSpace().
192  *
193  * If blocksSorted is true then the block numbers in freeBlocks are in
194  * *decreasing* order, so that removing the last entry gives us the lowest
195  * free block. We re-sort the blocks whenever a block is demanded; this
196  * should be reasonably efficient given the expected usage pattern.
197  */
198  bool forgetFreeSpace; /* are we remembering free blocks? */
199  bool blocksSorted; /* is freeBlocks[] currently in order? */
200  long *freeBlocks; /* resizable array */
201  int nFreeBlocks; /* # of currently free blocks */
202  int freeBlocksLen; /* current allocated length of freeBlocks[] */
203 
204  /* The array of logical tapes. */
205  int nTapes; /* # of logical tapes in set */
206  LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */
207 };
208 
209 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
210 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
211 static long ltsGetFreeBlock(LogicalTapeSet *lts);
212 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
213 static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
214  SharedFileSet *fileset);
215 
216 
217 /*
218  * Write a block-sized buffer to the specified block of the underlying file.
219  *
220  * No need for an error return convention; we ereport() on any error.
221  */
222 static void
223 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
224 {
225  /*
226  * BufFile does not support "holes", so if we're about to write a block
227  * that's past the current end of file, fill the space between the current
228  * end of file and the target block with zeros.
229  *
230  * This should happen rarely, otherwise you are not writing very
231  * sequentially. In current use, this only happens when the sort ends
232  * writing a run, and switches to another tape. The last block of the
233  * previous tape isn't flushed to disk until the end of the sort, so you
234  * get one-block hole, where the last block of the previous tape will
235  * later go.
236  *
237  * Note that BufFile concatenation can leave "holes" in BufFile between
238  * worker-owned block ranges. These are tracked for reporting purposes
239  * only. We never read from nor write to these hole blocks, and so they
240  * are not considered here.
241  */
242  while (blocknum > lts->nBlocksWritten)
243  {
244  char zerobuf[BLCKSZ];
245 
246  MemSet(zerobuf, 0, sizeof(zerobuf));
247 
248  ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf);
249  }
250 
251  /* Write the requested block */
252  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
253  BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
254  ereport(ERROR,
256  errmsg("could not write block %ld of temporary file: %m",
257  blocknum)));
258 
259  /* Update nBlocksWritten, if we extended the file */
260  if (blocknum == lts->nBlocksWritten)
261  lts->nBlocksWritten++;
262 }
263 
264 /*
265  * Read a block-sized buffer from the specified block of the underlying file.
266  *
267  * No need for an error return convention; we ereport() on any error. This
268  * module should never attempt to read a block it doesn't know is there.
269  */
270 static void
271 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
272 {
273  if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
274  BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
275  ereport(ERROR,
277  errmsg("could not read block %ld of temporary file: %m",
278  blocknum)));
279 }
280 
281 /*
282  * Read as many blocks as we can into the per-tape buffer.
283  *
284  * Returns true if anything was read, 'false' on EOF.
285  */
286 static bool
288 {
289  lt->pos = 0;
290  lt->nbytes = 0;
291 
292  do
293  {
294  char *thisbuf = lt->buffer + lt->nbytes;
295  long datablocknum = lt->nextBlockNumber;
296 
297  /* Fetch next block number */
298  if (datablocknum == -1L)
299  break; /* EOF */
300  /* Apply worker offset, needed for leader tapesets */
301  datablocknum += lt->offsetBlockNumber;
302 
303  /* Read the block */
304  ltsReadBlock(lts, datablocknum, (void *) thisbuf);
305  if (!lt->frozen)
306  ltsReleaseBlock(lts, datablocknum);
308 
309  lt->nbytes += TapeBlockGetNBytes(thisbuf);
310  if (TapeBlockIsLast(thisbuf))
311  {
312  lt->nextBlockNumber = -1L;
313  /* EOF */
314  break;
315  }
316  else
317  lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
318 
319  /* Advance to next block, if we have buffer space left */
320  } while (lt->buffer_size - lt->nbytes > BLCKSZ);
321 
322  return (lt->nbytes > 0);
323 }
324 
/*
 * qsort comparator that orders block numbers from highest to lowest.
 *
 * Returns a positive value when *a is the smaller block number, a negative
 * value when it is the larger one, and zero when both are equal.
 */
static int
freeBlocks_cmp(const void *a, const void *b)
{
	const long *lhs = (const long *) a;
	const long *rhs = (const long *) b;

	/*
	 * Combine two comparisons instead of subtracting: the difference of two
	 * longs need not fit in an int.
	 */
	return (*lhs < *rhs) - (*lhs > *rhs);
}
341 
342 /*
343  * Select a currently unused block for writing to.
344  */
345 static long
347 {
348  /*
349  * If there are multiple free blocks, we select the one appearing last in
350  * freeBlocks[] (after sorting the array if needed). If there are none,
351  * assign the next block at the end of the file.
352  */
353  if (lts->nFreeBlocks > 0)
354  {
355  if (!lts->blocksSorted)
356  {
357  qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
358  sizeof(long), freeBlocks_cmp);
359  lts->blocksSorted = true;
360  }
361  return lts->freeBlocks[--lts->nFreeBlocks];
362  }
363  else
364  return lts->nBlocksAllocated++;
365 }
366 
367 /*
368  * Return a block# to the freelist.
369  */
370 static void
371 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
372 {
373  int ndx;
374 
375  /*
376  * Do nothing if we're no longer interested in remembering free space.
377  */
378  if (lts->forgetFreeSpace)
379  return;
380 
381  /*
382  * Enlarge freeBlocks array if full.
383  */
384  if (lts->nFreeBlocks >= lts->freeBlocksLen)
385  {
386  lts->freeBlocksLen *= 2;
387  lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
388  lts->freeBlocksLen * sizeof(long));
389  }
390 
391  /*
392  * Add blocknum to array, and mark the array unsorted if it's no longer in
393  * decreasing order.
394  */
395  ndx = lts->nFreeBlocks++;
396  lts->freeBlocks[ndx] = blocknum;
397  if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
398  lts->blocksSorted = false;
399 }
400 
401 /*
402  * Claim ownership of a set of logical tapes from existing shared BufFiles.
403  *
404  * Caller should be leader process. Though tapes are marked as frozen in
405  * workers, they are not frozen when opened within leader, since unfrozen tapes
406  * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
407  * for random access.)
408  */
409 static void
411  SharedFileSet *fileset)
412 {
413  LogicalTape *lt = NULL;
414  long tapeblocks = 0L;
415  long nphysicalblocks = 0L;
416  int i;
417 
418  /* Should have at least one worker tape, plus leader's tape */
419  Assert(lts->nTapes >= 2);
420 
421  /*
422  * Build concatenated view of all BufFiles, remembering the block number
423  * where each source file begins. No changes are needed for leader/last
424  * tape.
425  */
426  for (i = 0; i < lts->nTapes - 1; i++)
427  {
428  char filename[MAXPGPATH];
429  BufFile *file;
430 
431  lt = &lts->tapes[i];
432 
433  pg_itoa(i, filename);
434  file = BufFileOpenShared(fileset, filename);
435 
436  /*
437  * Stash first BufFile, and concatenate subsequent BufFiles to that.
438  * Store block offset into each tape as we go.
439  */
440  lt->firstBlockNumber = shared[i].firstblocknumber;
441  if (i == 0)
442  {
443  lts->pfile = file;
444  lt->offsetBlockNumber = 0L;
445  }
446  else
447  {
448  lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
449  }
450  /* Don't allocate more for read buffer than could possibly help */
451  lt->max_size = Min(MaxAllocSize, shared[i].buffilesize);
452  tapeblocks = shared[i].buffilesize / BLCKSZ;
453  nphysicalblocks += tapeblocks;
454  }
455 
456  /*
457  * Set # of allocated blocks, as well as # blocks written. Use extent of
458  * new BufFile space (from 0 to end of last worker's tape space) for this.
459  * Allocated/written blocks should include space used by holes left
460  * between concatenated BufFiles.
461  */
462  lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
463  lts->nBlocksWritten = lts->nBlocksAllocated;
464 
465  /*
466  * Compute number of hole blocks so that we can later work backwards, and
467  * instrument number of physical blocks. We don't simply use physical
468  * blocks directly for instrumentation because this would break if we ever
469  * subsequently wrote to worker tape.
470  *
471  * Working backwards like this keeps our options open. If shared BufFiles
472  * ever support being written to post-export, logtape.c can automatically
473  * take advantage of that. We'd then support writing to the leader tape
474  * while recycling space from worker tapes, because the leader tape has a
475  * zero offset (write routines won't need to have extra logic to apply an
476  * offset).
477  *
478  * The only thing that currently prevents writing to the leader tape from
479  * working is the fact that BufFiles opened using BufFileOpenShared() are
480  * read-only by definition, but that could be changed if it seemed
481  * worthwhile. For now, writing to the leader tape will raise a "Bad file
482  * descriptor" error, so tuplesort must avoid writing to the leader tape
483  * altogether.
484  */
485  lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
486 }
487 
488 /*
489  * Create a set of logical tapes in a temporary underlying file.
490  *
491  * Each tape is initialized in write state. Serial callers pass ntapes,
492  * NULL argument for shared, and -1 for worker. Parallel worker callers
493  * pass ntapes, a shared file handle, NULL shared argument, and their own
494  * worker number. Leader callers, which claim shared worker tapes here,
495  * must supply non-sentinel values for all arguments except worker number,
496  * which should be -1.
497  *
498  * Leader caller is passing back an array of metadata each worker captured
499  * when LogicalTapeFreeze() was called for their final result tapes. Passed
500  * tapes array is actually sized ntapes - 1, because it includes only
501  * worker tapes, whereas leader requires its own leader tape. Note that we
502  * rely on the assumption that reclaimed worker tapes will only be read
503  * from once by leader, and never written to again (tapes are initialized
504  * for writing, but that's only to be consistent). Leader may not write to
505  * its own tape purely due to a restriction in the shared buffile
506  * infrastructure that may be lifted in the future.
507  */
509 LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
510  int worker)
511 {
512  LogicalTapeSet *lts;
513  LogicalTape *lt;
514  int i;
515 
516  /*
517  * Create top-level struct including per-tape LogicalTape structs.
518  */
519  Assert(ntapes > 0);
520  lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
521  ntapes * sizeof(LogicalTape));
522  lts->nBlocksAllocated = 0L;
523  lts->nBlocksWritten = 0L;
524  lts->nHoleBlocks = 0L;
525  lts->forgetFreeSpace = false;
526  lts->blocksSorted = true; /* a zero-length array is sorted ... */
527  lts->freeBlocksLen = 32; /* reasonable initial guess */
528  lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
529  lts->nFreeBlocks = 0;
530  lts->nTapes = ntapes;
531 
532  /*
533  * Initialize per-tape structs. Note we allocate the I/O buffer and the
534  * first block for a tape only when it is first actually written to. This
535  * avoids wasting memory space when tuplesort.c overestimates the number
536  * of tapes needed.
537  */
538  for (i = 0; i < ntapes; i++)
539  {
540  lt = &lts->tapes[i];
541  lt->writing = true;
542  lt->frozen = false;
543  lt->dirty = false;
544  lt->firstBlockNumber = -1L;
545  lt->curBlockNumber = -1L;
546  lt->nextBlockNumber = -1L;
547  lt->offsetBlockNumber = 0L;
548  lt->buffer = NULL;
549  lt->buffer_size = 0;
550  /* palloc() larger than MaxAllocSize would fail */
551  lt->max_size = MaxAllocSize;
552  lt->pos = 0;
553  lt->nbytes = 0;
554  }
555 
556  /*
557  * Create temp BufFile storage as required.
558  *
559  * Leader concatenates worker tapes, which requires special adjustment to
560  * final tapeset data. Things are simpler for the worker case and the
561  * serial case, though. They are generally very similar -- workers use a
562  * shared fileset, whereas serial sorts use a conventional serial BufFile.
563  */
564  if (shared)
565  ltsConcatWorkerTapes(lts, shared, fileset);
566  else if (fileset)
567  {
568  char filename[MAXPGPATH];
569 
570  pg_itoa(worker, filename);
571  lts->pfile = BufFileCreateShared(fileset, filename);
572  }
573  else
574  lts->pfile = BufFileCreateTemp(false);
575 
576  return lts;
577 }
578 
579 /*
580  * Close a logical tape set and release all resources.
581  */
582 void
584 {
585  LogicalTape *lt;
586  int i;
587 
588  BufFileClose(lts->pfile);
589  for (i = 0; i < lts->nTapes; i++)
590  {
591  lt = &lts->tapes[i];
592  if (lt->buffer)
593  pfree(lt->buffer);
594  }
595  pfree(lts->freeBlocks);
596  pfree(lts);
597 }
598 
599 /*
600  * Mark a logical tape set as not needing management of free space anymore.
601  *
602  * This should be called if the caller does not intend to write any more data
603  * into the tape set, but is reading from un-frozen tapes. Since no more
604  * writes are planned, remembering free blocks is no longer useful. Setting
605  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
606  * is not designed to handle large numbers of free blocks.
607  */
608 void
610 {
611  lts->forgetFreeSpace = true;
612 }
613 
614 /*
615  * Write to a logical tape.
616  *
617  * There are no error returns; we ereport() on failure.
618  */
619 void
621  void *ptr, size_t size)
622 {
623  LogicalTape *lt;
624  size_t nthistime;
625 
626  Assert(tapenum >= 0 && tapenum < lts->nTapes);
627  lt = &lts->tapes[tapenum];
628  Assert(lt->writing);
629  Assert(lt->offsetBlockNumber == 0L);
630 
631  /* Allocate data buffer and first block on first write */
632  if (lt->buffer == NULL)
633  {
634  lt->buffer = (char *) palloc(BLCKSZ);
635  lt->buffer_size = BLCKSZ;
636  }
637  if (lt->curBlockNumber == -1)
638  {
639  Assert(lt->firstBlockNumber == -1);
640  Assert(lt->pos == 0);
641 
642  lt->curBlockNumber = ltsGetFreeBlock(lts);
644 
645  TapeBlockGetTrailer(lt->buffer)->prev = -1L;
646  }
647 
648  Assert(lt->buffer_size == BLCKSZ);
649  while (size > 0)
650  {
651  if (lt->pos >= TapeBlockPayloadSize)
652  {
653  /* Buffer full, dump it out */
654  long nextBlockNumber;
655 
656  if (!lt->dirty)
657  {
658  /* Hmm, went directly from reading to writing? */
659  elog(ERROR, "invalid logtape state: should be dirty");
660  }
661 
662  /*
663  * First allocate the next block, so that we can store it in the
664  * 'next' pointer of this block.
665  */
666  nextBlockNumber = ltsGetFreeBlock(lts);
667 
668  /* set the next-pointer and dump the current block. */
669  TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
670  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
671 
672  /* initialize the prev-pointer of the next block */
673  TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
674  lt->curBlockNumber = nextBlockNumber;
675  lt->pos = 0;
676  lt->nbytes = 0;
677  }
678 
679  nthistime = TapeBlockPayloadSize - lt->pos;
680  if (nthistime > size)
681  nthistime = size;
682  Assert(nthistime > 0);
683 
684  memcpy(lt->buffer + lt->pos, ptr, nthistime);
685 
686  lt->dirty = true;
687  lt->pos += nthistime;
688  if (lt->nbytes < lt->pos)
689  lt->nbytes = lt->pos;
690  ptr = (void *) ((char *) ptr + nthistime);
691  size -= nthistime;
692  }
693 }
694 
695 /*
696  * Rewind logical tape and switch from writing to reading.
697  *
698  * The tape must currently be in writing state, or "frozen" in read state.
699  *
700  * 'buffer_size' specifies how much memory to use for the read buffer.
701  * Regardless of the argument, the actual amount of memory used is between
702  * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
703  * rounded down and truncated to fit those constraints, if necessary. If the
704  * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
705  * byte buffer is used.
706  */
707 void
708 LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
709 {
710  LogicalTape *lt;
711 
712  Assert(tapenum >= 0 && tapenum < lts->nTapes);
713  lt = &lts->tapes[tapenum];
714 
715  /*
716  * Round and cap buffer_size if needed.
717  */
718  if (lt->frozen)
719  buffer_size = BLCKSZ;
720  else
721  {
722  /* need at least one block */
723  if (buffer_size < BLCKSZ)
724  buffer_size = BLCKSZ;
725 
726  /* palloc() larger than max_size is unlikely to be helpful */
727  if (buffer_size > lt->max_size)
728  buffer_size = lt->max_size;
729 
730  /* round down to BLCKSZ boundary */
731  buffer_size -= buffer_size % BLCKSZ;
732  }
733 
734  if (lt->writing)
735  {
736  /*
737  * Completion of a write phase. Flush last partial data block, and
738  * rewind for normal (destructive) read.
739  */
740  if (lt->dirty)
741  {
742  /*
743  * As long as we've filled the buffer at least once, its contents
744  * are entirely defined from valgrind's point of view, even though
745  * contents beyond the current end point may be stale. But it's
746  * possible - at least in the case of a parallel sort - to sort
747  * such small amount of data that we do not fill the buffer even
748  * once. Tell valgrind that its contents are defined, so it
749  * doesn't bleat.
750  */
752  lt->buffer_size - lt->nbytes);
753 
754  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
755  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
756  }
757  lt->writing = false;
758  }
759  else
760  {
761  /*
762  * This is only OK if tape is frozen; we rewind for (another) read
763  * pass.
764  */
765  Assert(lt->frozen);
766  }
767 
768  /* Allocate a read buffer (unless the tape is empty) */
769  if (lt->buffer)
770  pfree(lt->buffer);
771  lt->buffer = NULL;
772  lt->buffer_size = 0;
773  if (lt->firstBlockNumber != -1L)
774  {
775  lt->buffer = palloc(buffer_size);
776  lt->buffer_size = buffer_size;
777  }
778 
779  /* Read the first block, or reset if tape is empty */
781  lt->pos = 0;
782  lt->nbytes = 0;
783  ltsReadFillBuffer(lts, lt);
784 }
785 
786 /*
787  * Rewind logical tape and switch from reading to writing.
788  *
789  * NOTE: we assume the caller has read the tape to the end; otherwise
790  * untouched data will not have been freed. We could add more code to free
791  * any unread blocks, but in current usage of this module it'd be useless
792  * code.
793  */
794 void
796 {
797  LogicalTape *lt;
798 
799  Assert(tapenum >= 0 && tapenum < lts->nTapes);
800  lt = &lts->tapes[tapenum];
801 
802  Assert(!lt->writing && !lt->frozen);
803  lt->writing = true;
804  lt->dirty = false;
805  lt->firstBlockNumber = -1L;
806  lt->curBlockNumber = -1L;
807  lt->pos = 0;
808  lt->nbytes = 0;
809  if (lt->buffer)
810  pfree(lt->buffer);
811  lt->buffer = NULL;
812  lt->buffer_size = 0;
813 }
814 
815 /*
816  * Read from a logical tape.
817  *
818  * Early EOF is indicated by return value less than #bytes requested.
819  */
820 size_t
821 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
822  void *ptr, size_t size)
823 {
824  LogicalTape *lt;
825  size_t nread = 0;
826  size_t nthistime;
827 
828  Assert(tapenum >= 0 && tapenum < lts->nTapes);
829  lt = &lts->tapes[tapenum];
830  Assert(!lt->writing);
831 
832  while (size > 0)
833  {
834  if (lt->pos >= lt->nbytes)
835  {
836  /* Try to load more data into buffer. */
837  if (!ltsReadFillBuffer(lts, lt))
838  break; /* EOF */
839  }
840 
841  nthistime = lt->nbytes - lt->pos;
842  if (nthistime > size)
843  nthistime = size;
844  Assert(nthistime > 0);
845 
846  memcpy(ptr, lt->buffer + lt->pos, nthistime);
847 
848  lt->pos += nthistime;
849  ptr = (void *) ((char *) ptr + nthistime);
850  size -= nthistime;
851  nread += nthistime;
852  }
853 
854  return nread;
855 }
856 
857 /*
858  * "Freeze" the contents of a tape so that it can be read multiple times
859  * and/or read backwards. Once a tape is frozen, its contents will not
860  * be released until the LogicalTapeSet is destroyed. This is expected
861  * to be used only for the final output pass of a merge.
862  *
863  * This *must* be called just at the end of a write pass, before the
864  * tape is rewound (after rewind is too late!). It performs a rewind
865  * and switch to read mode "for free". An immediately following rewind-
866  * for-read call is OK but not necessary.
867  *
868  * share output argument is set with details of storage used for tape after
869  * freezing, which may be passed to LogicalTapeSetCreate within leader
870  * process later. This metadata is only of interest to worker callers
871  * freezing their final output for leader (single materialized tape).
872  * Serial sorts should set share to NULL.
873  */
874 void
875 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
876 {
877  LogicalTape *lt;
878 
879  Assert(tapenum >= 0 && tapenum < lts->nTapes);
880  lt = &lts->tapes[tapenum];
881  Assert(lt->writing);
882  Assert(lt->offsetBlockNumber == 0L);
883 
884  /*
885  * Completion of a write phase. Flush last partial data block, and rewind
886  * for nondestructive read.
887  */
888  if (lt->dirty)
889  {
890  /*
891  * As long as we've filled the buffer at least once, its contents are
892  * entirely defined from valgrind's point of view, even though
893  * contents beyond the current end point may be stale. But it's
894  * possible - at least in the case of a parallel sort - to sort such
895  * small amount of data that we do not fill the buffer even once. Tell
896  * valgrind that its contents are defined, so it doesn't bleat.
897  */
899  lt->buffer_size - lt->nbytes);
900 
901  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
902  ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
903  lt->writing = false;
904  }
905  lt->writing = false;
906  lt->frozen = true;
907 
908  /*
909  * The seek and backspace functions assume a single block read buffer.
910  * That's OK with current usage. A larger buffer is helpful to make the
911  * read pattern of the backing file look more sequential to the OS, when
912  * we're reading from multiple tapes. But at the end of a sort, when a
913  * tape is frozen, we only read from a single tape anyway.
914  */
915  if (!lt->buffer || lt->buffer_size != BLCKSZ)
916  {
917  if (lt->buffer)
918  pfree(lt->buffer);
919  lt->buffer = palloc(BLCKSZ);
920  lt->buffer_size = BLCKSZ;
921  }
922 
923  /* Read the first block, or reset if tape is empty */
925  lt->pos = 0;
926  lt->nbytes = 0;
927 
928  if (lt->firstBlockNumber == -1L)
929  lt->nextBlockNumber = -1L;
930  ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
931  if (TapeBlockIsLast(lt->buffer))
932  lt->nextBlockNumber = -1L;
933  else
934  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
935  lt->nbytes = TapeBlockGetNBytes(lt->buffer);
936 
937  /* Handle extra steps when caller is to share its tapeset */
938  if (share)
939  {
941  share->firstblocknumber = lt->firstBlockNumber;
942  share->buffilesize = BufFileSize(lts->pfile);
943  }
944 }
945 
946 /*
947  * Backspace the tape a given number of bytes. (We also support a more
948  * general seek interface, see below.)
949  *
950  * *Only* a frozen-for-read tape can be backed up; we don't support
951  * random access during write, and an unfrozen read tape may have
952  * already discarded the desired data!
953  *
954  * Returns the number of bytes backed up. It can be less than the
955  * requested amount, if there isn't that much data before the current
956  * position. The tape is positioned to the beginning of the tape in
957  * that case.
958  */
959 size_t
960 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
961 {
962  LogicalTape *lt;
963  size_t seekpos = 0;
964 
965  Assert(tapenum >= 0 && tapenum < lts->nTapes);
966  lt = &lts->tapes[tapenum];
967  Assert(lt->frozen);
968  Assert(lt->buffer_size == BLCKSZ);
969 
970  /*
971  * Easy case for seek within current block.
972  */
973  if (size <= (size_t) lt->pos)
974  {
975  lt->pos -= (int) size;
976  return size;
977  }
978 
979  /*
980  * Not-so-easy case, have to walk back the chain of blocks. This
981  * implementation would be pretty inefficient for long seeks, but we
982  * really aren't doing that (a seek over one tuple is typical).
983  */
984  seekpos = (size_t) lt->pos; /* part within this block */
985  while (size > seekpos)
986  {
987  long prev = TapeBlockGetTrailer(lt->buffer)->prev;
988 
989  if (prev == -1L)
990  {
991  /* Tried to back up beyond the beginning of tape. */
992  if (lt->curBlockNumber != lt->firstBlockNumber)
993  elog(ERROR, "unexpected end of tape");
994  lt->pos = 0;
995  return seekpos;
996  }
997 
998  ltsReadBlock(lts, prev, (void *) lt->buffer);
999 
1000  if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1001  elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1002  prev,
1003  TapeBlockGetTrailer(lt->buffer)->next,
1004  lt->curBlockNumber);
1005 
1007  lt->curBlockNumber = prev;
1008  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1009 
1010  seekpos += TapeBlockPayloadSize;
1011  }
1012 
1013  /*
1014  * 'seekpos' can now be greater than 'size', because it points to the
1015  * beginning the target block. The difference is the position within the
1016  * page.
1017  */
1018  lt->pos = seekpos - size;
1019  return size;
1020 }
1021 
1022 /*
1023  * Seek to an arbitrary position in a logical tape.
1024  *
1025  * *Only* a frozen-for-read tape can be seeked.
1026  *
1027  * Must be called with a block/offset previously returned by
1028  * LogicalTapeTell().
1029  */
1030 void
1032  long blocknum, int offset)
1033 {
1034  LogicalTape *lt;
1035 
1036  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1037  lt = &lts->tapes[tapenum];
1038  Assert(lt->frozen);
1039  Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1040  Assert(lt->buffer_size == BLCKSZ);
1041 
1042  if (blocknum != lt->curBlockNumber)
1043  {
1044  ltsReadBlock(lts, blocknum, (void *) lt->buffer);
1045  lt->curBlockNumber = blocknum;
1047  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1048  }
1049 
1050  if (offset > lt->nbytes)
1051  elog(ERROR, "invalid tape seek position");
1052  lt->pos = offset;
1053 }
1054 
1055 /*
1056  * Obtain current position in a form suitable for a later LogicalTapeSeek.
1057  *
1058  * NOTE: it'd be OK to do this during write phase with intention of using
1059  * the position for a seek after freezing. Not clear if anyone needs that.
1060  */
1061 void
1063  long *blocknum, int *offset)
1064 {
1065  LogicalTape *lt;
1066 
1067  Assert(tapenum >= 0 && tapenum < lts->nTapes);
1068  lt = &lts->tapes[tapenum];
1069  Assert(lt->offsetBlockNumber == 0L);
1070 
1071  /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1072  Assert(lt->buffer_size == BLCKSZ);
1073 
1074  *blocknum = lt->curBlockNumber;
1075  *offset = lt->pos;
1076 }
1077 
1078 /*
1079  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
1080  */
1081 long
1083 {
1084  return lts->nBlocksAllocated - lts->nHoleBlocks;
1085 }
int max_size
Definition: logtape.c:156
size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:821
LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]
Definition: logtape.c:206
int nFreeBlocks
Definition: logtape.c:201
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
#define TapeBlockIsLast(buf)
Definition: logtape.c:111
#define TapeBlockPayloadSize
Definition: logtape.c:107
off_t buffilesize
Definition: logtape.h:53
long offsetBlockNumber
Definition: logtape.c:149
long nBlocksWritten
Definition: logtape.c:182
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:278
int freeBlocksLen
Definition: logtape.c:202
static bool ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:287
#define Min(x, y)
Definition: c.h:857
static long ltsGetFreeBlock(LogicalTapeSet *lts)
Definition: logtape.c:346
BufFile * pfile
Definition: logtape.c:169
#define MemSet(start, val, len)
Definition: c.h:908
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
Definition: logtape.c:371
long firstblocknumber
Definition: logtape.h:52
bool frozen
Definition: logtape.c:132
long nextBlockNumber
Definition: logtape.c:148
void BufFileClose(BufFile *file)
Definition: buffile.c:397
void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
Definition: logtape.c:795
bool writing
Definition: logtape.c:131
long * freeBlocks
Definition: logtape.c:200
bool dirty
Definition: logtape.c:133
void pfree(void *pointer)
Definition: mcxt.c:1031
void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, void *ptr, size_t size)
Definition: logtape.c:620
#define ERROR
Definition: elog.h:43
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:182
#define MAXPGPATH
LogicalTapeSet * LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, int worker)
Definition: logtape.c:509
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:245
size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
Definition: logtape.c:960
int errcode_for_file_access(void)
Definition: elog.c:598
long nHoleBlocks
Definition: logtape.c:183
int nbytes
Definition: logtape.c:158
void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset)
Definition: logtape.c:1062
static int freeBlocks_cmp(const void *a, const void *b)
Definition: logtape.c:329
#define ereport(elevel, rest)
Definition: elog.h:122
void pg_itoa(int16 i, char *a)
Definition: numutils.c:120
#define MaxAllocSize
Definition: memutils.h:40
long firstBlockNumber
Definition: logtape.c:146
int BufFileSeekBlock(BufFile *file, long blknum)
Definition: buffile.c:778
#define TapeBlockSetNBytes(buf, nbytes)
Definition: logtape.c:115
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared, SharedFileSet *fileset)
Definition: logtape.c:410
long curBlockNumber
Definition: logtape.c:147
#define Assert(condition)
Definition: c.h:699
struct TapeBlockTrailer TapeBlockTrailer
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:215
void BufFileExportShared(BufFile *file)
Definition: buffile.c:379
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1044
void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
Definition: logtape.c:708
#define TapeBlockGetNBytes(buf)
Definition: logtape.c:112
void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
Definition: logtape.c:875
char * buffer
Definition: logtape.c:154
static char * filename
Definition: pg_dumpall.c:87
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool forgetFreeSpace
Definition: logtape.c:198
int i
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:554
size_t BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:601
void LogicalTapeSetClose(LogicalTapeSet *lts)
Definition: logtape.c:583
void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, long blocknum, int offset)
Definition: logtape.c:1031
#define elog
Definition: elog.h:219
void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
Definition: logtape.c:609
long LogicalTapeSetBlocks(LogicalTapeSet *lts)
Definition: logtape.c:1082
#define qsort(a, b, c, d)
Definition: port.h:421
struct LogicalTape LogicalTape
long BufFileAppend(BufFile *target, BufFile *source)
Definition: buffile.c:835
bool blocksSorted
Definition: logtape.c:199
long nBlocksAllocated
Definition: logtape.c:181
off_t BufFileSize(BufFile *file)
Definition: buffile.c:809
int buffer_size
Definition: logtape.c:155
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:223
#define TapeBlockGetTrailer(buf)
Definition: logtape.c:108
#define offsetof(type, field)
Definition: c.h:622
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:271