PostgreSQL Source Code  git master
sharedtuplestore.h File Reference
#include "access/htup.h"
#include "storage/fd.h"
#include "storage/sharedfileset.h"
Include dependency graph for sharedtuplestore.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define SHARED_TUPLESTORE_SINGLE_PASS   0x01
 

Typedefs

typedef struct SharedTuplestore SharedTuplestore
 
typedef struct SharedTuplestoreAccessor SharedTuplestoreAccessor
 

Functions

size_t sts_estimate (int participants)
 
SharedTuplestoreAccessor * sts_initialize (SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
 
SharedTuplestoreAccessor * sts_attach (SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
 
void sts_end_write (SharedTuplestoreAccessor *accessor)
 
void sts_reinitialize (SharedTuplestoreAccessor *accessor)
 
void sts_begin_parallel_scan (SharedTuplestoreAccessor *accessor)
 
void sts_end_parallel_scan (SharedTuplestoreAccessor *accessor)
 
void sts_puttuple (SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
 
MinimalTuple sts_parallel_scan_next (SharedTuplestoreAccessor *accessor, void *meta_data)
 

Macro Definition Documentation

◆ SHARED_TUPLESTORE_SINGLE_PASS

#define SHARED_TUPLESTORE_SINGLE_PASS   0x01

Definition at line 30 of file sharedtuplestore.h.

Referenced by ExecParallelHashJoinSetUpBatches().

Typedef Documentation

◆ SharedTuplestore

Definition at line 21 of file sharedtuplestore.h.

◆ SharedTuplestoreAccessor

Definition at line 24 of file sharedtuplestore.h.

Function Documentation

◆ sts_attach()

SharedTuplestoreAccessor* sts_attach ( SharedTuplestore sts,
int  my_participant_number,
SharedFileSet fileset 
)

Definition at line 178 of file sharedtuplestore.c.

References Assert, SharedTuplestoreAccessor::context, CurrentMemoryContext, SharedTuplestoreAccessor::fileset, palloc0(), SharedTuplestoreAccessor::participant, and SharedTuplestoreAccessor::sts.

Referenced by ExecParallelHashEnsureBatchAccessors(), and ExecParallelHashRepartitionRest().

181 {
182  SharedTuplestoreAccessor *accessor;
183 
184  Assert(my_participant_number < sts->nparticipants);
185 
186  accessor = palloc0(sizeof(SharedTuplestoreAccessor));
187  accessor->participant = my_participant_number;
188  accessor->sts = sts;
189  accessor->fileset = fileset;
190  accessor->context = CurrentMemoryContext;
191 
192  return accessor;
193 }
SharedTuplestore * sts
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
void * palloc0(Size size)
Definition: mcxt.c:981
#define Assert(condition)
Definition: c.h:745

◆ sts_begin_parallel_scan()

void sts_begin_parallel_scan ( SharedTuplestoreAccessor accessor)

Definition at line 253 of file sharedtuplestore.c.

References Assert, i, SharedTuplestore::nparticipants, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, PG_USED_FOR_ASSERTS_ONLY, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_participant, SharedTuplestoreAccessor::sts, sts_end_parallel_scan(), and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinNewBatch(), and ExecParallelHashRepartitionRest().

254 {
256 
257  /* End any existing scan that was in progress. */
258  sts_end_parallel_scan(accessor);
259 
260  /*
261  * Any backend that might have written into this shared tuplestore must
262  * have called sts_end_write(), so that all buffers are flushed and the
263  * files have stopped growing.
264  */
265  for (i = 0; i < accessor->sts->nparticipants; ++i)
266  Assert(!accessor->sts->participants[i].writing);
267 
268  /*
269  * We will start out reading the file that THIS backend wrote. There may
270  * be some caching locality advantage to that.
271  */
272  accessor->read_participant = accessor->participant;
273  accessor->read_file = NULL;
274  accessor->read_next_page = 0;
275 }
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]
SharedTuplestore * sts
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
#define Assert(condition)
Definition: c.h:745
int i
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:121

◆ sts_end_parallel_scan()

void sts_end_parallel_scan ( SharedTuplestoreAccessor accessor)

Definition at line 281 of file sharedtuplestore.c.

References BufFileClose(), and SharedTuplestoreAccessor::read_file.

Referenced by ExecHashTableDetach(), ExecHashTableDetachBatch(), ExecParallelHashCloseBatchAccessors(), ExecParallelHashJoinNewBatch(), ExecParallelHashRepartitionRest(), and sts_begin_parallel_scan().

282 {
283  /*
284  * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
285  * we'd probably need a reference count of current parallel scanners so we
286  * could safely do it only when the reference count reaches zero.
287  */
288  if (accessor->read_file != NULL)
289  {
290  BufFileClose(accessor->read_file);
291  accessor->read_file = NULL;
292  }
293 }
void BufFileClose(BufFile *file)
Definition: buffile.c:395

◆ sts_end_write()

void sts_end_write ( SharedTuplestoreAccessor accessor)

Definition at line 213 of file sharedtuplestore.c.

References BufFileClose(), SharedTuplestoreAccessor::participant, SharedTuplestore::participants, pfree(), SharedTuplestoreAccessor::sts, sts_flush_chunk(), SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_file, and SharedTuplestoreParticipant::writing.

Referenced by ExecHashTableDetach(), ExecParallelHashCloseBatchAccessors(), ExecParallelHashJoinPartitionOuter(), and MultiExecParallelHash().

214 {
215  if (accessor->write_file != NULL)
216  {
217  sts_flush_chunk(accessor);
218  BufFileClose(accessor->write_file);
219  pfree(accessor->write_chunk);
220  accessor->write_chunk = NULL;
221  accessor->write_file = NULL;
222  accessor->sts->participants[accessor->participant].writing = false;
223  }
224 }
static void sts_flush_chunk(SharedTuplestoreAccessor *accessor)
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]
SharedTuplestore * sts
void BufFileClose(BufFile *file)
Definition: buffile.c:395
void pfree(void *pointer)
Definition: mcxt.c:1057
SharedTuplestoreChunk * write_chunk

◆ sts_estimate()

size_t sts_estimate ( int  participants)

Definition at line 105 of file sharedtuplestore.c.

References offsetof.

106 {
107  return offsetof(SharedTuplestore, participants) +
108  sizeof(SharedTuplestoreParticipant) * participants;
109 }
struct SharedTuplestoreParticipant SharedTuplestoreParticipant
#define offsetof(type, field)
Definition: c.h:668

◆ sts_initialize()

SharedTuplestoreAccessor* sts_initialize ( SharedTuplestore sts,
int  participants,
int  my_participant_number,
size_t  meta_data_size,
int  flags,
SharedFileSet fileset,
const char *  name 
)

Definition at line 127 of file sharedtuplestore.c.

References Assert, SharedTuplestoreAccessor::context, CurrentMemoryContext, elog, ERROR, SharedTuplestoreAccessor::fileset, SharedTuplestore::flags, i, SharedTuplestoreParticipant::lock, LWLockInitialize(), LWTRANCHE_SHARED_TUPLESTORE, SharedTuplestore::meta_data_size, SharedTuplestore::name, SharedTuplestore::nparticipants, palloc0(), SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreParticipant::read_page, SharedTuplestoreAccessor::sts, STS_CHUNK_DATA_SIZE, and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinSetUpBatches().

133 {
134  SharedTuplestoreAccessor *accessor;
135  int i;
136 
137  Assert(my_participant_number < participants);
138 
139  sts->nparticipants = participants;
140  sts->meta_data_size = meta_data_size;
141  sts->flags = flags;
142 
143  if (strlen(name) > sizeof(sts->name) - 1)
144  elog(ERROR, "SharedTuplestore name too long");
145  strcpy(sts->name, name);
146 
147  /*
148  * Limit meta-data so it + tuple size always fits into a single chunk.
149  * sts_puttuple() and sts_read_tuple() could be made to support scenarios
150  * where that's not the case, but it's not currently required. If so,
151  * meta-data size probably should be made variable, too.
152  */
153  if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
154  elog(ERROR, "meta-data too long");
155 
156  for (i = 0; i < participants; ++i)
157  {
160  sts->participants[i].read_page = 0;
161  sts->participants[i].writing = false;
162  }
163 
164  accessor = palloc0(sizeof(SharedTuplestoreAccessor));
165  accessor->participant = my_participant_number;
166  accessor->sts = sts;
167  accessor->fileset = fileset;
168  accessor->context = CurrentMemoryContext;
169 
170  return accessor;
171 }
char name[NAMEDATALEN]
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]
SharedTuplestore * sts
#define ERROR
Definition: elog.h:43
unsigned int uint32
Definition: c.h:374
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:745
void * palloc0(Size size)
Definition: mcxt.c:981
#define Assert(condition)
Definition: c.h:745
#define STS_CHUNK_DATA_SIZE
const char * name
Definition: encode.c:561
#define elog(elevel,...)
Definition: elog.h:214
int i

