PostgreSQL Source Code  git master
sharedtuplestore.c File Reference
#include "postgres.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "miscadmin.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"
Include dependency graph for sharedtuplestore.c:

Go to the source code of this file.

Data Structures

struct  SharedTuplestoreChunk
 
struct  SharedTuplestoreParticipant
 
struct  SharedTuplestore
 
struct  SharedTuplestoreAccessor
 

Macros

#define STS_CHUNK_PAGES   4
 
#define STS_CHUNK_HEADER_SIZE   offsetof(SharedTuplestoreChunk, data)
 
#define STS_CHUNK_DATA_SIZE   (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
 

Typedefs

typedef struct SharedTuplestoreChunk SharedTuplestoreChunk
 
typedef struct SharedTuplestoreParticipant SharedTuplestoreParticipant
 

Functions

static void sts_filename (char *name, SharedTuplestoreAccessor *accessor, int participant)
 
size_t sts_estimate (int participants)
 
SharedTuplestoreAccessor * sts_initialize (SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
 
SharedTuplestoreAccessor * sts_attach (SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
 
static void sts_flush_chunk (SharedTuplestoreAccessor *accessor)
 
void sts_end_write (SharedTuplestoreAccessor *accessor)
 
void sts_reinitialize (SharedTuplestoreAccessor *accessor)
 
void sts_begin_parallel_scan (SharedTuplestoreAccessor *accessor)
 
void sts_end_parallel_scan (SharedTuplestoreAccessor *accessor)
 
void sts_puttuple (SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
 
static MinimalTuple sts_read_tuple (SharedTuplestoreAccessor *accessor, void *meta_data)
 
MinimalTuple sts_parallel_scan_next (SharedTuplestoreAccessor *accessor, void *meta_data)
 

Macro Definition Documentation

◆ STS_CHUNK_DATA_SIZE

#define STS_CHUNK_DATA_SIZE   (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)

Definition at line 40 of file sharedtuplestore.c.

◆ STS_CHUNK_HEADER_SIZE

#define STS_CHUNK_HEADER_SIZE   offsetof(SharedTuplestoreChunk, data)

Definition at line 39 of file sharedtuplestore.c.

◆ STS_CHUNK_PAGES

#define STS_CHUNK_PAGES   4

Definition at line 38 of file sharedtuplestore.c.

Typedef Documentation

◆ SharedTuplestoreChunk

◆ SharedTuplestoreParticipant

Function Documentation

◆ sts_attach()

/*
 * Attach to a SharedTuplestore that was initialized by another backend, so
 * that this backend can read and write tuples.  Allocates the accessor in
 * CurrentMemoryContext; no file is opened until a read or write occurs.
 */
SharedTuplestoreAccessor *
sts_attach(SharedTuplestore *sts,
		   int my_participant_number,
		   SharedFileSet *fileset)
{
	SharedTuplestoreAccessor *accessor;

	Assert(my_participant_number < sts->nparticipants);

	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
	accessor->participant = my_participant_number;
	accessor->sts = sts;
	accessor->fileset = fileset;
	accessor->context = CurrentMemoryContext;

	return accessor;
}
Assert(fmt[strlen(fmt) - 1] !='\n')
void * palloc0(Size size)
Definition: mcxt.c:1257
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
SharedTuplestore * sts

References Assert(), SharedTuplestoreAccessor::context, CurrentMemoryContext, SharedTuplestoreAccessor::fileset, palloc0(), SharedTuplestoreAccessor::participant, and SharedTuplestoreAccessor::sts.

Referenced by ExecParallelHashEnsureBatchAccessors(), and ExecParallelHashRepartitionRest().

◆ sts_begin_parallel_scan()

/*
 * Prepare to scan tuples from all participants' files.  Ends any scan that
 * was already in progress on this accessor, and starts reading from this
 * backend's own file first (possible caching-locality benefit).
 */
void
sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
{
	int			i PG_USED_FOR_ASSERTS_ONLY;

	/* End any existing scan that was in progress. */
	sts_end_parallel_scan(accessor);

	/*
	 * Any backend that might have written into this shared tuplestore must
	 * have called sts_end_write(), so that all buffers are flushed and the
	 * files have stopped growing.
	 */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
		Assert(!accessor->sts->participants[i].writing);

	/*
	 * We will start out reading the file that THIS backend wrote.  There may
	 * be some caching locality advantage to that.
	 */
	accessor->read_participant = accessor->participant;
	accessor->read_file = NULL;
	accessor->read_next_page = 0;
}
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:171
int i
Definition: isn.c:73
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]

References Assert(), i, SharedTuplestore::nparticipants, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, PG_USED_FOR_ASSERTS_ONLY, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_participant, SharedTuplestoreAccessor::sts, sts_end_parallel_scan(), and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinNewBatch(), and ExecParallelHashRepartitionRest().

◆ sts_end_parallel_scan()

/*
 * Finish (or abandon) a parallel scan, releasing this backend's read file.
 * Safe to call even if no scan is in progress.
 */
void
sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
{
	/*
	 * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
	 * we'd probably need a reference count of current parallel scanners so we
	 * could safely do it only when the reference count reaches zero.
	 */
	if (accessor->read_file != NULL)
	{
		BufFileClose(accessor->read_file);
		accessor->read_file = NULL;
	}
}
void BufFileClose(BufFile *file)
Definition: buffile.c:412

References BufFileClose(), and SharedTuplestoreAccessor::read_file.

Referenced by ExecHashTableDetach(), ExecHashTableDetachBatch(), ExecParallelHashCloseBatchAccessors(), ExecParallelHashJoinNewBatch(), ExecParallelHashRepartitionRest(), ExecParallelPrepHashTableForUnmatched(), and sts_begin_parallel_scan().

◆ sts_end_write()

/*
 * Finish writing tuples: flush the final partial chunk, close the write
 * file, free the write buffer, and clear the shared 'writing' flag so that
 * readers' assertions in sts_begin_parallel_scan() hold.  A no-op if this
 * backend never wrote anything.
 */
void
sts_end_write(SharedTuplestoreAccessor *accessor)
{
	if (accessor->write_file != NULL)
	{
		sts_flush_chunk(accessor);
		BufFileClose(accessor->write_file);
		pfree(accessor->write_chunk);
		accessor->write_chunk = NULL;
		accessor->write_file = NULL;
		accessor->sts->participants[accessor->participant].writing = false;
	}
}
void pfree(void *pointer)
Definition: mcxt.c:1456
static void sts_flush_chunk(SharedTuplestoreAccessor *accessor)
SharedTuplestoreChunk * write_chunk

References BufFileClose(), SharedTuplestoreAccessor::participant, SharedTuplestore::participants, pfree(), SharedTuplestoreAccessor::sts, sts_flush_chunk(), SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_file, and SharedTuplestoreParticipant::writing.

Referenced by ExecHashTableDetach(), ExecParallelHashCloseBatchAccessors(), ExecParallelHashJoinPartitionOuter(), and MultiExecParallelHash().

◆ sts_estimate()

size_t sts_estimate ( int  participants)

Definition at line 105 of file sharedtuplestore.c.

106 {
107  return offsetof(SharedTuplestore, participants) +
108  sizeof(SharedTuplestoreParticipant) * participants;
109 }
struct SharedTuplestoreParticipant SharedTuplestoreParticipant

◆ sts_filename()

/*
 * Build the filename for a given participant's file into 'name', which must
 * have room for at least MAXPGPATH bytes.  The name combines the store's
 * name with the participant number, e.g. "<name>.p<n>".
 */
static void
sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
{
	snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
}
#define MAXPGPATH
#define snprintf
Definition: port.h:238
char name[NAMEDATALEN]
const char * name

References MAXPGPATH, name, SharedTuplestore::name, snprintf, and SharedTuplestoreAccessor::sts.

Referenced by sts_parallel_scan_next(), and sts_puttuple().

◆ sts_flush_chunk()

/*
 * Write the current in-memory chunk (STS_CHUNK_PAGES blocks) out to this
 * backend's write file, zero the buffer for reuse, reset the write pointer
 * to the start of the chunk's data area, and advance this participant's
 * shared page count.
 */
static void
sts_flush_chunk(SharedTuplestoreAccessor *accessor)
{
	size_t		size;

	size = STS_CHUNK_PAGES * BLCKSZ;
	BufFileWrite(accessor->write_file, accessor->write_chunk, size);
	memset(accessor->write_chunk, 0, size);
	accessor->write_pointer = &accessor->write_chunk->data[0];
	accessor->sts->participants[accessor->participant].npages +=
		STS_CHUNK_PAGES;
}
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676
#define STS_CHUNK_PAGES
char data[FLEXIBLE_ARRAY_MEMBER]

References BufFileWrite(), SharedTuplestoreChunk::data, SharedTuplestoreParticipant::npages, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::sts, STS_CHUNK_PAGES, SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_file, and SharedTuplestoreAccessor::write_pointer.

Referenced by sts_end_write(), and sts_puttuple().

◆ sts_initialize()

/*
 * Initialize a SharedTuplestore in existing shared memory, for which the
 * caller must have reserved sts_estimate(participants) bytes.
 *
 * 'meta_data_size' bytes of fixed-size meta-data may optionally accompany
 * each tuple; it must be small enough that meta-data + tuple size header
 * fit in one chunk (checked below).  'name' must be unique within
 * 'fileset'.  Returns an accessor for 'my_participant_number', allocated in
 * CurrentMemoryContext.
 */
SharedTuplestoreAccessor *
sts_initialize(SharedTuplestore *sts, int participants,
			   int my_participant_number,
			   size_t meta_data_size,
			   int flags,
			   SharedFileSet *fileset,
			   const char *name)
{
	SharedTuplestoreAccessor *accessor;
	int			i;

	Assert(my_participant_number < participants);

	sts->nparticipants = participants;
	sts->meta_data_size = meta_data_size;
	sts->flags = flags;

	if (strlen(name) > sizeof(sts->name) - 1)
		elog(ERROR, "SharedTuplestore name too long");
	strcpy(sts->name, name);

	/*
	 * Limit meta-data so it + tuple size always fits into a single chunk.
	 * sts_puttuple() and sts_read_tuple() could be made to support scenarios
	 * where that's not the case, but it's not currently required. If so,
	 * meta-data size probably should be made variable, too.
	 */
	if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
		elog(ERROR, "meta-data too long");

	for (i = 0; i < participants; ++i)
	{
		LWLockInitialize(&sts->participants[i].lock,
						 LWTRANCHE_SHARED_TUPLESTORE);
		sts->participants[i].read_page = 0;
		sts->participants[i].npages = 0;
		sts->participants[i].writing = false;
	}

	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
	accessor->participant = my_participant_number;
	accessor->sts = sts;
	accessor->fileset = fileset;
	accessor->context = CurrentMemoryContext;

	return accessor;
}
unsigned int uint32
Definition: c.h:495
#define ERROR
Definition: elog.h:39
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:730
@ LWTRANCHE_SHARED_TUPLESTORE
Definition: lwlock.h:201
#define STS_CHUNK_DATA_SIZE

References Assert(), SharedTuplestoreAccessor::context, CurrentMemoryContext, elog(), ERROR, SharedTuplestoreAccessor::fileset, SharedTuplestore::flags, i, SharedTuplestoreParticipant::lock, LWLockInitialize(), LWTRANCHE_SHARED_TUPLESTORE, SharedTuplestore::meta_data_size, name, SharedTuplestore::name, SharedTuplestoreParticipant::npages, SharedTuplestore::nparticipants, palloc0(), SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreParticipant::read_page, SharedTuplestoreAccessor::sts, STS_CHUNK_DATA_SIZE, and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinSetUpBatches().

◆ sts_parallel_scan_next()

/*
 * Return the next tuple in the parallel scan, or NULL when this backend has
 * gone full circle through every participant's file.  If 'meta_data' is not
 * NULL, the tuple's fixed-size meta-data is copied there too.
 *
 * Chunks are claimed under the participant's LWLock, so concurrent scanners
 * each receive disjoint chunks; 'read_next_page' lets a backend skip past
 * overflow pages it already knows about without re-taking the lock.
 */
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	SharedTuplestoreParticipant *p;
	BlockNumber read_page;
	bool		eof;

	for (;;)
	{
		/* Can we read more tuples from the current chunk? */
		if (accessor->read_ntuples < accessor->read_ntuples_available)
			return sts_read_tuple(accessor, meta_data);

		/* Find the location of a new chunk to read. */
		p = &accessor->sts->participants[accessor->read_participant];

		LWLockAcquire(&p->lock, LW_EXCLUSIVE);
		/* We can skip directly past overflow pages we know about. */
		if (p->read_page < accessor->read_next_page)
			p->read_page = accessor->read_next_page;
		eof = p->read_page >= p->npages;
		if (!eof)
		{
			/* Claim the next chunk. */
			read_page = p->read_page;
			/* Advance the read head for the next reader. */
			p->read_page += STS_CHUNK_PAGES;
			accessor->read_next_page = p->read_page;
		}
		LWLockRelease(&p->lock);

		if (!eof)
		{
			SharedTuplestoreChunk chunk_header;

			/* Make sure we have the file open. */
			if (accessor->read_file == NULL)
			{
				char		name[MAXPGPATH];
				MemoryContext oldcxt;

				sts_filename(name, accessor, accessor->read_participant);

				oldcxt = MemoryContextSwitchTo(accessor->context);
				accessor->read_file =
					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
									   false);
				MemoryContextSwitchTo(oldcxt);
			}

			/* Seek and load the chunk header. */
			if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek to block %u in shared tuplestore temporary file",
								read_page)));
			BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);

			/*
			 * If this is an overflow chunk, we skip it and any following
			 * overflow chunks all at once.
			 */
			if (chunk_header.overflow > 0)
			{
				accessor->read_next_page = read_page +
					chunk_header.overflow * STS_CHUNK_PAGES;
				continue;
			}

			accessor->read_ntuples = 0;
			accessor->read_ntuples_available = chunk_header.ntuples;
			accessor->read_bytes = STS_CHUNK_HEADER_SIZE;

			/* Go around again, so we can get a tuple from this chunk. */
		}
		else
		{
			if (accessor->read_file != NULL)
			{
				BufFileClose(accessor->read_file);
				accessor->read_file = NULL;
			}

			/*
			 * Try the next participant's file.  If we've gone full circle,
			 * we're done.
			 */
			accessor->read_participant = (accessor->read_participant + 1) %
				accessor->sts->nparticipants;
			if (accessor->read_participant == accessor->participant)
				break;
			accessor->read_next_page = 0;

			/* Go around again, so we can get a chunk from this file. */
		}
	}

	return NULL;
}
uint32 BlockNumber
Definition: block.h:31
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
int BufFileSeekBlock(BufFile *file, long blknum)
Definition: buffile.c:851
int errcode_for_file_access(void)
Definition: elog.c:881
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ereport(elevel,...)
Definition: elog.h:149
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
@ LW_EXCLUSIVE
Definition: lwlock.h:116
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
static MinimalTuple sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
#define STS_CHUNK_HEADER_SIZE
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)

