PostgreSQL Source Code  git master
logtape.c File Reference
#include "postgres.h"
#include <fcntl.h>
#include "storage/buffile.h"
#include "utils/builtins.h"
#include "utils/logtape.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
Include dependency graph for logtape.c:

Go to the source code of this file.

Data Structures

struct  TapeBlockTrailer
 
struct  LogicalTape
 
struct  LogicalTapeSet
 

Macros

#define TapeBlockPayloadSize   (BLCKSZ - sizeof(TapeBlockTrailer))
 
#define TapeBlockGetTrailer(buf)    ((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))
 
#define TapeBlockIsLast(buf)   (TapeBlockGetTrailer(buf)->next < 0)
 
#define TapeBlockGetNBytes(buf)
 
#define TapeBlockSetNBytes(buf, nbytes)    (TapeBlockGetTrailer(buf)->next = -(nbytes))
 
#define TAPE_WRITE_PREALLOC_MIN   8
 
#define TAPE_WRITE_PREALLOC_MAX   128
 

Typedefs

typedef struct TapeBlockTrailer TapeBlockTrailer
 

Functions

static LogicalTape * ltsCreateTape (LogicalTapeSet *lts)
 
static void ltsWriteBlock (LogicalTapeSet *lts, long blocknum, void *buffer)
 
static void ltsReadBlock (LogicalTapeSet *lts, long blocknum, void *buffer)
 
static long ltsGetBlock (LogicalTapeSet *lts, LogicalTape *lt)
 
static long ltsGetFreeBlock (LogicalTapeSet *lts)
 
static long ltsGetPreallocBlock (LogicalTapeSet *lts, LogicalTape *lt)
 
static void ltsReleaseBlock (LogicalTapeSet *lts, long blocknum)
 
static void ltsInitReadBuffer (LogicalTape *lt)
 
static bool ltsReadFillBuffer (LogicalTape *lt)
 
static unsigned long left_offset (unsigned long i)
 
static unsigned long right_offset (unsigned long i)
 
static unsigned long parent_offset (unsigned long i)
 
LogicalTapeSet * LogicalTapeSetCreate (bool preallocate, SharedFileSet *fileset, int worker)
 
LogicalTape * LogicalTapeImport (LogicalTapeSet *lts, int worker, TapeShare *shared)
 
void LogicalTapeSetClose (LogicalTapeSet *lts)
 
LogicalTape * LogicalTapeCreate (LogicalTapeSet *lts)
 
void LogicalTapeClose (LogicalTape *lt)
 
void LogicalTapeSetForgetFreeSpace (LogicalTapeSet *lts)
 
void LogicalTapeWrite (LogicalTape *lt, void *ptr, size_t size)
 
void LogicalTapeRewindForRead (LogicalTape *lt, size_t buffer_size)
 
size_t LogicalTapeRead (LogicalTape *lt, void *ptr, size_t size)
 
void LogicalTapeFreeze (LogicalTape *lt, TapeShare *share)
 
size_t LogicalTapeBackspace (LogicalTape *lt, size_t size)
 
void LogicalTapeSeek (LogicalTape *lt, long blocknum, int offset)
 
void LogicalTapeTell (LogicalTape *lt, long *blocknum, int *offset)
 
long LogicalTapeSetBlocks (LogicalTapeSet *lts)
 

Macro Definition Documentation

◆ TAPE_WRITE_PREALLOC_MAX

#define TAPE_WRITE_PREALLOC_MAX   128

Definition at line 125 of file logtape.c.

◆ TAPE_WRITE_PREALLOC_MIN

#define TAPE_WRITE_PREALLOC_MIN   8

Definition at line 124 of file logtape.c.

◆ TapeBlockGetNBytes

#define TapeBlockGetNBytes (   buf)
Value:
 (TapeBlockIsLast(buf) ? \
 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
#define TapeBlockGetTrailer(buf)
Definition: logtape.c:104
#define TapeBlockPayloadSize
Definition: logtape.c:103
#define TapeBlockIsLast(buf)
Definition: logtape.c:107
static char * buf
Definition: pg_test_fsync.c:67

Definition at line 108 of file logtape.c.

◆ TapeBlockGetTrailer

#define TapeBlockGetTrailer (   buf)     ((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))

Definition at line 104 of file logtape.c.

◆ TapeBlockIsLast

#define TapeBlockIsLast (   buf)    (TapeBlockGetTrailer(buf)->next < 0)

Definition at line 107 of file logtape.c.

◆ TapeBlockPayloadSize

#define TapeBlockPayloadSize   (BLCKSZ - sizeof(TapeBlockTrailer))

Definition at line 103 of file logtape.c.

◆ TapeBlockSetNBytes

#define TapeBlockSetNBytes (   buf,
  nbytes 
)     (TapeBlockGetTrailer(buf)->next = -(nbytes))

Definition at line 111 of file logtape.c.

Typedef Documentation

◆ TapeBlockTrailer

typedef struct TapeBlockTrailer TapeBlockTrailer

Function Documentation

◆ left_offset()

static unsigned long left_offset ( unsigned long  i)
inline static

Definition at line 344 of file logtape.c.

345 {
346  return 2 * i + 1;
347 }
int i
Definition: isn.c:73

References i.

Referenced by ltsGetFreeBlock().

◆ LogicalTapeBackspace()

size_t LogicalTapeBackspace ( LogicalTape *  lt,
size_t  size 
)

Definition at line 1063 of file logtape.c.

1064 {
1065  size_t seekpos = 0;
1066 
1067  Assert(lt->frozen);
1068  Assert(lt->buffer_size == BLCKSZ);
1069 
1070  if (lt->buffer == NULL)
1071  ltsInitReadBuffer(lt);
1072 
1073  /*
1074  * Easy case for seek within current block.
1075  */
1076  if (size <= (size_t) lt->pos)
1077  {
1078  lt->pos -= (int) size;
1079  return size;
1080  }
1081 
1082  /*
1083  * Not-so-easy case, have to walk back the chain of blocks. This
1084  * implementation would be pretty inefficient for long seeks, but we
1085  * really aren't doing that (a seek over one tuple is typical).
1086  */
1087  seekpos = (size_t) lt->pos; /* part within this block */
1088  while (size > seekpos)
1089  {
1090  long prev = TapeBlockGetTrailer(lt->buffer)->prev;
1091 
1092  if (prev == -1L)
1093  {
1094  /* Tried to back up beyond the beginning of tape. */
1095  if (lt->curBlockNumber != lt->firstBlockNumber)
1096  elog(ERROR, "unexpected end of tape");
1097  lt->pos = 0;
1098  return seekpos;
1099  }
1100 
1101  ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer);
1102 
1103  if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1104  elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1105  prev,
1106  TapeBlockGetTrailer(lt->buffer)->next,
1107  lt->curBlockNumber);
1108 
1109  lt->nbytes = TapeBlockPayloadSize;
1110  lt->curBlockNumber = prev;
1111  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1112 
1113  seekpos += TapeBlockPayloadSize;
1114  }
1115 
1116  /*
1117  * 'seekpos' can now be greater than 'size', because it points to the
1118  * beginning the target block. The difference is the position within the
1119  * page.
1120  */
1121  lt->pos = seekpos - size;
1122  return size;
1123 }
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
Assert(fmt[strlen(fmt) - 1] !='\n')
static void ltsInitReadBuffer(LogicalTape *lt)
Definition: logtape.c:529
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:282
while(p+4<=pend)
long curBlockNumber
Definition: logtape.c:157
char * buffer
Definition: logtape.c:164
bool frozen
Definition: logtape.c:142
long nextBlockNumber
Definition: logtape.c:158
int buffer_size
Definition: logtape.c:165
long firstBlockNumber
Definition: logtape.c:156
LogicalTapeSet * tapeSet
Definition: logtape.c:139
int nbytes
Definition: logtape.c:168

References Assert(), LogicalTape::buffer, LogicalTape::buffer_size, LogicalTape::curBlockNumber, elog, ERROR, LogicalTape::firstBlockNumber, LogicalTape::frozen, ltsInitReadBuffer(), ltsReadBlock(), LogicalTape::nbytes, LogicalTape::nextBlockNumber, LogicalTape::pos, TapeBlockGetTrailer, TapeBlockPayloadSize, LogicalTape::tapeSet, and while().

Referenced by tuplesort_gettuple_common().

◆ LogicalTapeClose()

void LogicalTapeClose ( LogicalTape *  lt)

Definition at line 734 of file logtape.c.

735 {
736  if (lt->buffer)
737  pfree(lt->buffer);
738  pfree(lt);
739 }
void pfree(void *pointer)
Definition: mcxt.c:1175

References LogicalTape::buffer, and pfree().

Referenced by agg_refill_hash_table(), mergeruns(), and tuplesort_gettuple_common().

◆ LogicalTapeCreate()

LogicalTape* LogicalTapeCreate ( LogicalTapeSet *  lts)

Definition at line 681 of file logtape.c.

682 {
683  /*
684  * The only thing that currently prevents creating new tapes in leader is
685  * the fact that BufFiles opened using BufFileOpenShared() are read-only
686  * by definition, but that could be changed if it seemed worthwhile. For
687  * now, writing to the leader tape will raise a "Bad file descriptor"
688  * error, so tuplesort must avoid writing to the leader tape altogether.
689  */
690  if (lts->fileset && lts->worker == -1)
691  elog(ERROR, "cannot create new tapes in leader process");
692 
693  return ltsCreateTape(lts);
694 }
static LogicalTape * ltsCreateTape(LogicalTapeSet *lts)
Definition: logtape.c:697
SharedFileSet * fileset
Definition: logtape.c:190

References elog, ERROR, LogicalTapeSet::fileset, ltsCreateTape(), and LogicalTapeSet::worker.

Referenced by hashagg_spill_init(), and selectnewtape().

◆ LogicalTapeFreeze()

void LogicalTapeFreeze ( LogicalTape *  lt,
TapeShare *  share 
)

Definition at line 982 of file logtape.c.

983 {
984  LogicalTapeSet *lts = lt->tapeSet;
985 
986  Assert(lt->writing);
987  Assert(lt->offsetBlockNumber == 0L);
988 
989  /*
990  * Completion of a write phase. Flush last partial data block, and rewind
991  * for nondestructive read.
992  */
993  if (lt->dirty)
994  {
995  /*
996  * As long as we've filled the buffer at least once, its contents are
997  * entirely defined from valgrind's point of view, even though
998  * contents beyond the current end point may be stale. But it's
999  * possible - at least in the case of a parallel sort - to sort such
1000  * small amount of data that we do not fill the buffer even once. Tell
1001  * valgrind that its contents are defined, so it doesn't bleat.
1002  */
1003  VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
1004  lt->buffer_size - lt->nbytes);
1005 
1006  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
1007  ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
1008  }
1009  lt->writing = false;
1010  lt->frozen = true;
1011 
1012  /*
1013  * The seek and backspace functions assume a single block read buffer.
1014  * That's OK with current usage. A larger buffer is helpful to make the
1015  * read pattern of the backing file look more sequential to the OS, when
1016  * we're reading from multiple tapes. But at the end of a sort, when a
1017  * tape is frozen, we only read from a single tape anyway.
1018  */
1019  if (!lt->buffer || lt->buffer_size != BLCKSZ)
1020  {
1021  if (lt->buffer)
1022  pfree(lt->buffer);
1023  lt->buffer = palloc(BLCKSZ);
1024  lt->buffer_size = BLCKSZ;
1025  }
1026 
1027  /* Read the first block, or reset if tape is empty */
1028  lt->curBlockNumber = lt->firstBlockNumber;
1029  lt->pos = 0;
1030  lt->nbytes = 0;
1031 
1032  if (lt->firstBlockNumber == -1L)
1033  lt->nextBlockNumber = -1L;
1034  ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
1035  if (TapeBlockIsLast(lt->buffer))
1036  lt->nextBlockNumber = -1L;
1037  else
1038  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1039  lt->nbytes = TapeBlockGetNBytes(lt->buffer);
1040 
1041  /* Handle extra steps when caller is to share its tapeset */
1042  if (share)
1043  {
1044  BufFileExportFileSet(lts->pfile);
1045  share->firstblocknumber = lt->firstBlockNumber;
1046  }
1047 }
void BufFileExportFileSet(BufFile *file)
Definition: buffile.c:389
#define TapeBlockGetNBytes(buf)
Definition: logtape.c:108
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
Definition: logtape.c:238
#define TapeBlockSetNBytes(buf, nbytes)
Definition: logtape.c:111
void * palloc(Size size)
Definition: mcxt.c:1068
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
BufFile * pfile
Definition: logtape.c:189
long offsetBlockNumber
Definition: logtape.c:159
bool writing
Definition: logtape.c:141
bool dirty
Definition: logtape.c:143
long firstblocknumber
Definition: logtape.h:54

References Assert(), LogicalTape::buffer, LogicalTape::buffer_size, BufFileExportFileSet(), LogicalTape::curBlockNumber, LogicalTape::dirty, LogicalTape::firstBlockNumber, TapeShare::firstblocknumber, LogicalTape::frozen, ltsReadBlock(), ltsWriteBlock(), LogicalTape::nbytes, LogicalTape::nextBlockNumber, LogicalTape::offsetBlockNumber, palloc(), LogicalTapeSet::pfile, pfree(), LogicalTape::pos, TapeBlockGetNBytes, TapeBlockGetTrailer, TapeBlockIsLast, TapeBlockSetNBytes, LogicalTape::tapeSet, VALGRIND_MAKE_MEM_DEFINED, and LogicalTape::writing.

Referenced by mergeruns(), and worker_freeze_result_tape().

◆ LogicalTapeImport()

LogicalTape* LogicalTapeImport ( LogicalTapeSet *  lts,
int  worker,
TapeShare *  shared 
)

Definition at line 610 of file logtape.c.

611 {
612  LogicalTape *lt;
613  long tapeblocks;
614  char filename[MAXPGPATH];
615  BufFile *file;
616  int64 filesize;
617 
618  lt = ltsCreateTape(lts);
619 
620  /*
621  * build concatenated view of all buffiles, remembering the block number
622  * where each source file begins.
623  */
624  pg_itoa(worker, filename);
625  file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
626  filesize = BufFileSize(file);
627 
628  /*
629  * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
630  * block offset into each tape as we go.
631  */
632  lt->firstBlockNumber = shared->firstblocknumber;
633  if (lts->pfile == NULL)
634  {
635  lts->pfile = file;
636  lt->offsetBlockNumber = 0L;
637  }
638  else
639  {
640  lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
641  }
642  /* Don't allocate more for read buffer than could possibly help */
643  lt->max_size = Min(MaxAllocSize, filesize);
644  tapeblocks = filesize / BLCKSZ;
645 
646  /*
647  * Update # of allocated blocks and # blocks written to reflect the
648  * imported BufFile. Allocated/written blocks include space used by holes
649  * left between concatenated BufFiles. Also track the number of hole
650  * blocks so that we can later work backwards to calculate the number of
651  * physical blocks for instrumentation.
652  */
653  lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
654 
655  lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
656  lts->nBlocksWritten = lts->nBlocksAllocated;
657 
658  return lt;
659 }
long BufFileAppend(BufFile *target, BufFile *source)
Definition: buffile.c:872
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:286
int64 BufFileSize(BufFile *file)
Definition: buffile.c:833
#define Min(x, y)
Definition: c.h:986
#define MaxAllocSize
Definition: memutils.h:40
int pg_itoa(int16 i, char *a)
Definition: numutils.c:334
#define MAXPGPATH
static char * filename
Definition: pg_dumpall.c:94
long nBlocksAllocated
Definition: logtape.c:203
long nHoleBlocks
Definition: logtape.c:205
long nBlocksWritten
Definition: logtape.c:204
int max_size
Definition: logtape.c:166

References BufFileAppend(), BufFileOpenFileSet(), BufFileSize(), filename, LogicalTapeSet::fileset, LogicalTape::firstBlockNumber, TapeShare::firstblocknumber, SharedFileSet::fs, ltsCreateTape(), LogicalTape::max_size, MaxAllocSize, MAXPGPATH, Min, LogicalTapeSet::nBlocksAllocated, LogicalTapeSet::nBlocksWritten, LogicalTapeSet::nHoleBlocks, LogicalTape::offsetBlockNumber, LogicalTapeSet::pfile, and pg_itoa().

Referenced by leader_takeover_tapes().

◆ LogicalTapeRead()

size_t LogicalTapeRead ( LogicalTape *  lt,
void *  ptr,
size_t  size 
)

Definition at line 929 of file logtape.c.

930 {
931  size_t nread = 0;
932  size_t nthistime;
933 
934  Assert(!lt->writing);
935 
936  if (lt->buffer == NULL)
937  ltsInitReadBuffer(lt);
938 
939  while (size > 0)
940  {
941  if (lt->pos >= lt->nbytes)
942  {
943  /* Try to load more data into buffer. */
944  if (!ltsReadFillBuffer(lt))
945  break; /* EOF */
946  }
947 
948  nthistime = lt->nbytes - lt->pos;
949  if (nthistime > size)
950  nthistime = size;
951  Assert(nthistime > 0);
952 
953  memcpy(ptr, lt->buffer + lt->pos, nthistime);
954 
955  lt->pos += nthistime;
956  ptr = (void *) ((char *) ptr + nthistime);
957  size -= nthistime;
958  nread += nthistime;
959  }
960 
961  return nread;
962 }
static bool ltsReadFillBuffer(LogicalTape *lt)
Definition: logtape.c:305

References Assert(), LogicalTape::buffer, ltsInitReadBuffer(), ltsReadFillBuffer(), LogicalTape::nbytes, LogicalTape::pos, and LogicalTape::writing.

Referenced by getlen(), and hashagg_batch_read().

◆ LogicalTapeRewindForRead()

void LogicalTapeRewindForRead ( LogicalTape *  lt,
size_t  buffer_size 
)

Definition at line 847 of file logtape.c.

848 {
849  LogicalTapeSet *lts = lt->tapeSet;
850 
851  /*
852  * Round and cap buffer_size if needed.
853  */
854  if (lt->frozen)
855  buffer_size = BLCKSZ;
856  else
857  {
858  /* need at least one block */
859  if (buffer_size < BLCKSZ)
860  buffer_size = BLCKSZ;
861 
862  /* palloc() larger than max_size is unlikely to be helpful */
863  if (buffer_size > lt->max_size)
864  buffer_size = lt->max_size;
865 
866  /* round down to BLCKSZ boundary */
867  buffer_size -= buffer_size % BLCKSZ;
868  }
869 
870  if (lt->writing)
871  {
872  /*
873  * Completion of a write phase. Flush last partial data block, and
874  * rewind for normal (destructive) read.
875  */
876  if (lt->dirty)
877  {
878  /*
879  * As long as we've filled the buffer at least once, its contents
880  * are entirely defined from valgrind's point of view, even though
881  * contents beyond the current end point may be stale. But it's
882  * possible - at least in the case of a parallel sort - to sort
883  * such small amount of data that we do not fill the buffer even
884  * once. Tell valgrind that its contents are defined, so it
885  * doesn't bleat.
886  */
887  VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
888  lt->buffer_size - lt->nbytes);
889 
890  TapeBlockSetNBytes(lt->buffer, lt->nbytes);
891  ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
892  }
893  lt->writing = false;
894  }
895  else
896  {
897  /*
898  * This is only OK if tape is frozen; we rewind for (another) read
899  * pass.
900  */
901  Assert(lt->frozen);
902  }
903 
904  if (lt->buffer)
905  pfree(lt->buffer);
906 
907  /* the buffer is lazily allocated, but set the size here */
908  lt->buffer = NULL;
909  lt->buffer_size = buffer_size;
910 
911  /* free the preallocation list, and return unused block numbers */
912  if (lt->prealloc != NULL)
913  {
914  for (int i = lt->nprealloc; i > 0; i--)
915  ltsReleaseBlock(lts, lt->prealloc[i - 1]);
916  pfree(lt->prealloc);
917  lt->prealloc = NULL;
918  lt->nprealloc = 0;
919  lt->prealloc_size = 0;
920  }
921 }
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
Definition: logtape.c:476
long * prealloc
Definition: logtape.c:175
int nprealloc
Definition: logtape.c:176
int prealloc_size
Definition: logtape.c:177

References Assert(), LogicalTape::buffer, LogicalTape::buffer_size, LogicalTape::curBlockNumber, LogicalTape::dirty, LogicalTape::frozen, i, ltsReleaseBlock(), ltsWriteBlock(), LogicalTape::max_size, LogicalTape::nbytes, LogicalTape::nprealloc, pfree(), LogicalTape::prealloc, LogicalTape::prealloc_size, TapeBlockSetNBytes, LogicalTape::tapeSet, VALGRIND_MAKE_MEM_DEFINED, and LogicalTape::writing.

Referenced by hashagg_spill_finish(), mergeruns(), and tuplesort_rescan().

◆ LogicalTapeSeek()

void LogicalTapeSeek ( LogicalTape *  lt,
long  blocknum,
int  offset 
)

Definition at line 1134 of file logtape.c.

1135 {
1136  Assert(lt->frozen);
1137  Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
1138  Assert(lt->buffer_size == BLCKSZ);
1139 
1140  if (lt->buffer == NULL)
1141  ltsInitReadBuffer(lt);
1142 
1143  if (blocknum != lt->curBlockNumber)
1144  {
1145  ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer);
1146  lt->curBlockNumber = blocknum;
1147  lt->nbytes = TapeBlockPayloadSize;
1148  lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1149  }
1150 
1151  if (offset > lt->nbytes)
1152  elog(ERROR, "invalid tape seek position");
1153  lt->pos = offset;
1154 }

References Assert(), LogicalTape::buffer, LogicalTape::buffer_size, LogicalTape::curBlockNumber, elog, ERROR, LogicalTape::frozen, ltsInitReadBuffer(), ltsReadBlock(), LogicalTape::nbytes, LogicalTape::nextBlockNumber, LogicalTape::pos, TapeBlockGetTrailer, TapeBlockPayloadSize, and LogicalTape::tapeSet.

Referenced by tuplesort_restorepos().

◆ LogicalTapeSetBlocks()

long LogicalTapeSetBlocks ( LogicalTapeSet *  lts)

Definition at line 1184 of file logtape.c.

1185 {
1186  return lts->nBlocksWritten - lts->nHoleBlocks;
1187 }

References LogicalTapeSet::nBlocksWritten, and LogicalTapeSet::nHoleBlocks.

Referenced by hash_agg_update_metrics(), tuplesort_free(), and tuplesort_updatemax().

◆ LogicalTapeSetClose()

void LogicalTapeSetClose ( LogicalTapeSet *  lts)

Definition at line 668 of file logtape.c.

669 {
670  BufFileClose(lts->pfile);
671  pfree(lts->freeBlocks);
672  pfree(lts);
673 }
void BufFileClose(BufFile *file)
Definition: buffile.c:407
long * freeBlocks
Definition: logtape.c:216

References BufFileClose(), LogicalTapeSet::freeBlocks, LogicalTapeSet::pfile, and pfree().

Referenced by hashagg_reset_spill_state(), and tuplesort_free().

◆ LogicalTapeSetCreate()

LogicalTapeSet* LogicalTapeSetCreate ( bool  preallocate,
SharedFileSet *  fileset,
int  worker 
)

Definition at line 557 of file logtape.c.

558 {
559  LogicalTapeSet *lts;
560 
561  /*
562  * Create top-level struct including per-tape LogicalTape structs.
563  */
564  lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
565  lts->nBlocksAllocated = 0L;
566  lts->nBlocksWritten = 0L;
567  lts->nHoleBlocks = 0L;
568  lts->forgetFreeSpace = false;
569  lts->freeBlocksLen = 32; /* reasonable initial guess */
570  lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
571  lts->nFreeBlocks = 0;
572  lts->enable_prealloc = preallocate;
573 
574  lts->fileset = fileset;
575  lts->worker = worker;
576 
577  /*
578  * Create temp BufFile storage as required.
579  *
580  * In leader, we hijack the BufFile of the first tape that's imported, and
581  * concatenate the BufFiles of any subsequent tapes to that. Hence don't
582  * create a BufFile here. Things are simpler for the worker case and the
583  * serial case, though. They are generally very similar -- workers use a
584  * shared fileset, whereas serial sorts use a conventional serial BufFile.
585  */
586  if (fileset && worker == -1)
587  lts->pfile = NULL;
588  else if (fileset)
589  {
590  char filename[MAXPGPATH];
591 
592  pg_itoa(worker, filename);
593  lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
594  }
595  else
596  lts->pfile = BufFileCreateTemp(false);
597 
598  return lts;
599 }
BufFile * BufFileCreateTemp(bool interXact)
Definition: buffile.c:188
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:262
long nFreeBlocks
Definition: logtape.c:217
bool forgetFreeSpace
Definition: logtape.c:215
Size freeBlocksLen
Definition: logtape.c:218
bool enable_prealloc
Definition: logtape.c:219

References BufFileCreateFileSet(), BufFileCreateTemp(), LogicalTapeSet::enable_prealloc, filename, LogicalTapeSet::fileset, LogicalTapeSet::forgetFreeSpace, LogicalTapeSet::freeBlocks, LogicalTapeSet::freeBlocksLen, SharedFileSet::fs, MAXPGPATH, LogicalTapeSet::nBlocksAllocated, LogicalTapeSet::nBlocksWritten, LogicalTapeSet::nFreeBlocks, LogicalTapeSet::nHoleBlocks, palloc(), LogicalTapeSet::pfile, pg_itoa(), and LogicalTapeSet::worker.

Referenced by hash_agg_enter_spill_mode(), inittapes(), and leader_takeover_tapes().

◆ LogicalTapeSetForgetFreeSpace()

void LogicalTapeSetForgetFreeSpace ( LogicalTapeSet *  lts)

Definition at line 751 of file logtape.c.

752 {
753  lts->forgetFreeSpace = true;
754 }

References LogicalTapeSet::forgetFreeSpace.

Referenced by mergeruns().

◆ LogicalTapeTell()

void LogicalTapeTell ( LogicalTape *  lt,
long *  blocknum,
int *  offset 
)

Definition at line 1163 of file logtape.c.

1164 {
1165  if (lt->buffer == NULL)
1166  ltsInitReadBuffer(lt);
1167 
1168  Assert(lt->offsetBlockNumber == 0L);
1169 
1170  /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
1171  Assert(lt->buffer_size == BLCKSZ);
1172 
1173  *blocknum = lt->curBlockNumber;
1174  *offset = lt->pos;
1175 }

References Assert(), LogicalTape::buffer, LogicalTape::buffer_size, LogicalTape::curBlockNumber, ltsInitReadBuffer(), LogicalTape::offsetBlockNumber, and LogicalTape::pos.

Referenced by tuplesort_markpos().

◆ LogicalTapeWrite()

void LogicalTapeWrite ( LogicalTape *  lt,
void *  ptr,
size_t  size 
)

Definition at line 762 of file logtape.c.

763 {
764  LogicalTapeSet *lts = lt->tapeSet;
765  size_t nthistime;
766 
767  Assert(lt->writing);
768  Assert(lt->offsetBlockNumber == 0L);
769 
770  /* Allocate data buffer and first block on first write */
771  if (lt->buffer == NULL)
772  {
773  lt->buffer = (char *) palloc(BLCKSZ);
774  lt->buffer_size = BLCKSZ;
775  }
776  if (lt->curBlockNumber == -1)
777  {
778  Assert(lt->firstBlockNumber == -1);
779  Assert(lt->pos == 0);
780 
781  lt->curBlockNumber = ltsGetBlock(lts, lt);
782  lt->firstBlockNumber = lt->curBlockNumber;
783 
784  TapeBlockGetTrailer(lt->buffer)->prev = -1L;
785  }
786 
787  Assert(lt->buffer_size == BLCKSZ);
788  while (size > 0)
789  {
790  if (lt->pos >= (int) TapeBlockPayloadSize)
791  {
792  /* Buffer full, dump it out */
793  long nextBlockNumber;
794 
795  if (!lt->dirty)
796  {
797  /* Hmm, went directly from reading to writing? */
798  elog(ERROR, "invalid logtape state: should be dirty");
799  }
800 
801  /*
802  * First allocate the next block, so that we can store it in the
803  * 'next' pointer of this block.
804  */
805  nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
806 
807  /* set the next-pointer and dump the current block. */
808  TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
809  ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
810 
811  /* initialize the prev-pointer of the next block */
812  TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
813  lt->curBlockNumber = nextBlockNumber;
814  lt->pos = 0;
815  lt->nbytes = 0;
816  }
817 
818  nthistime = TapeBlockPayloadSize - lt->pos;
819  if (nthistime > size)
820  nthistime = size;
821  Assert(nthistime > 0);
822 
823  memcpy(lt->buffer + lt->pos, ptr, nthistime);
824 
825  lt->dirty = true;
826  lt->pos += nthistime;
827  if (lt->nbytes < lt->pos)
828  lt->nbytes = lt->pos;
829  ptr = (void *) ((char *) ptr + nthistime);
830  size -= nthistime;
831  }
832 }
static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:365

References Assert(), LogicalTape::buffer, LogicalTape::buffer_size, LogicalTape::curBlockNumber, LogicalTape::dirty, elog, ERROR, LogicalTape::firstBlockNumber, ltsGetBlock(), ltsWriteBlock(), LogicalTape::nbytes, LogicalTape::offsetBlockNumber, palloc(), LogicalTape::pos, TapeBlockGetTrailer, TapeBlockPayloadSize, LogicalTape::tapeSet, and LogicalTape::writing.

Referenced by hashagg_spill_tuple(), markrunend(), writetup_cluster(), writetup_datum(), writetup_heap(), and writetup_index().

◆ ltsCreateTape()

static LogicalTape * ltsCreateTape ( LogicalTapeSet *  lts)
static

Definition at line 697 of file logtape.c.

698 {
699  LogicalTape *lt;
700 
701  /*
702  * Create per-tape struct. Note we allocate the I/O buffer lazily.
703  */
704  lt = palloc(sizeof(LogicalTape));
705  lt->tapeSet = lts;
706  lt->writing = true;
707  lt->frozen = false;
708  lt->dirty = false;
709  lt->firstBlockNumber = -1L;
710  lt->curBlockNumber = -1L;
711  lt->nextBlockNumber = -1L;
712  lt->offsetBlockNumber = 0L;
713  lt->buffer = NULL;
714  lt->buffer_size = 0;
715  /* palloc() larger than MaxAllocSize would fail */
716  lt->max_size = MaxAllocSize;
717  lt->pos = 0;
718  lt->nbytes = 0;
719  lt->prealloc = NULL;
720  lt->nprealloc = 0;
721  lt->prealloc_size = 0;
722 
723  return lt;
724 }

References LogicalTape::buffer, LogicalTape::buffer_size, LogicalTape::curBlockNumber, LogicalTape::dirty, LogicalTape::firstBlockNumber, LogicalTape::frozen, LogicalTape::max_size, MaxAllocSize, LogicalTape::nbytes, LogicalTape::nextBlockNumber, LogicalTape::nprealloc, LogicalTape::offsetBlockNumber, palloc(), LogicalTape::pos, LogicalTape::prealloc, LogicalTape::prealloc_size, LogicalTape::tapeSet, and LogicalTape::writing.

Referenced by LogicalTapeCreate(), and LogicalTapeImport().

◆ ltsGetBlock()

static long ltsGetBlock ( LogicalTapeSet *  lts,
LogicalTape *  lt 
)
static

Definition at line 365 of file logtape.c.

366 {
367  if (lts->enable_prealloc)
368  return ltsGetPreallocBlock(lts, lt);
369  else
370  return ltsGetFreeBlock(lts);
371 }
static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
Definition: logtape.c:438
static long ltsGetFreeBlock(LogicalTapeSet *lts)
Definition: logtape.c:378

References LogicalTapeSet::enable_prealloc, ltsGetFreeBlock(), and ltsGetPreallocBlock().

Referenced by LogicalTapeWrite().

◆ ltsGetFreeBlock()

static long ltsGetFreeBlock ( LogicalTapeSet *  lts)
static

Definition at line 378 of file logtape.c.

379 {
380  long *heap = lts->freeBlocks;
381  long blocknum;
382  int heapsize;
383  long holeval;
384  unsigned long holepos;
385 
386  /* freelist empty; allocate a new block */
387  if (lts->nFreeBlocks == 0)
388  return lts->nBlocksAllocated++;
389 
390  /* easy if heap contains one element */
391  if (lts->nFreeBlocks == 1)
392  {
393  lts->nFreeBlocks--;
394  return lts->freeBlocks[0];
395  }
396 
397  /* remove top of minheap */
398  blocknum = heap[0];
399 
400  /* we'll replace it with end of minheap array */
401  holeval = heap[--lts->nFreeBlocks];
402 
403  /* sift down */
404  holepos = 0; /* holepos is where the "hole" is */
405  heapsize = lts->nFreeBlocks;
406  while (true)
407  {
408  unsigned long left = left_offset(holepos);
409  unsigned long right = right_offset(holepos);
410  unsigned long min_child;
411 
412  if (left < heapsize && right < heapsize)
413  min_child = (heap[left] < heap[right]) ? left : right;
414  else if (left < heapsize)
415  min_child = left;
416  else if (right < heapsize)
417  min_child = right;
418  else
419  break;
420 
421  if (heap[min_child] >= holeval)
422  break;
423 
424  heap[holepos] = heap[min_child];
425  holepos = min_child;
426  }
427  heap[holepos] = holeval;
428 
429  return blocknum;
430 }
static unsigned long right_offset(unsigned long i)
Definition: logtape.c:350
static unsigned long left_offset(unsigned long i)
Definition: logtape.c:344

References LogicalTapeSet::freeBlocks, left_offset(), LogicalTapeSet::nBlocksAllocated, LogicalTapeSet::nFreeBlocks, and right_offset().

Referenced by ltsGetBlock(), and ltsGetPreallocBlock().

◆ ltsGetPreallocBlock()

static long ltsGetPreallocBlock ( LogicalTapeSet *  lts,
LogicalTape *  lt 
)
static

Definition at line 438 of file logtape.c.

439 {
440  /* sorted in descending order, so return the last element */
441  if (lt->nprealloc > 0)
442  return lt->prealloc[--lt->nprealloc];
443 
444  if (lt->prealloc == NULL)
445  {
446  lt->prealloc_size = TAPE_WRITE_PREALLOC_MIN;
447  lt->prealloc = (long *) palloc(sizeof(long) * lt->prealloc_size);
448  }
449  else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX)
450  {
451  /* when the preallocation list runs out, double the size */
452  lt->prealloc_size *= 2;
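453  if (lt->prealloc_size > TAPE_WRITE_PREALLOC_MAX)
454  lt->prealloc_size = TAPE_WRITE_PREALLOC_MAX;
455  lt->prealloc = (long *) repalloc(lt->prealloc,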
456  sizeof(long) * lt->prealloc_size);
457  }
458 
459  /* refill preallocation list */
460  lt->nprealloc = lt->prealloc_size;
461  for (int i = lt->nprealloc; i > 0; i--)
462  {
463  lt->prealloc[i - 1] = ltsGetFreeBlock(lts);
464 
465  /* verify descending order */
466  Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]);
467  }
468 
469  return lt->prealloc[--lt->nprealloc];
470 }
#define TAPE_WRITE_PREALLOC_MIN
Definition: logtape.c:124
#define TAPE_WRITE_PREALLOC_MAX
Definition: logtape.c:125
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1188

References Assert(), i, ltsGetFreeBlock(), LogicalTape::nprealloc, palloc(), LogicalTape::prealloc, LogicalTape::prealloc_size, repalloc(), TAPE_WRITE_PREALLOC_MAX, and TAPE_WRITE_PREALLOC_MIN.

Referenced by ltsGetBlock().

◆ ltsInitReadBuffer()

static void ltsInitReadBuffer ( LogicalTape *  lt)
static

Definition at line 529 of file logtape.c.

530 {
531  Assert(lt->buffer_size > 0);
532  lt->buffer = palloc(lt->buffer_size);
533 
534  /* Read the first block, or reset if tape is empty */
535  lt->nextBlockNumber = lt->firstBlockNumber;
536  lt->pos = 0;
537  lt->nbytes = 0;
538  ltsReadFillBuffer(lt);
539 }

References Assert(), LogicalTape::buffer, LogicalTape::buffer_size, LogicalTape::firstBlockNumber, ltsReadFillBuffer(), LogicalTape::nbytes, LogicalTape::nextBlockNumber, palloc(), and LogicalTape::pos.

Referenced by LogicalTapeBackspace(), LogicalTapeRead(), LogicalTapeSeek(), and LogicalTapeTell().

◆ ltsReadBlock()

static void ltsReadBlock ( LogicalTapeSet *  lts,
long  blocknum,
void *  buffer 
)
static

Definition at line 282 of file logtape.c.

283 {
284  size_t nread;
285 
286  if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
287  ereport(ERROR,
288  (errcode_for_file_access(),
289  errmsg("could not seek to block %ld of temporary file",
290  blocknum)));
291  nread = BufFileRead(lts->pfile, buffer, BLCKSZ);
292  if (nread != BLCKSZ)
293  ereport(ERROR,
294  (errcode_for_file_access(),
295  errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes",
296  blocknum, nread, (size_t) BLCKSZ)));
297 }
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:582
int BufFileSeekBlock(BufFile *file, long blknum)
Definition: buffile.c:800
int errcode_for_file_access(void)
Definition: elog.c:716
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ereport(elevel,...)
Definition: elog.h:143

References BufFileRead(), BufFileSeekBlock(), ereport, errcode_for_file_access(), errmsg(), ERROR, and LogicalTapeSet::pfile.

Referenced by LogicalTapeBackspace(), LogicalTapeFreeze(), LogicalTapeSeek(), and ltsReadFillBuffer().

◆ ltsReadFillBuffer()

static bool ltsReadFillBuffer ( LogicalTape * lt)
static

Definition at line 305 of file logtape.c.

306 {
307  lt->pos = 0;
308  lt->nbytes = 0;
309 
310  do
311  {
312  char *thisbuf = lt->buffer + lt->nbytes;
313  long datablocknum = lt->nextBlockNumber;
314 
315  /* Fetch next block number */
316  if (datablocknum == -1L)
317  break; /* EOF */
318  /* Apply worker offset, needed for leader tapesets */
319  datablocknum += lt->offsetBlockNumber;
320 
321  /* Read the block */
322  ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf);
323  if (!lt->frozen)
324  ltsReleaseBlock(lt->tapeSet, datablocknum);
325  lt->curBlockNumber = lt->nextBlockNumber;
326 
327  lt->nbytes += TapeBlockGetNBytes(thisbuf);
328  if (TapeBlockIsLast(thisbuf))
329  {
330  lt->nextBlockNumber = -1L;
331  /* EOF */
332  break;
333  }
334  else
335  lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
336 
337  /* Advance to next block, if we have buffer space left */
338  } while (lt->buffer_size - lt->nbytes > BLCKSZ);
339 
340  return (lt->nbytes > 0);
341 }

References LogicalTape::buffer, LogicalTape::buffer_size, LogicalTape::curBlockNumber, LogicalTape::frozen, ltsReadBlock(), ltsReleaseBlock(), LogicalTape::nbytes, LogicalTape::nextBlockNumber, LogicalTape::offsetBlockNumber, LogicalTape::pos, TapeBlockGetNBytes, TapeBlockGetTrailer, TapeBlockIsLast, and LogicalTape::tapeSet.

Referenced by LogicalTapeRead(), and ltsInitReadBuffer().

◆ ltsReleaseBlock()

static void ltsReleaseBlock ( LogicalTapeSet * lts,
long  blocknum 
)
static

Definition at line 476 of file logtape.c.

477 {
478  long *heap;
479  unsigned long holepos;
480 
481  /*
482  * Do nothing if we're no longer interested in remembering free space.
483  */
484  if (lts->forgetFreeSpace)
485  return;
486 
487  /*
488  * Enlarge freeBlocks array if full.
489  */
490  if (lts->nFreeBlocks >= lts->freeBlocksLen)
491  {
492  /*
493  * If the freelist becomes very large, just return and leak this free
494  * block.
495  */
496  if (lts->freeBlocksLen * 2 * sizeof(long) > MaxAllocSize)
497  return;
498 
499  lts->freeBlocksLen *= 2;
500  lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
501  lts->freeBlocksLen * sizeof(long));
502  }
503 
504  /* create a "hole" at end of minheap array */
505  heap = lts->freeBlocks;
506  holepos = lts->nFreeBlocks;
507  lts->nFreeBlocks++;
508 
509  /* sift up to insert blocknum */
510  while (holepos != 0)
511  {
512  unsigned long parent = parent_offset(holepos);
513 
514  if (heap[parent] < blocknum)
515  break;
516 
517  heap[holepos] = heap[parent];
518  holepos = parent;
519  }
520  heap[holepos] = blocknum;
521 }
static unsigned long parent_offset(unsigned long i)
Definition: logtape.c:356

References LogicalTapeSet::forgetFreeSpace, LogicalTapeSet::freeBlocks, LogicalTapeSet::freeBlocksLen, MaxAllocSize, LogicalTapeSet::nFreeBlocks, parent_offset(), and repalloc().

Referenced by LogicalTapeRewindForRead(), and ltsReadFillBuffer().

◆ ltsWriteBlock()

static void ltsWriteBlock ( LogicalTapeSet * lts,
long  blocknum,
void *  buffer 
)
static

Definition at line 238 of file logtape.c.

239 {
240  /*
241  * BufFile does not support "holes", so if we're about to write a block
242  * that's past the current end of file, fill the space between the current
243  * end of file and the target block with zeros.
244  *
245  * This can happen either when tapes preallocate blocks; or for the last
246  * block of a tape which might not have been flushed.
247  *
248  * Note that BufFile concatenation can leave "holes" in BufFile between
249  * worker-owned block ranges. These are tracked for reporting purposes
250  * only. We never read from nor write to these hole blocks, and so they
251  * are not considered here.
252  */
253  while (blocknum > lts->nBlocksWritten)
254  {
255  PGAlignedBlock zerobuf;
256 
257  MemSet(zerobuf.data, 0, sizeof(zerobuf));
258 
259  ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
260  }
261 
262  /* Write the requested block */
263  if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
264  ereport(ERROR,
265  (errcode_for_file_access(),
266  errmsg("could not seek to block %ld of temporary file",
267  blocknum)));
268  BufFileWrite(lts->pfile, buffer, BLCKSZ);
269 
270  /* Update nBlocksWritten, if we extended the file */
271  if (blocknum == lts->nBlocksWritten)
272  lts->nBlocksWritten++;
273 }
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:625
#define MemSet(start, val, len)
Definition: c.h:1008
char data[BLCKSZ]
Definition: c.h:1138

References BufFileSeekBlock(), BufFileWrite(), PGAlignedBlock::data, ereport, errcode_for_file_access(), errmsg(), ERROR, MemSet, LogicalTapeSet::nBlocksWritten, and LogicalTapeSet::pfile.

Referenced by LogicalTapeFreeze(), LogicalTapeRewindForRead(), and LogicalTapeWrite().

◆ parent_offset()

static unsigned long parent_offset ( unsigned long  i)
inline static

Definition at line 356 of file logtape.c.

357 {
358  return (i - 1) / 2;
359 }

References i.

Referenced by ltsReleaseBlock().

◆ right_offset()

static unsigned long right_offset ( unsigned long  i)
inline static

Definition at line 350 of file logtape.c.

351 {
352  return 2 * i + 2;
353 }

References i.

Referenced by ltsGetFreeBlock().