◆ sts_parallel_scan_next()

MinimalTuple sts_parallel_scan_next ( SharedTuplestoreAccessor accessor,
void *  meta_data 
)

Definition at line 520 of file sharedtuplestore.c.

References BufFileClose(), BufFileOpenShared(), BufFileRead(), BufFileSeekBlock(), ereport, errcode_for_file_access(), errmsg(), ERROR, SharedTuplestoreAccessor::fileset, SharedTuplestoreParticipant::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, name, SharedTuplestoreParticipant::npages, SharedTuplestore::nparticipants, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::read_bytes, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_ntuples, SharedTuplestoreAccessor::read_ntuples_available, SharedTuplestoreParticipant::read_page, SharedTuplestoreAccessor::read_participant, SharedTuplestoreAccessor::sts, STS_CHUNK_HEADER_SIZE, STS_CHUNK_PAGES, sts_filename(), and sts_read_tuple().

Referenced by ExecParallelHashJoinNewBatch(), ExecParallelHashJoinOuterGetTuple(), and ExecParallelHashRepartitionRest().

521 {
523  BlockNumber read_page;
524  bool eof;
525 
526  for (;;)
527  {
528  /* Can we read more tuples from the current chunk? */
529  if (accessor->read_ntuples < accessor->read_ntuples_available)
530  return sts_read_tuple(accessor, meta_data);
531 
532  /* Find the location of a new chunk to read. */
533  p = &accessor->sts->participants[accessor->read_participant];
534 
536  /* We can skip directly past overflow pages we know about. */
537  if (p->read_page < accessor->read_next_page)
538  p->read_page = accessor->read_next_page;
539  eof = p->read_page >= p->npages;
540  if (!eof)
541  {
542  /* Claim the next chunk. */
543  read_page = p->read_page;
544  /* Advance the read head for the next reader. */
546  accessor->read_next_page = p->read_page;
547  }
548  LWLockRelease(&p->lock);
549 
550  if (!eof)
551  {
552  SharedTuplestoreChunk chunk_header;
553  size_t nread;
554 
555  /* Make sure we have the file open. */
556  if (accessor->read_file == NULL)
557  {
558  char name[MAXPGPATH];
559 
560  sts_filename(name, accessor, accessor->read_participant);
561  accessor->read_file =
562  BufFileOpenShared(accessor->fileset, name, O_RDONLY);
563  }
564 
565  /* Seek and load the chunk header. */
566  if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
567  ereport(ERROR,
569  errmsg("could not seek to block %u in shared tuplestore temporary file",
570  read_page)));
571  nread = BufFileRead(accessor->read_file, &chunk_header,
573  if (nread != STS_CHUNK_HEADER_SIZE)
574  ereport(ERROR,
576  errmsg("could not read from shared tuplestore temporary file: read only %zu of %zu bytes",
577  nread, STS_CHUNK_HEADER_SIZE)));
578 
579  /*
580  * If this is an overflow chunk, we skip it and any following
581  * overflow chunks all at once.
582  */
583  if (chunk_header.overflow > 0)
584  {
585  accessor->read_next_page = read_page +
586  chunk_header.overflow * STS_CHUNK_PAGES;
587  continue;
588  }
589 
590  accessor->read_ntuples = 0;
591  accessor->read_ntuples_available = chunk_header.ntuples;
592  accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
593 
594  /* Go around again, so we can get a tuple from this chunk. */
595  }
596  else
597  {
598  if (accessor->read_file != NULL)
599  {
600  BufFileClose(accessor->read_file);
601  accessor->read_file = NULL;
602  }
603 
604  /*
605  * Try the next participant's file. If we've gone full circle,
606  * we're done.
607  */
608  accessor->read_participant = (accessor->read_participant + 1) %
609  accessor->sts->nparticipants;
610  if (accessor->read_participant == accessor->participant)
611  break;
612  accessor->read_next_page = 0;
613 
614  /* Go around again, so we can get a chunk from this file. */
615  }
616  }
617 
618  return NULL;
619 }
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
uint32 BlockNumber
Definition: block.h:31
SharedTuplestore * sts
void BufFileClose(BufFile *file)
Definition: buffile.c:395
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define STS_CHUNK_HEADER_SIZE
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
int errcode_for_file_access(void)
Definition: elog.c:633
static MinimalTuple sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
BufFile * BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
Definition: buffile.c:284
int BufFileSeekBlock(BufFile *file, long blknum)
Definition: buffile.c:761
#define ereport(elevel,...)
Definition: elog.h:144
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
const char * name
Definition: encode.c:561
#define STS_CHUNK_PAGES
int errmsg(const char *fmt,...)
Definition: elog.c:824
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:543