References BufFileClose(), BufFileOpenFileSet(), BufFileReadExact(), BufFileSeekBlock(), SharedTuplestoreAccessor::context, ereport, errcode_for_file_access(), errmsg(), ERROR, SharedTuplestoreAccessor::fileset, SharedFileSet::fs, if(), SharedTuplestoreParticipant::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, MemoryContextSwitchTo(), name, SharedTuplestoreParticipant::npages, SharedTuplestore::nparticipants, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::read_bytes, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_ntuples, SharedTuplestoreAccessor::read_ntuples_available, SharedTuplestoreParticipant::read_page, SharedTuplestoreAccessor::read_participant, SharedTuplestoreAccessor::sts, STS_CHUNK_HEADER_SIZE, STS_CHUNK_PAGES, sts_filename(), and sts_read_tuple().

Referenced by ExecParallelHashJoinNewBatch(), ExecParallelHashJoinOuterGetTuple(), and ExecParallelHashRepartitionRest().

◆ sts_puttuple()

/*
 * Write a tuple.  If a meta-data size was provided to sts_initialize(),
 * then a pointer to meta-data of that size must be provided.
 *
 * The tuple is appended to this backend's private chunk buffer; full chunks
 * are flushed to the write file.  A tuple too large for the remaining space
 * in one chunk is split across 'overflow' chunks, which record how many
 * overflow chunks follow so that readers can skip them in one step.
 */
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
			 MinimalTuple tuple)
{
	size_t		size;

	/* Do we have our own file yet? */
	if (accessor->write_file == NULL)
	{
		SharedTuplestoreParticipant *participant;
		char		name[MAXPGPATH];
		MemoryContext oldcxt;

		/* Create one.  Only this backend will write into it. */
		sts_filename(name, accessor, accessor->participant);

		oldcxt = MemoryContextSwitchTo(accessor->context);
		accessor->write_file =
			BufFileCreateFileSet(&accessor->fileset->fs, name);
		MemoryContextSwitchTo(oldcxt);

		/* Set up the shared state for this backend's file. */
		participant = &accessor->sts->participants[accessor->participant];
		participant->writing = true;	/* for assertions only */
	}

	/* Do we have space? */
	size = accessor->sts->meta_data_size + tuple->t_len;
	if (accessor->write_pointer + size > accessor->write_end)
	{
		if (accessor->write_chunk == NULL)
		{
			/* First time through.  Allocate chunk. */
			accessor->write_chunk = (SharedTuplestoreChunk *)
				MemoryContextAllocZero(accessor->context,
									   STS_CHUNK_PAGES * BLCKSZ);
			accessor->write_chunk->ntuples = 0;
			accessor->write_pointer = &accessor->write_chunk->data[0];
			accessor->write_end = (char *)
				accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
		}
		else
		{
			/* See if flushing helps. */
			sts_flush_chunk(accessor);
		}

		/* It may still not be enough in the case of a gigantic tuple. */
		if (accessor->write_pointer + size > accessor->write_end)
		{
			size_t		written;

			/*
			 * We'll write the beginning of the oversized tuple, and then
			 * write the rest in some number of 'overflow' chunks.
			 *
			 * sts_initialize() verifies that the size of the tuple +
			 * meta-data always fits into a chunk.  Because the chunk has
			 * been flushed above, we can be sure to have all of a chunk's
			 * usable space available.
			 */
			Assert(accessor->write_pointer + accessor->sts->meta_data_size +
				   sizeof(uint32) < accessor->write_end);

			/* Write the meta-data as one chunk. */
			if (accessor->sts->meta_data_size > 0)
				memcpy(accessor->write_pointer, meta_data,
					   accessor->sts->meta_data_size);

			/*
			 * Write as much of the tuple as we can fit.  This includes the
			 * tuple's size at the start.
			 */
			written = accessor->write_end - accessor->write_pointer -
				accessor->sts->meta_data_size;
			memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
				   tuple, written);
			++accessor->write_chunk->ntuples;
			size -= accessor->sts->meta_data_size;
			size -= written;
			/* Now write as many overflow chunks as we need for the rest. */
			while (size > 0)
			{
				size_t		written_this_chunk;

				sts_flush_chunk(accessor);

				/*
				 * How many overflow chunks to go?  This will allow readers
				 * to skip all of them at once instead of reading each one.
				 */
				accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
					STS_CHUNK_DATA_SIZE;
				written_this_chunk =
					Min(accessor->write_end - accessor->write_pointer, size);
				memcpy(accessor->write_pointer, (char *) tuple + written,
					   written_this_chunk);
				accessor->write_pointer += written_this_chunk;
				size -= written_this_chunk;
				written += written_this_chunk;
			}
			return;
		}
	}

	/* Copy meta-data and tuple into buffer. */
	if (accessor->sts->meta_data_size > 0)
		memcpy(accessor->write_pointer, meta_data,
			   accessor->sts->meta_data_size);
	memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
		   tuple->t_len);
	accessor->write_pointer += size;
	++accessor->write_chunk->ntuples;
}
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267
#define Min(x, y)
Definition: c.h:993
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1064

