PostgreSQL Source Code (git master)
sharedtuplestore.c File Reference
#include "postgres.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "miscadmin.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"


Data Structures

struct  SharedTuplestoreChunk
 
struct  SharedTuplestoreParticipant
 
struct  SharedTuplestore
 
struct  SharedTuplestoreAccessor
 

Macros

#define STS_CHUNK_PAGES   4
 
#define STS_CHUNK_HEADER_SIZE   offsetof(SharedTuplestoreChunk, data)
 
#define STS_CHUNK_DATA_SIZE   (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
 

Typedefs

typedef struct SharedTuplestoreChunk SharedTuplestoreChunk
 
typedef struct SharedTuplestoreParticipant SharedTuplestoreParticipant
 

Functions

static void sts_filename (char *name, SharedTuplestoreAccessor *accessor, int participant)
 
size_t sts_estimate (int participants)
 
SharedTuplestoreAccessor * sts_initialize (SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
 
SharedTuplestoreAccessor * sts_attach (SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
 
static void sts_flush_chunk (SharedTuplestoreAccessor *accessor)
 
void sts_end_write (SharedTuplestoreAccessor *accessor)
 
void sts_reinitialize (SharedTuplestoreAccessor *accessor)
 
void sts_begin_parallel_scan (SharedTuplestoreAccessor *accessor)
 
void sts_end_parallel_scan (SharedTuplestoreAccessor *accessor)
 
void sts_puttuple (SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
 
static MinimalTuple sts_read_tuple (SharedTuplestoreAccessor *accessor, void *meta_data)
 
MinimalTuple sts_parallel_scan_next (SharedTuplestoreAccessor *accessor, void *meta_data)
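
These functions form a small lifecycle API: one backend lays out a SharedTuplestore in shared memory sized with sts_estimate() and calls sts_initialize(); other backends attach to the same structure with sts_attach(); each writer appends tuples with sts_puttuple() and finishes with sts_end_write(); once all writers are done, any number of participants can consume the combined contents with sts_begin_parallel_scan(), sts_parallel_scan_next() and sts_end_parallel_scan(). The following is a minimal, hypothetical sketch of that flow (shared-memory allocation, SharedFileSet setup, synchronization barriers and error handling are omitted; every name outside the sts_* API is illustrative only):

#include "postgres.h"
#include "utils/sharedtuplestore.h"

static void
sts_usage_sketch(SharedTuplestore *sts_mem,    /* sts_estimate(nparticipants) bytes */
                 SharedFileSet *fileset,
                 int nparticipants,
                 int my_participant_number,
                 MinimalTuple some_tuple)
{
    SharedTuplestoreAccessor *acc;
    uint32      meta = 0;          /* fixed-size per-tuple meta-data */
    MinimalTuple tuple;

    /* Exactly one backend initializes; the others attach afterwards. */
    if (my_participant_number == 0)
        acc = sts_initialize(sts_mem, nparticipants, my_participant_number,
                             sizeof(uint32), SHARED_TUPLESTORE_SINGLE_PASS,
                             fileset, "sketch");
    else
        acc = sts_attach(sts_mem, my_participant_number, fileset);

    /* Write phase: append tuples, then flush and close this backend's file. */
    sts_puttuple(acc, &meta, some_tuple);
    sts_end_write(acc);

    /* ... all writers must have called sts_end_write() before scanning ... */

    /* Read phase: participants cooperatively scan all files, chunk by chunk. */
    sts_begin_parallel_scan(acc);
    while ((tuple = sts_parallel_scan_next(acc, &meta)) != NULL)
    {
        /* consume 'tuple'; it points into the accessor's read buffer */
    }
    sts_end_parallel_scan(acc);
}

In core PostgreSQL this pattern is driven by the parallel hash join code; see the "Referenced by" lists in the function documentation below.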
 

Macro Definition Documentation

◆ STS_CHUNK_DATA_SIZE

#define STS_CHUNK_DATA_SIZE   (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)

Definition at line 40 of file sharedtuplestore.c.

Referenced by sts_initialize(), and sts_puttuple().

◆ STS_CHUNK_HEADER_SIZE

#define STS_CHUNK_HEADER_SIZE   offsetof(SharedTuplestoreChunk, data)

Definition at line 39 of file sharedtuplestore.c.

Referenced by sts_parallel_scan_next(), and sts_read_tuple().

◆ STS_CHUNK_PAGES

#define STS_CHUNK_PAGES   4
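
As a worked example of the chunk geometry (assuming the default 8 kB BLCKSZ, which is a build-time setting, so the exact numbers can differ):

/*
 * Illustrative arithmetic only, with BLCKSZ = 8192:
 *
 *   chunk size          = STS_CHUNK_PAGES * BLCKSZ = 4 * 8192 = 32768 bytes
 *   STS_CHUNK_DATA_SIZE = 32768 - STS_CHUNK_HEADER_SIZE
 *
 * sts_initialize() rejects any meta_data_size for which
 * meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE, so a chunk can
 * always hold a tuple's meta-data plus its length word; a tuple that does
 * not fit in the remaining space continues into overflow chunks (see
 * sts_puttuple()).
 */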

Typedef Documentation

◆ SharedTuplestoreChunk

◆ SharedTuplestoreParticipant

Function Documentation

◆ sts_attach()

SharedTuplestoreAccessor * sts_attach ( SharedTuplestore *  sts,
int  my_participant_number,
SharedFileSet *  fileset
)

Definition at line 178 of file sharedtuplestore.c.

References Assert, SharedTuplestoreAccessor::context, CurrentMemoryContext, SharedTuplestoreAccessor::fileset, palloc0(), SharedTuplestoreAccessor::participant, and SharedTuplestoreAccessor::sts.

Referenced by ExecParallelHashEnsureBatchAccessors(), and ExecParallelHashRepartitionRest().

181 {
182  SharedTuplestoreAccessor *accessor;
183 
184  Assert(my_participant_number < sts->nparticipants);
185 
186  accessor = palloc0(sizeof(SharedTuplestoreAccessor));
187  accessor->participant = my_participant_number;
188  accessor->sts = sts;
189  accessor->fileset = fileset;
190  accessor->context = CurrentMemoryContext;
191 
192  return accessor;
193 }

◆ sts_begin_parallel_scan()

void sts_begin_parallel_scan ( SharedTuplestoreAccessor *  accessor)

Definition at line 258 of file sharedtuplestore.c.

References Assert, i, SharedTuplestore::nparticipants, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, PG_USED_FOR_ASSERTS_ONLY, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_participant, SharedTuplestoreAccessor::sts, sts_end_parallel_scan(), and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinNewBatch(), and ExecParallelHashRepartitionRest().

259 {
260  int i PG_USED_FOR_ASSERTS_ONLY;
261 
262  /* End any existing scan that was in progress. */
263  sts_end_parallel_scan(accessor);
264 
265  /*
266  * Any backend that might have written into this shared tuplestore must
267  * have called sts_end_write(), so that all buffers are flushed and the
268  * files have stopped growing.
269  */
270  for (i = 0; i < accessor->sts->nparticipants; ++i)
271  Assert(!accessor->sts->participants[i].writing);
272 
273  /*
274  * We will start out reading the file that THIS backend wrote. There may
275  * be some caching locality advantage to that.
276  */
277  accessor->read_participant = accessor->participant;
278  accessor->read_file = NULL;
279  accessor->read_next_page = 0;
280 }

◆ sts_end_parallel_scan()

void sts_end_parallel_scan ( SharedTuplestoreAccessor *  accessor)

Definition at line 286 of file sharedtuplestore.c.

References BufFileClose(), and SharedTuplestoreAccessor::read_file.

Referenced by ExecHashTableDetach(), ExecHashTableDetachBatch(), ExecParallelHashCloseBatchAccessors(), ExecParallelHashJoinNewBatch(), ExecParallelHashRepartitionRest(), and sts_begin_parallel_scan().

287 {
288  /*
289  * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
290  * we'd probably need a reference count of current parallel scanners so we
291  * could safely do it only when the reference count reaches zero.
292  */
293  if (accessor->read_file != NULL)
294  {
295  BufFileClose(accessor->read_file);
296  accessor->read_file = NULL;
297  }
298 }

◆ sts_end_write()

void sts_end_write ( SharedTuplestoreAccessor *  accessor)

Definition at line 218 of file sharedtuplestore.c.

References BufFileClose(), SharedTuplestoreAccessor::participant, SharedTuplestore::participants, pfree(), SharedTuplestoreAccessor::sts, sts_flush_chunk(), SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_file, and SharedTuplestoreParticipant::writing.

Referenced by ExecHashTableDetach(), ExecParallelHashCloseBatchAccessors(), ExecParallelHashJoinPartitionOuter(), and MultiExecParallelHash().

219 {
220  if (accessor->write_file != NULL)
221  {
222  sts_flush_chunk(accessor);
223  BufFileClose(accessor->write_file);
224  pfree(accessor->write_chunk);
225  accessor->write_chunk = NULL;
226  accessor->write_file = NULL;
227  accessor->sts->participants[accessor->participant].writing = false;
228  }
229 }

◆ sts_estimate()

size_t sts_estimate ( int  participants)

Definition at line 105 of file sharedtuplestore.c.

References offsetof.

106 {
107  return offsetof(SharedTuplestore, participants) +
108  sizeof(SharedTuplestoreParticipant) * participants;
109 }
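
The result is the number of shared-memory bytes the caller must reserve for the SharedTuplestore control structure, including the flexible array of per-participant state. A hypothetical sizing sketch follows; shm_toc_allocate() (from storage/shm_toc.h) stands in for whatever shared-memory scheme the caller actually uses, and the other names are illustrative:

/* Reserve room for one leader plus nworkers writers, then initialize. */
static SharedTuplestoreAccessor *
make_sts_sketch(shm_toc *toc, SharedFileSet *fileset, int nworkers)
{
    int         nparticipants = nworkers + 1;
    SharedTuplestore *sts;

    sts = (SharedTuplestore *) shm_toc_allocate(toc, sts_estimate(nparticipants));
    return sts_initialize(sts, nparticipants, 0 /* leader */,
                          sizeof(uint32), 0 /* flags */,
                          fileset, "sketch");
}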

◆ sts_filename()

static void sts_filename ( char *  name,
SharedTuplestoreAccessor *  accessor,
int  participant 
)
static

Definition at line 628 of file sharedtuplestore.c.

References MAXPGPATH, SharedTuplestore::name, snprintf, and SharedTuplestoreAccessor::sts.

Referenced by sts_parallel_scan_next(), and sts_puttuple().

629 {
630  snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
631 }

◆ sts_flush_chunk()

static void sts_flush_chunk ( SharedTuplestoreAccessor *  accessor)
static

Definition at line 196 of file sharedtuplestore.c.

References BufFileWrite(), SharedTuplestoreChunk::data, ereport, errcode_for_file_access(), errmsg(), ERROR, SharedTuplestoreParticipant::npages, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::sts, STS_CHUNK_PAGES, SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_file, and SharedTuplestoreAccessor::write_pointer.

Referenced by sts_end_write(), and sts_puttuple().

197 {
198  size_t size;
199  size_t written;
200 
201  size = STS_CHUNK_PAGES * BLCKSZ;
202  written = BufFileWrite(accessor->write_file, accessor->write_chunk, size);
203  if (written != size)
204  ereport(ERROR,
205  (errcode_for_file_access(),
206  errmsg("could not write to temporary file: %m")));
207  memset(accessor->write_chunk, 0, size);
208  accessor->write_pointer = &accessor->write_chunk->data[0];
209  accessor->sts->participants[accessor->participant].npages +=
210  STS_CHUNK_PAGES;
211 }

◆ sts_initialize()

SharedTuplestoreAccessor * sts_initialize ( SharedTuplestore *  sts,
int  participants,
int  my_participant_number,
size_t  meta_data_size,
int  flags,
SharedFileSet *  fileset,
const char *  name 
)

Definition at line 127 of file sharedtuplestore.c.

References Assert, SharedTuplestoreAccessor::context, CurrentMemoryContext, elog, ERROR, SharedTuplestoreAccessor::fileset, SharedTuplestore::flags, i, SharedTuplestoreParticipant::lock, LWLockInitialize(), LWTRANCHE_SHARED_TUPLESTORE, SharedTuplestore::meta_data_size, SharedTuplestore::name, SharedTuplestore::nparticipants, palloc0(), SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreParticipant::read_page, SharedTuplestoreAccessor::sts, STS_CHUNK_DATA_SIZE, and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinSetUpBatches().

133 {
134  SharedTuplestoreAccessor *accessor;
135  int i;
136 
137  Assert(my_participant_number < participants);
138 
139  sts->nparticipants = participants;
140  sts->meta_data_size = meta_data_size;
141  sts->flags = flags;
142 
143  if (strlen(name) > sizeof(sts->name) - 1)
144  elog(ERROR, "SharedTuplestore name too long");
145  strcpy(sts->name, name);
146 
147  /*
148  * Limit meta-data so it + tuple size always fits into a single chunk.
149  * sts_puttuple() and sts_read_tuple() could be made to support scenarios
150  * where that's not the case, but it's not currently required. If so,
151  * meta-data size probably should be made variable, too.
152  */
153  if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
154  elog(ERROR, "meta-data too long");
155 
156  for (i = 0; i < participants; ++i)
157  {
158  LWLockInitialize(&sts->participants[i].lock,
159  LWTRANCHE_SHARED_TUPLESTORE);
160  sts->participants[i].read_page = 0;
161  sts->participants[i].writing = false;
162  }
163 
164  accessor = palloc0(sizeof(SharedTuplestoreAccessor));
165  accessor->participant = my_participant_number;
166  accessor->sts = sts;
167  accessor->fileset = fileset;
168  accessor->context = CurrentMemoryContext;
169 
170  return accessor;
171 }

◆ sts_parallel_scan_next()

MinimalTuple sts_parallel_scan_next ( SharedTuplestoreAccessor *  accessor,
void *  meta_data 
)

Definition at line 525 of file sharedtuplestore.c.

References BufFileClose(), BufFileOpenShared(), BufFileRead(), BufFileSeekBlock(), ereport, errcode_for_file_access(), errdetail_internal(), errmsg(), ERROR, SharedTuplestoreAccessor::fileset, SharedTuplestoreParticipant::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, name, SharedTuplestoreParticipant::npages, SharedTuplestore::nparticipants, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::read_bytes, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_ntuples, SharedTuplestoreAccessor::read_ntuples_available, SharedTuplestoreParticipant::read_page, SharedTuplestoreAccessor::read_participant, SharedTuplestoreAccessor::sts, STS_CHUNK_HEADER_SIZE, STS_CHUNK_PAGES, sts_filename(), and sts_read_tuple().

Referenced by ExecParallelHashJoinNewBatch(), ExecParallelHashJoinOuterGetTuple(), and ExecParallelHashRepartitionRest().

526 {
527  SharedTuplestoreParticipant *p;
528  BlockNumber read_page;
529  bool eof;
530 
531  for (;;)
532  {
533  /* Can we read more tuples from the current chunk? */
534  if (accessor->read_ntuples < accessor->read_ntuples_available)
535  return sts_read_tuple(accessor, meta_data);
536 
537  /* Find the location of a new chunk to read. */
538  p = &accessor->sts->participants[accessor->read_participant];
539 
540  LWLockAcquire(&p->lock, LW_EXCLUSIVE);
541  /* We can skip directly past overflow pages we know about. */
542  if (p->read_page < accessor->read_next_page)
543  p->read_page = accessor->read_next_page;
544  eof = p->read_page >= p->npages;
545  if (!eof)
546  {
547  /* Claim the next chunk. */
548  read_page = p->read_page;
549  /* Advance the read head for the next reader. */
550  p->read_page += STS_CHUNK_PAGES;
551  accessor->read_next_page = p->read_page;
552  }
553  LWLockRelease(&p->lock);
554 
555  if (!eof)
556  {
557  SharedTuplestoreChunk chunk_header;
558 
559  /* Make sure we have the file open. */
560  if (accessor->read_file == NULL)
561  {
562  char name[MAXPGPATH];
563 
564  sts_filename(name, accessor, accessor->read_participant);
565  accessor->read_file =
566  BufFileOpenShared(accessor->fileset, name);
567  }
568 
569  /* Seek and load the chunk header. */
570  if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
571  ereport(ERROR,
572  (errcode_for_file_access(),
573  errmsg("could not read from shared tuplestore temporary file"),
574  errdetail_internal("Could not seek to next block.")));
575  if (BufFileRead(accessor->read_file, &chunk_header,
576  STS_CHUNK_HEADER_SIZE) != STS_CHUNK_HEADER_SIZE)
577  ereport(ERROR,
578  (errcode_for_file_access(),
579  errmsg("could not read from shared tuplestore temporary file"),
580  errdetail_internal("Short read while reading chunk header.")));
581 
582  /*
583  * If this is an overflow chunk, we skip it and any following
584  * overflow chunks all at once.
585  */
586  if (chunk_header.overflow > 0)
587  {
588  accessor->read_next_page = read_page +
589  chunk_header.overflow * STS_CHUNK_PAGES;
590  continue;
591  }
592 
593  accessor->read_ntuples = 0;
594  accessor->read_ntuples_available = chunk_header.ntuples;
595  accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
596 
597  /* Go around again, so we can get a tuple from this chunk. */
598  }
599  else
600  {
601  if (accessor->read_file != NULL)
602  {
603  BufFileClose(accessor->read_file);
604  accessor->read_file = NULL;
605  }
606 
607  /*
608  * Try the next participant's file. If we've gone full circle,
609  * we're done.
610  */
611  accessor->read_participant = (accessor->read_participant + 1) %
612  accessor->sts->nparticipants;
613  if (accessor->read_participant == accessor->participant)
614  break;
615  accessor->read_next_page = 0;
616 
617  /* Go around again, so we can get a chunk from this file. */
618  }
619  }
620 
621  return NULL;
622 }
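
Each call hands back one tuple, coordinating with other scanning backends through the per-participant lock; meta_data must point to a buffer of the meta_data_size given at initialization, and the returned tuple points into the accessor's read buffer, so it is only valid until the next call. A hypothetical consumer loop (process_tuple() is illustrative):

    MinimalTuple tuple;
    uint32      hashvalue;      /* meta_data_size was sizeof(uint32) here */

    sts_begin_parallel_scan(accessor);
    while ((tuple = sts_parallel_scan_next(accessor, &hashvalue)) != NULL)
    {
        /* Copy the tuple if it must survive the next call. */
        process_tuple(tuple, hashvalue);
    }
    sts_end_parallel_scan(accessor);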

◆ sts_puttuple()

void sts_puttuple ( SharedTuplestoreAccessor *  accessor,
void *  meta_data,
MinimalTuple  tuple 
)

Definition at line 305 of file sharedtuplestore.c.

References Assert, BufFileCreateShared(), SharedTuplestoreAccessor::context, SharedTuplestoreChunk::data, SharedTuplestoreAccessor::fileset, MAXPGPATH, MemoryContextAllocZero(), SharedTuplestore::meta_data_size, Min, name, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::sts, STS_CHUNK_DATA_SIZE, STS_CHUNK_PAGES, sts_filename(), sts_flush_chunk(), MinimalTupleData::t_len, SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_end, SharedTuplestoreAccessor::write_file, SharedTuplestoreAccessor::write_pointer, and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), and ExecParallelHashTableInsert().

307 {
308  size_t size;
309 
310  /* Do we have our own file yet? */
311  if (accessor->write_file == NULL)
312  {
313  SharedTuplestoreParticipant *participant;
314  char name[MAXPGPATH];
315 
316  /* Create one. Only this backend will write into it. */
317  sts_filename(name, accessor, accessor->participant);
318  accessor->write_file = BufFileCreateShared(accessor->fileset, name);
319 
320  /* Set up the shared state for this backend's file. */
321  participant = &accessor->sts->participants[accessor->participant];
322  participant->writing = true; /* for assertions only */
323  }
324 
325  /* Do we have space? */
326  size = accessor->sts->meta_data_size + tuple->t_len;
327  if (accessor->write_pointer + size >= accessor->write_end)
328  {
329  if (accessor->write_chunk == NULL)
330  {
331  /* First time through. Allocate chunk. */
332  accessor->write_chunk = (SharedTuplestoreChunk *)
333  MemoryContextAllocZero(accessor->context,
334  STS_CHUNK_PAGES * BLCKSZ);
335  accessor->write_chunk->ntuples = 0;
336  accessor->write_pointer = &accessor->write_chunk->data[0];
337  accessor->write_end = (char *)
338  accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
339  }
340  else
341  {
342  /* See if flushing helps. */
343  sts_flush_chunk(accessor);
344  }
345 
346  /* It may still not be enough in the case of a gigantic tuple. */
347  if (accessor->write_pointer + size >= accessor->write_end)
348  {
349  size_t written;
350 
351  /*
352  * We'll write the beginning of the oversized tuple, and then
353  * write the rest in some number of 'overflow' chunks.
354  *
355  * sts_initialize() verifies that the size of the tuple +
356  * meta-data always fits into a chunk. Because the chunk has been
357  * flushed above, we can be sure to have all of a chunk's usable
358  * space available.
359  */
360  Assert(accessor->write_pointer + accessor->sts->meta_data_size +
361  sizeof(uint32) < accessor->write_end);
362 
363  /* Write the meta-data as one chunk. */
364  if (accessor->sts->meta_data_size > 0)
365  memcpy(accessor->write_pointer, meta_data,
366  accessor->sts->meta_data_size);
367 
368  /*
369  * Write as much of the tuple as we can fit. This includes the
370  * tuple's size at the start.
371  */
372  written = accessor->write_end - accessor->write_pointer -
373  accessor->sts->meta_data_size;
374  memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
375  tuple, written);
376  ++accessor->write_chunk->ntuples;
377  size -= accessor->sts->meta_data_size;
378  size -= written;
379  /* Now write as many overflow chunks as we need for the rest. */
380  while (size > 0)
381  {
382  size_t written_this_chunk;
383 
384  sts_flush_chunk(accessor);
385 
386  /*
387  * How many overflow chunks to go? This will allow readers to
388  * skip all of them at once instead of reading each one.
389  */
390  accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
391  STS_CHUNK_DATA_SIZE;
392  written_this_chunk =
393  Min(accessor->write_end - accessor->write_pointer, size);
394  memcpy(accessor->write_pointer, (char *) tuple + written,
395  written_this_chunk);
396  accessor->write_pointer += written_this_chunk;
397  size -= written_this_chunk;
398  written += written_this_chunk;
399  }
400  return;
401  }
402  }
403 
404  /* Copy meta-data and tuple into buffer. */
405  if (accessor->sts->meta_data_size > 0)
406  memcpy(accessor->write_pointer, meta_data,
407  accessor->sts->meta_data_size);
408  memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
409  tuple->t_len);
410  accessor->write_pointer += size;
411  ++accessor->write_chunk->ntuples;
412 }
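
The meta_data argument is a fixed-size payload written in front of each tuple; the parallel hash join callers listed above use it to carry each tuple's hash value so it need not be recomputed when the tuple is read back. A hypothetical write loop (get_next_tuple() is illustrative):

    uint32      hashvalue;
    MinimalTuple tuple;

    while ((tuple = get_next_tuple(&hashvalue)) != NULL)
        sts_puttuple(accessor, &hashvalue, tuple);

    /* Flush the final chunk and close this backend's write file. */
    sts_end_write(accessor);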

◆ sts_read_tuple()

static MinimalTuple sts_read_tuple ( SharedTuplestoreAccessor *  accessor,
void *  meta_data 
)
static

Definition at line 415 of file sharedtuplestore.c.

References BufFileRead(), SharedTuplestoreAccessor::context, ereport, errcode_for_file_access(), errdetail_internal(), errmsg(), ERROR, Max, MemoryContextAlloc(), SharedTuplestore::meta_data_size, Min, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, pfree(), SharedTuplestoreAccessor::read_buffer, SharedTuplestoreAccessor::read_buffer_size, SharedTuplestoreAccessor::read_bytes, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_ntuples, SharedTuplestoreAccessor::read_ntuples_available, SharedTuplestoreAccessor::sts, STS_CHUNK_HEADER_SIZE, STS_CHUNK_PAGES, and MinimalTupleData::t_len.

Referenced by sts_parallel_scan_next().

416 {
417  MinimalTuple tuple;
418  uint32 size;
419  size_t remaining_size;
420  size_t this_chunk_size;
421  char *destination;
422 
423  /*
424  * We'll keep track of bytes read from this chunk so that we can detect an
425  * overflowing tuple and switch to reading overflow pages.
426  */
427  if (accessor->sts->meta_data_size > 0)
428  {
429  if (BufFileRead(accessor->read_file,
430  meta_data,
431  accessor->sts->meta_data_size) !=
432  accessor->sts->meta_data_size)
433  ereport(ERROR,
434  (errcode_for_file_access(),
435  errmsg("could not read from shared tuplestore temporary file"),
436  errdetail_internal("Short read while reading meta-data.")));
437  accessor->read_bytes += accessor->sts->meta_data_size;
438  }
439  if (BufFileRead(accessor->read_file,
440  &size,
441  sizeof(size)) != sizeof(size))
442  ereport(ERROR,
443  (errcode_for_file_access(),
444  errmsg("could not read from shared tuplestore temporary file"),
445  errdetail_internal("Short read while reading size.")));
446  accessor->read_bytes += sizeof(size);
447  if (size > accessor->read_buffer_size)
448  {
449  size_t new_read_buffer_size;
450 
451  if (accessor->read_buffer != NULL)
452  pfree(accessor->read_buffer);
453  new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
454  accessor->read_buffer =
455  MemoryContextAlloc(accessor->context, new_read_buffer_size);
456  accessor->read_buffer_size = new_read_buffer_size;
457  }
458  remaining_size = size - sizeof(uint32);
459  this_chunk_size = Min(remaining_size,
460  BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
461  destination = accessor->read_buffer + sizeof(uint32);
462  if (BufFileRead(accessor->read_file,
463  destination,
464  this_chunk_size) != this_chunk_size)
465  ereport(ERROR,
466  (errcode_for_file_access(),
467  errmsg("could not read from shared tuplestore temporary file"),
468  errdetail_internal("Short read while reading tuple.")));
469  accessor->read_bytes += this_chunk_size;
470  remaining_size -= this_chunk_size;
471  destination += this_chunk_size;
472  ++accessor->read_ntuples;
473 
474  /* Check if we need to read any overflow chunks. */
475  while (remaining_size > 0)
476  {
477  /* We are now positioned at the start of an overflow chunk. */
478  SharedTuplestoreChunk chunk_header;
479 
480  if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
481  STS_CHUNK_HEADER_SIZE)
482  ereport(ERROR,
483  (errcode_for_file_access(),
484  errmsg("could not read from shared tuplestore temporary file"),
485  errdetail_internal("Short read while reading overflow chunk header.")));
486  accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
487  if (chunk_header.overflow == 0)
488  ereport(ERROR,
489  (errcode_for_file_access(),
490  errmsg("unexpected chunk in shared tuplestore temporary file"),
491  errdetail_internal("Expected overflow chunk.")));
492  accessor->read_next_page += STS_CHUNK_PAGES;
493  this_chunk_size = Min(remaining_size,
494  BLCKSZ * STS_CHUNK_PAGES -
495  STS_CHUNK_HEADER_SIZE);
496  if (BufFileRead(accessor->read_file,
497  destination,
498  this_chunk_size) != this_chunk_size)
499  ereport(ERROR,
500  (errcode_for_file_access(),
501  errmsg("could not read from shared tuplestore temporary file"),
502  errdetail_internal("Short read while reading tuple.")));
503  accessor->read_bytes += this_chunk_size;
504  remaining_size -= this_chunk_size;
505  destination += this_chunk_size;
506 
507  /*
508  * These will be used to count regular tuples following the oversized
509  * tuple that spilled into this overflow chunk.
510  */
511  accessor->read_ntuples = 0;
512  accessor->read_ntuples_available = chunk_header.ntuples;
513  }
514 
515  tuple = (MinimalTuple) accessor->read_buffer;
516  tuple->t_len = size;
517 
518  return tuple;
519 }

◆ sts_reinitialize()

void sts_reinitialize ( SharedTuplestoreAccessor *  accessor)

Definition at line 239 of file sharedtuplestore.c.

References i, SharedTuplestore::nparticipants, SharedTuplestore::participants, SharedTuplestoreParticipant::read_page, and SharedTuplestoreAccessor::sts.

240 {
241  int i;
242 
243  /*
244  * Reset the shared read head for all participants' files. Also set the
245  * initial chunk size to the minimum (any increases from that size will be
246  * recorded in chunk_expansion_log).
247  */
248  for (i = 0; i < accessor->sts->nparticipants; ++i)
249  {
250  accessor->sts->participants[i].read_page = 0;
251  }
252 }
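
Resetting read_page for every participant rewinds the shared read heads so the stored tuples can be scanned again from the beginning. A hypothetical rescan sketch, assuming a single backend performs the reset while no scan or write is in progress:

    /* One backend rewinds the shared read heads... */
    sts_reinitialize(accessor);

    /* ...then participants may run another full parallel scan. */
    sts_begin_parallel_scan(accessor);
    while ((tuple = sts_parallel_scan_next(accessor, &meta)) != NULL)
    {
        /* process 'tuple' */
    }
    sts_end_parallel_scan(accessor);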