◆ sts_puttuple()

void sts_puttuple ( SharedTuplestoreAccessor accessor,
void *  meta_data,
MinimalTuple  tuple 
)

Definition at line 300 of file sharedtuplestore.c.

References Assert, BufFileCreateShared(), SharedTuplestoreAccessor::context, SharedTuplestoreChunk::data, SharedTuplestoreAccessor::fileset, MAXPGPATH, MemoryContextAllocZero(), SharedTuplestore::meta_data_size, Min, name, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::sts, STS_CHUNK_DATA_SIZE, STS_CHUNK_PAGES, sts_filename(), sts_flush_chunk(), MinimalTupleData::t_len, SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_end, SharedTuplestoreAccessor::write_file, SharedTuplestoreAccessor::write_pointer, and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), and ExecParallelHashTableInsert().

302 {
303  size_t size;
304 
305  /* Do we have our own file yet? */
306  if (accessor->write_file == NULL)
307  {
308  SharedTuplestoreParticipant *participant;
309  char name[MAXPGPATH];
310 
311  /* Create one. Only this backend will write into it. */
312  sts_filename(name, accessor, accessor->participant);
313  accessor->write_file = BufFileCreateShared(accessor->fileset, name);
314 
315  /* Set up the shared state for this backend's file. */
316  participant = &accessor->sts->participants[accessor->participant];
317  participant->writing = true; /* for assertions only */
318  }
319 
320  /* Do we have space? */
321  size = accessor->sts->meta_data_size + tuple->t_len;
322  if (accessor->write_pointer + size >= accessor->write_end)
323  {
324  if (accessor->write_chunk == NULL)
325  {
326  /* First time through. Allocate chunk. */
327  accessor->write_chunk = (SharedTuplestoreChunk *)
329  STS_CHUNK_PAGES * BLCKSZ);
330  accessor->write_chunk->ntuples = 0;
331  accessor->write_pointer = &accessor->write_chunk->data[0];
332  accessor->write_end = (char *)
333  accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
334  }
335  else
336  {
337  /* See if flushing helps. */
338  sts_flush_chunk(accessor);
339  }
340 
341  /* It may still not be enough in the case of a gigantic tuple. */
342  if (accessor->write_pointer + size >= accessor->write_end)
343  {
344  size_t written;
345 
346  /*
347  * We'll write the beginning of the oversized tuple, and then
348  * write the rest in some number of 'overflow' chunks.
349  *
350  * sts_initialize() verifies that the size of the tuple +
351  * meta-data always fits into a chunk. Because the chunk has been
352  * flushed above, we can be sure to have all of a chunk's usable
353  * space available.
354  */
355  Assert(accessor->write_pointer + accessor->sts->meta_data_size +
356  sizeof(uint32) < accessor->write_end);
357 
358  /* Write the meta-data as one chunk. */
359  if (accessor->sts->meta_data_size > 0)
360  memcpy(accessor->write_pointer, meta_data,
361  accessor->sts->meta_data_size);
362 
363  /*
364  * Write as much of the tuple as we can fit. This includes the
365  * tuple's size at the start.
366  */
367  written = accessor->write_end - accessor->write_pointer -
368  accessor->sts->meta_data_size;
369  memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
370  tuple, written);
371  ++accessor->write_chunk->ntuples;
372  size -= accessor->sts->meta_data_size;
373  size -= written;
374  /* Now write as many overflow chunks as we need for the rest. */
375  while (size > 0)
376  {
377  size_t written_this_chunk;
378 
379  sts_flush_chunk(accessor);
380 
381  /*
382  * How many overflow chunks to go? This will allow readers to
383  * skip all of them at once instead of reading each one.
384  */
385  accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
387  written_this_chunk =
388  Min(accessor->write_end - accessor->write_pointer, size);
389  memcpy(accessor->write_pointer, (char *) tuple + written,
390  written_this_chunk);
391  accessor->write_pointer += written_this_chunk;
392  size -= written_this_chunk;
393  written += written_this_chunk;
394  }
395  return;
396  }
397  }
398 
399  /* Copy meta-data and tuple into buffer. */
400  if (accessor->sts->meta_data_size > 0)
401  memcpy(accessor->write_pointer, meta_data,
402  accessor->sts->meta_data_size);
403  memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
404  tuple->t_len);
405  accessor->write_pointer += size;
406  ++accessor->write_chunk->ntuples;
407 }
#define Min(x, y)
Definition: c.h:927
static void sts_flush_chunk(SharedTuplestoreAccessor *accessor)
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
SharedTuplestore * sts
#define MAXPGPATH
BufFile * BufFileCreateShared(SharedFileSet *fileset, const char *name)
Definition: buffile.c:262
unsigned int uint32
Definition: c.h:374
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
#define Assert(condition)
Definition: c.h:745
#define STS_CHUNK_DATA_SIZE
SharedTuplestoreChunk * write_chunk
const char * name
Definition: encode.c:561
#define STS_CHUNK_PAGES
char data[FLEXIBLE_ARRAY_MEMBER]

◆ sts_reinitialize()

void sts_reinitialize ( SharedTuplestoreAccessor accessor)

Definition at line 234 of file sharedtuplestore.c.

References i, SharedTuplestore::nparticipants, SharedTuplestore::participants, SharedTuplestoreParticipant::read_page, and SharedTuplestoreAccessor::sts.

235 {
236  int i;
237 
238  /*
239  * Reset the shared read head for all participants' files. Also set the
240  * initial chunk size to the minimum (any increases from that size will be
241  * recorded in chunk_expansion_log).
242  */
243  for (i = 0; i < accessor->sts->nparticipants; ++i)
244  {
245  accessor->sts->participants[i].read_page = 0;
246  }
247 }
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]
SharedTuplestore * sts
int i