References Assert(), BufFileCreateFileSet(), SharedTuplestoreAccessor::context, SharedTuplestoreChunk::data, SharedTuplestoreAccessor::fileset, SharedFileSet::fs, MAXPGPATH, MemoryContextAllocZero(), MemoryContextSwitchTo(), SharedTuplestore::meta_data_size, Min, name, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::sts, STS_CHUNK_DATA_SIZE, STS_CHUNK_PAGES, sts_filename(), sts_flush_chunk(), MinimalTupleData::t_len, SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_end, SharedTuplestoreAccessor::write_file, SharedTuplestoreAccessor::write_pointer, and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), and ExecParallelHashTableInsert().

◆ sts_read_tuple()

/*
 * Read the tuple at the current read position in accessor->read_file, and
 * if 'meta_data' is not NULL, copy its fixed-size meta-data there too.  The
 * tuple is assembled into the accessor's private read buffer (grown as
 * needed), following overflow chunks when the tuple spills past the end of
 * the current chunk.
 */
static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	MinimalTuple tuple;
	uint32		size;
	size_t		remaining_size;
	size_t		this_chunk_size;
	char	   *destination;

	/*
	 * We'll keep track of bytes read from this chunk so that we can detect
	 * an overflowing tuple and switch to reading overflow pages.
	 */
	if (accessor->sts->meta_data_size > 0)
	{
		BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
		accessor->read_bytes += accessor->sts->meta_data_size;
	}
	BufFileReadExact(accessor->read_file, &size, sizeof(size));
	accessor->read_bytes += sizeof(size);
	if (size > accessor->read_buffer_size)
	{
		size_t		new_read_buffer_size;

		if (accessor->read_buffer != NULL)
			pfree(accessor->read_buffer);
		new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
		accessor->read_buffer =
			MemoryContextAlloc(accessor->context, new_read_buffer_size);
		accessor->read_buffer_size = new_read_buffer_size;
	}
	remaining_size = size - sizeof(uint32);
	this_chunk_size = Min(remaining_size,
						  BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
	destination = accessor->read_buffer + sizeof(uint32);
	BufFileReadExact(accessor->read_file, destination, this_chunk_size);
	accessor->read_bytes += this_chunk_size;
	remaining_size -= this_chunk_size;
	destination += this_chunk_size;
	++accessor->read_ntuples;

	/* Check if we need to read any overflow chunks. */
	while (remaining_size > 0)
	{
		/* We are now positioned at the start of an overflow chunk. */
		SharedTuplestoreChunk chunk_header;

		BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
		accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
		if (chunk_header.overflow == 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("unexpected chunk in shared tuplestore temporary file"),
					 errdetail_internal("Expected overflow chunk.")));
		accessor->read_next_page += STS_CHUNK_PAGES;
		this_chunk_size = Min(remaining_size,
							  BLCKSZ * STS_CHUNK_PAGES -
							  accessor->read_bytes);
		BufFileReadExact(accessor->read_file, destination, this_chunk_size);
		accessor->read_bytes += this_chunk_size;
		remaining_size -= this_chunk_size;
		destination += this_chunk_size;

		/*
		 * These will be used to count regular tuples following the oversized
		 * tuple that spilled into this overflow chunk.
		 */
		accessor->read_ntuples = 0;
		accessor->read_ntuples_available = chunk_header.ntuples;
	}

	tuple = (MinimalTuple) accessor->read_buffer;
	tuple->t_len = size;

	return tuple;
}
#define Max(x, y)
Definition: c.h:987
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1229
MinimalTupleData * MinimalTuple
Definition: htup.h:27
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1021

