PostgreSQL Source Code  git master
sharedtuplestore.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * sharedtuplestore.c
4  * Simple mechanism for sharing tuples between backends.
5  *
6  * This module contains a shared temporary tuple storage mechanism providing
7  * a parallel-aware subset of the features of tuplestore.c. Multiple backends
8  * can write to a SharedTuplestore, and then multiple backends can later scan
9  * the stored tuples. Currently, the only scan type supported is a parallel
10  * scan where each backend reads an arbitrary subset of the tuples that were
11  * written.
12  *
13  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
14  * Portions Copyright (c) 1994, Regents of the University of California
15  *
16  * IDENTIFICATION
17  * src/backend/utils/sort/sharedtuplestore.c
18  *
19  *-------------------------------------------------------------------------
20  */
21 
22 #include "postgres.h"
23 
24 #include "access/htup.h"
25 #include "access/htup_details.h"
26 #include "storage/buffile.h"
27 #include "storage/lwlock.h"
28 #include "storage/sharedfileset.h"
29 #include "utils/sharedtuplestore.h"
30 
/*
 * Chunk geometry.
 *
 * STS_CHUNK_PAGES is the size of a chunk, in pages.  The value is somewhat
 * arbitrary: it matches the size of HASH_CHUNK, so that Parallel Hash obtains
 * new chunks of tuples at approximately the same rate as it allocates new
 * chunks of memory to insert them into.
 *
 * STS_CHUNK_HEADER_SIZE is the number of bytes that precede the 'data' member
 * of a SharedTuplestoreChunk, and STS_CHUNK_DATA_SIZE is the space left in a
 * chunk once that header has been subtracted.
 */
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
#define STS_CHUNK_DATA_SIZE (BLCKSZ * STS_CHUNK_PAGES - STS_CHUNK_HEADER_SIZE)
40 
41 /* Chunk written to disk. */
42 typedef struct SharedTuplestoreChunk
43 {
44  int ntuples; /* Number of tuples in this chunk. */
45  int overflow; /* If overflow, how many including this one? */
48 
49 /* Per-participant shared state. */
51 {
53  BlockNumber read_page; /* Page number for next read. */
54  BlockNumber npages; /* Number of pages written. */
55  bool writing; /* Used only for assertions. */
57 
58 /* The control object that lives in shared memory. */
60 {
61  int nparticipants; /* Number of participants that can write. */
62  int flags; /* Flag bits from SHARED_TUPLESTORE_XXX */
63  size_t meta_data_size; /* Size of per-tuple header. */
64  char name[NAMEDATALEN]; /* A name for this tuplestore. */
65 
66  /* Followed by per-participant shared state. */
68 };
69 
70 /* Per-participant state that lives in backend-local memory. */
72 {
73  int participant; /* My participant number. */
74  SharedTuplestore *sts; /* The shared state. */
75  SharedFileSet *fileset; /* The SharedFileSet holding files. */
76  MemoryContext context; /* Memory context for buffers. */
77 
78  /* State for reading. */
79  int read_participant; /* The current participant to read from. */
80  BufFile *read_file; /* The current file to read from. */
81  int read_ntuples_available; /* The number of tuples in chunk. */
82  int read_ntuples; /* How many tuples have we read from chunk? */
83  size_t read_bytes; /* How many bytes have we read from chunk? */
84  char *read_buffer; /* A buffer for loading tuples. */
86  BlockNumber read_next_page; /* Lowest block we'll consider reading. */
87 
88  /* State for writing. */
89  SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
90  BufFile *write_file; /* The current file to write to. */
91  BlockNumber write_page; /* The next page to write to. */
92  char *write_pointer; /* Current write pointer within chunk. */
93  char *write_end; /* One past the end of the current chunk. */
94 };
95 
96 static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
97  int participant);
98 
99 /*
100  * Return the amount of shared memory required to hold SharedTuplestore for a
101  * given number of participants.
102  */
103 size_t
104 sts_estimate(int participants)
105 {
106  return offsetof(SharedTuplestore, participants) +
107  sizeof(SharedTuplestoreParticipant) * participants;
108 }
109 
110 /*
111  * Initialize a SharedTuplestore in existing shared memory. There must be
112  * space for sts_estimate(participants) bytes. If flags includes the value
113  * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
114  * eagerly (but this isn't yet implemented).
115  *
116  * Tuples that are stored may optionally carry a piece of fixed sized
117  * meta-data which will be retrieved along with the tuple. This is useful for
118  * the hash values used in multi-batch hash joins, but could have other
119  * applications.
120  *
121  * The caller must supply a SharedFileSet, which is essentially a directory
122  * that will be cleaned up automatically, and a name which must be unique
123  * across all SharedTuplestores created in the same SharedFileSet.
124  */
126 sts_initialize(SharedTuplestore *sts, int participants,
127  int my_participant_number,
128  size_t meta_data_size,
129  int flags,
130  SharedFileSet *fileset,
131  const char *name)
132 {
133  SharedTuplestoreAccessor *accessor;
134  int i;
135 
136  Assert(my_participant_number < participants);
137 
138  sts->nparticipants = participants;
139  sts->meta_data_size = meta_data_size;
140  sts->flags = flags;
141 
142  if (strlen(name) > sizeof(sts->name) - 1)
143  elog(ERROR, "SharedTuplestore name too long");
144  strcpy(sts->name, name);
145 
146  /*
147  * Limit meta-data so it + tuple size always fits into a single chunk.
148  * sts_puttuple() and sts_read_tuple() could be made to support scenarios
149  * where that's not the case, but it's not currently required. If so,
150  * meta-data size probably should be made variable, too.
151  */
152  if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
153  elog(ERROR, "meta-data too long");
154 
155  for (i = 0; i < participants; ++i)
156  {
159  sts->participants[i].read_page = 0;
160  sts->participants[i].npages = 0;
161  sts->participants[i].writing = false;
162  }
163 
164  accessor = palloc0(sizeof(SharedTuplestoreAccessor));
165  accessor->participant = my_participant_number;
166  accessor->sts = sts;
167  accessor->fileset = fileset;
168  accessor->context = CurrentMemoryContext;
169 
170  return accessor;
171 }
172 
173 /*
174  * Attach to a SharedTuplestore that has been initialized by another backend,
175  * so that this backend can read and write tuples.
176  */
179  int my_participant_number,
180  SharedFileSet *fileset)
181 {
182  SharedTuplestoreAccessor *accessor;
183 
184  Assert(my_participant_number < sts->nparticipants);
185 
186  accessor = palloc0(sizeof(SharedTuplestoreAccessor));
187  accessor->participant = my_participant_number;
188  accessor->sts = sts;
189  accessor->fileset = fileset;
190  accessor->context = CurrentMemoryContext;
191 
192  return accessor;
193 }
194 
195 static void
197 {
198  size_t size;
199 
200  size = STS_CHUNK_PAGES * BLCKSZ;
201  BufFileWrite(accessor->write_file, accessor->write_chunk, size);
202  memset(accessor->write_chunk, 0, size);
203  accessor->write_pointer = &accessor->write_chunk->data[0];
204  accessor->sts->participants[accessor->participant].npages +=
206 }
207 
208 /*
209  * Finish writing tuples. This must be called by all backends that have
210  * written data before any backend begins reading it.
211  */
212 void
214 {
215  if (accessor->write_file != NULL)
216  {
217  sts_flush_chunk(accessor);
218  BufFileClose(accessor->write_file);
219  pfree(accessor->write_chunk);
220  accessor->write_chunk = NULL;
221  accessor->write_file = NULL;
222  accessor->sts->participants[accessor->participant].writing = false;
223  }
224 }
225 
226 /*
227  * Prepare to rescan. Only one participant must call this. After it returns,
228  * all participants may call sts_begin_parallel_scan() and then loop over
229  * sts_parallel_scan_next(). This function must not be called concurrently
230  * with a scan, and synchronization to avoid that is the caller's
231  * responsibility.
232  */
233 void
235 {
236  int i;
237 
238  /*
239  * Reset the shared read head for all participants' files. Also set the
240  * initial chunk size to the minimum (any increases from that size will be
241  * recorded in chunk_expansion_log).
242  */
243  for (i = 0; i < accessor->sts->nparticipants; ++i)
244  {
245  accessor->sts->participants[i].read_page = 0;
246  }
247 }
248 
249 /*
250  * Begin scanning the contents in parallel.
251  */
252 void
254 {
256 
257  /* End any existing scan that was in progress. */
258  sts_end_parallel_scan(accessor);
259 
260  /*
261  * Any backend that might have written into this shared tuplestore must
262  * have called sts_end_write(), so that all buffers are flushed and the
263  * files have stopped growing.
264  */
265  for (i = 0; i < accessor->sts->nparticipants; ++i)
266  Assert(!accessor->sts->participants[i].writing);
267 
268  /*
269  * We will start out reading the file that THIS backend wrote. There may
270  * be some caching locality advantage to that.
271  */
272  accessor->read_participant = accessor->participant;
273  accessor->read_file = NULL;
274  accessor->read_next_page = 0;
275 }
276 
277 /*
278  * Finish a parallel scan, freeing associated backend-local resources.
279  */
280 void
282 {
283  /*
284  * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
285  * we'd probably need a reference count of current parallel scanners so we
286  * could safely do it only when the reference count reaches zero.
287  */
288  if (accessor->read_file != NULL)
289  {
290  BufFileClose(accessor->read_file);
291  accessor->read_file = NULL;
292  }
293 }
294 
295 /*
296  * Write a tuple. If a meta-data size was provided to sts_initialize, then a
297  * pointer to meta data of that size must be provided.
298  */
299 void
300 sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
301  MinimalTuple tuple)
302 {
303  size_t size;
304 
305  /* Do we have our own file yet? */
306  if (accessor->write_file == NULL)
307  {
308  SharedTuplestoreParticipant *participant;
309  char name[MAXPGPATH];
310  MemoryContext oldcxt;
311 
312  /* Create one. Only this backend will write into it. */
313  sts_filename(name, accessor, accessor->participant);
314 
315  oldcxt = MemoryContextSwitchTo(accessor->context);
316  accessor->write_file =
317  BufFileCreateFileSet(&accessor->fileset->fs, name);
318  MemoryContextSwitchTo(oldcxt);
319 
320  /* Set up the shared state for this backend's file. */
321  participant = &accessor->sts->participants[accessor->participant];
322  participant->writing = true; /* for assertions only */
323  }
324 
325  /* Do we have space? */
326  size = accessor->sts->meta_data_size + tuple->t_len;
327  if (accessor->write_pointer + size > accessor->write_end)
328  {
329  if (accessor->write_chunk == NULL)
330  {
331  /* First time through. Allocate chunk. */
332  accessor->write_chunk = (SharedTuplestoreChunk *)
334  STS_CHUNK_PAGES * BLCKSZ);
335  accessor->write_chunk->ntuples = 0;
336  accessor->write_pointer = &accessor->write_chunk->data[0];
337  accessor->write_end = (char *)
338  accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
339  }
340  else
341  {
342  /* See if flushing helps. */
343  sts_flush_chunk(accessor);
344  }
345 
346  /* It may still not be enough in the case of a gigantic tuple. */
347  if (accessor->write_pointer + size > accessor->write_end)
348  {
349  size_t written;
350 
351  /*
352  * We'll write the beginning of the oversized tuple, and then
353  * write the rest in some number of 'overflow' chunks.
354  *
355  * sts_initialize() verifies that the size of the tuple +
356  * meta-data always fits into a chunk. Because the chunk has been
357  * flushed above, we can be sure to have all of a chunk's usable
358  * space available.
359  */
360  Assert(accessor->write_pointer + accessor->sts->meta_data_size +
361  sizeof(uint32) < accessor->write_end);
362 
363  /* Write the meta-data as one chunk. */
364  if (accessor->sts->meta_data_size > 0)
365  memcpy(accessor->write_pointer, meta_data,
366  accessor->sts->meta_data_size);
367 
368  /*
369  * Write as much of the tuple as we can fit. This includes the
370  * tuple's size at the start.
371  */
372  written = accessor->write_end - accessor->write_pointer -
373  accessor->sts->meta_data_size;
374  memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
375  tuple, written);
376  ++accessor->write_chunk->ntuples;
377  size -= accessor->sts->meta_data_size;
378  size -= written;
379  /* Now write as many overflow chunks as we need for the rest. */
380  while (size > 0)
381  {
382  size_t written_this_chunk;
383 
384  sts_flush_chunk(accessor);
385 
386  /*
387  * How many overflow chunks to go? This will allow readers to
388  * skip all of them at once instead of reading each one.
389  */
390  accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
392  written_this_chunk =
393  Min(accessor->write_end - accessor->write_pointer, size);
394  memcpy(accessor->write_pointer, (char *) tuple + written,
395  written_this_chunk);
396  accessor->write_pointer += written_this_chunk;
397  size -= written_this_chunk;
398  written += written_this_chunk;
399  }
400  return;
401  }
402  }
403 
404  /* Copy meta-data and tuple into buffer. */
405  if (accessor->sts->meta_data_size > 0)
406  memcpy(accessor->write_pointer, meta_data,
407  accessor->sts->meta_data_size);
408  memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
409  tuple->t_len);
410  accessor->write_pointer += size;
411  ++accessor->write_chunk->ntuples;
412 }
413 
414 static MinimalTuple
415 sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
416 {
417  MinimalTuple tuple;
418  uint32 size;
419  size_t remaining_size;
420  size_t this_chunk_size;
421  char *destination;
422 
423  /*
424  * We'll keep track of bytes read from this chunk so that we can detect an
425  * overflowing tuple and switch to reading overflow pages.
426  */
427  if (accessor->sts->meta_data_size > 0)
428  {
429  BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
430  accessor->read_bytes += accessor->sts->meta_data_size;
431  }
432  BufFileReadExact(accessor->read_file, &size, sizeof(size));
433  accessor->read_bytes += sizeof(size);
434  if (size > accessor->read_buffer_size)
435  {
436  size_t new_read_buffer_size;
437 
438  if (accessor->read_buffer != NULL)
439  pfree(accessor->read_buffer);
440  new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
441  accessor->read_buffer =
442  MemoryContextAlloc(accessor->context, new_read_buffer_size);
443  accessor->read_buffer_size = new_read_buffer_size;
444  }
445  remaining_size = size - sizeof(uint32);
446  this_chunk_size = Min(remaining_size,
447  BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
448  destination = accessor->read_buffer + sizeof(uint32);
449  BufFileReadExact(accessor->read_file, destination, this_chunk_size);
450  accessor->read_bytes += this_chunk_size;
451  remaining_size -= this_chunk_size;
452  destination += this_chunk_size;
453  ++accessor->read_ntuples;
454 
455  /* Check if we need to read any overflow chunks. */
456  while (remaining_size > 0)
457  {
458  /* We are now positioned at the start of an overflow chunk. */
459  SharedTuplestoreChunk chunk_header;
460 
461  BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
462  accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
463  if (chunk_header.overflow == 0)
464  ereport(ERROR,
466  errmsg("unexpected chunk in shared tuplestore temporary file"),
467  errdetail_internal("Expected overflow chunk.")));
468  accessor->read_next_page += STS_CHUNK_PAGES;
469  this_chunk_size = Min(remaining_size,
470  BLCKSZ * STS_CHUNK_PAGES -
472  BufFileReadExact(accessor->read_file, destination, this_chunk_size);
473  accessor->read_bytes += this_chunk_size;
474  remaining_size -= this_chunk_size;
475  destination += this_chunk_size;
476 
477  /*
478  * These will be used to count regular tuples following the oversized
479  * tuple that spilled into this overflow chunk.
480  */
481  accessor->read_ntuples = 0;
482  accessor->read_ntuples_available = chunk_header.ntuples;
483  }
484 
485  tuple = (MinimalTuple) accessor->read_buffer;
486  tuple->t_len = size;
487 
488  return tuple;
489 }
490 
491 /*
492  * Get the next tuple in the current parallel scan.
493  */
496 {
498  BlockNumber read_page;
499  bool eof;
500 
501  for (;;)
502  {
503  /* Can we read more tuples from the current chunk? */
504  if (accessor->read_ntuples < accessor->read_ntuples_available)
505  return sts_read_tuple(accessor, meta_data);
506 
507  /* Find the location of a new chunk to read. */
508  p = &accessor->sts->participants[accessor->read_participant];
509 
511  /* We can skip directly past overflow pages we know about. */
512  if (p->read_page < accessor->read_next_page)
513  p->read_page = accessor->read_next_page;
514  eof = p->read_page >= p->npages;
515  if (!eof)
516  {
517  /* Claim the next chunk. */
518  read_page = p->read_page;
519  /* Advance the read head for the next reader. */
521  accessor->read_next_page = p->read_page;
522  }
523  LWLockRelease(&p->lock);
524 
525  if (!eof)
526  {
527  SharedTuplestoreChunk chunk_header;
528 
529  /* Make sure we have the file open. */
530  if (accessor->read_file == NULL)
531  {
532  char name[MAXPGPATH];
533  MemoryContext oldcxt;
534 
535  sts_filename(name, accessor, accessor->read_participant);
536 
537  oldcxt = MemoryContextSwitchTo(accessor->context);
538  accessor->read_file =
539  BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
540  false);
541  MemoryContextSwitchTo(oldcxt);
542  }
543 
544  /* Seek and load the chunk header. */
545  if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
546  ereport(ERROR,
548  errmsg("could not seek to block %u in shared tuplestore temporary file",
549  read_page)));
550  BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
551 
552  /*
553  * If this is an overflow chunk, we skip it and any following
554  * overflow chunks all at once.
555  */
556  if (chunk_header.overflow > 0)
557  {
558  accessor->read_next_page = read_page +
559  chunk_header.overflow * STS_CHUNK_PAGES;
560  continue;
561  }
562 
563  accessor->read_ntuples = 0;
564  accessor->read_ntuples_available = chunk_header.ntuples;
565  accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
566 
567  /* Go around again, so we can get a tuple from this chunk. */
568  }
569  else
570  {
571  if (accessor->read_file != NULL)
572  {
573  BufFileClose(accessor->read_file);
574  accessor->read_file = NULL;
575  }
576 
577  /*
578  * Try the next participant's file. If we've gone full circle,
579  * we're done.
580  */
581  accessor->read_participant = (accessor->read_participant + 1) %
582  accessor->sts->nparticipants;
583  if (accessor->read_participant == accessor->participant)
584  break;
585  accessor->read_next_page = 0;
586 
587  /* Go around again, so we can get a chunk from this file. */
588  }
589  }
590 
591  return NULL;
592 }
593 
594 /*
595  * Create the name used for the BufFile that a given participant will write.
596  */
597 static void
598 sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
599 {
600  snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
601 }
uint32 BlockNumber
Definition: block.h:31
int BufFileSeekBlock(BufFile *file, int64 blknum)
Definition: buffile.c:851
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267
void BufFileClose(BufFile *file)
Definition: buffile.c:412
unsigned int uint32
Definition: c.h:506
#define Min(x, y)
Definition: c.h:1004
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:182
#define Max(x, y)
Definition: c.h:998
#define Assert(condition)
Definition: c.h:858
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:398
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1230
int errcode_for_file_access(void)
Definition: elog.c:876
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
MinimalTupleData * MinimalTuple
Definition: htup.h:27
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:707
@ LWTRANCHE_SHARED_TUPLESTORE
Definition: lwlock.h:199
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
#define NAMEDATALEN
#define MAXPGPATH
#define snprintf
Definition: port.h:238
MemoryContextSwitchTo(old_ctx)
void sts_reinitialize(SharedTuplestoreAccessor *accessor)
static MinimalTuple sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
static void sts_flush_chunk(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
#define STS_CHUNK_HEADER_SIZE
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
struct SharedTuplestoreChunk SharedTuplestoreChunk
struct SharedTuplestoreParticipant SharedTuplestoreParticipant
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
#define STS_CHUNK_PAGES
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
size_t sts_estimate(int participants)
#define STS_CHUNK_DATA_SIZE
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
static pg_noinline void Size size
Definition: slab.c:607
Definition: lwlock.h:42
SharedTuplestore * sts
SharedTuplestoreChunk * write_chunk
char data[FLEXIBLE_ARRAY_MEMBER]
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]
char name[NAMEDATALEN]
const char * name