References BufFileReadExact(), SharedTuplestoreAccessor::context, ereport, errcode_for_file_access(), errdetail_internal(), errmsg(), ERROR, Max, MemoryContextAlloc(), SharedTuplestore::meta_data_size, Min, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, pfree(), SharedTuplestoreAccessor::read_buffer, SharedTuplestoreAccessor::read_buffer_size, SharedTuplestoreAccessor::read_bytes, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_ntuples, SharedTuplestoreAccessor::read_ntuples_available, SharedTuplestoreAccessor::sts, STS_CHUNK_HEADER_SIZE, STS_CHUNK_PAGES, and MinimalTupleData::t_len.

Referenced by sts_parallel_scan_next().

◆ sts_reinitialize()

/*
 * Prepare to rescan by resetting every participant's shared read head to
 * the start of its file.  Only one backend should call this, and no scan
 * may be in progress in any backend while it runs.
 */
void
sts_reinitialize(SharedTuplestoreAccessor *accessor)
{
	int			i;

	/*
	 * Reset the shared read head for all participants' files.  Also set the
	 * initial chunk size to the minimum (any increases from that size will
	 * be recorded in chunk_expansion_log).
	 */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
	{
		accessor->sts->participants[i].read_page = 0;
	}
}

References i, SharedTuplestore::nparticipants, SharedTuplestore::participants, SharedTuplestoreParticipant::read_page, and SharedTuplestoreAccessor::sts.