/*-------------------------------------------------------------------------
 *
 * sharedtuplestore.c
 *	  Simple mechanism for sharing tuples between backends.
 *
 * This module contains a shared temporary tuple storage mechanism providing
 * a parallel-aware subset of the features of tuplestore.c.  Multiple
 * backends can write to a SharedTuplestore, and then multiple backends can
 * later scan the stored tuples.  Currently, the only scan type supported is
 * a parallel scan where each backend reads an arbitrary subset of the
 * tuples that were written.
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/sort/sharedtuplestore.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup.h"
#include "access/htup_details.h"
#include "miscadmin.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"

/*
 * The size of chunks, in pages.  This is somewhat arbitrarily set to match
 * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
 * at approximately the same rate as it allocates new chunks of memory to
 * insert them into.
 */
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)

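/*
 * Worked example (editorial note, not part of the upstream file): with the
 * default BLCKSZ of 8192, a chunk occupies 4 * 8192 = 32768 bytes on disk.
 * The chunk header is offsetof(SharedTuplestoreChunk, data), typically 8
 * bytes for the two ints in SharedTuplestoreChunk below, leaving
 * STS_CHUNK_DATA_SIZE = 32760 bytes of tuple data per chunk.
 */
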
/* Chunk written to disk. */
typedef struct SharedTuplestoreChunk
{
	int			ntuples;		/* Number of tuples in this chunk. */
	int			overflow;		/* If overflow, how many including this one? */
	char		data[FLEXIBLE_ARRAY_MEMBER];
} SharedTuplestoreChunk;

/* Per-participant shared state. */
typedef struct SharedTuplestoreParticipant
{
	LWLock		lock;
	BlockNumber read_page;		/* Page number for next read. */
	BlockNumber npages;			/* Number of pages written. */
	bool		writing;		/* Used only for assertions. */
} SharedTuplestoreParticipant;

/* The control object that lives in shared memory. */
struct SharedTuplestore
{
	int			nparticipants;	/* Number of participants that can write. */
	int			flags;			/* Flag bits from SHARED_TUPLESTORE_XXX */
	size_t		meta_data_size; /* Size of per-tuple header. */
	char		name[NAMEDATALEN];	/* A name for this tuplestore. */

	/* Followed by per-participant shared state. */
	SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};

/* Per-participant state that lives in backend-local memory. */
struct SharedTuplestoreAccessor
{
	int			participant;	/* My participant number. */
	SharedTuplestore *sts;		/* The shared state. */
	SharedFileSet *fileset;		/* The SharedFileSet holding files. */
	MemoryContext context;		/* Memory context for buffers. */

	/* State for reading. */
	int			read_participant;	/* The current participant to read from. */
	BufFile    *read_file;		/* The current file to read from. */
	int			read_ntuples_available; /* The number of tuples in chunk. */
	int			read_ntuples;	/* How many tuples have we read from chunk? */
	size_t		read_bytes;		/* How many bytes have we read from chunk? */
	char	   *read_buffer;	/* A buffer for loading tuples. */
	size_t		read_buffer_size;	/* The size of the read buffer. */
	BlockNumber read_next_page; /* Lowest block we'll consider reading. */

	/* State for writing. */
	SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
	BufFile    *write_file;		/* The current file to write to. */
	BlockNumber write_page;		/* The next page to write to. */
	char	   *write_pointer;	/* Current write pointer within chunk. */
	char	   *write_end;		/* One past the end of the current chunk. */
};

static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
						 int participant);

/*
 * Return the amount of shared memory required to hold a SharedTuplestore
 * for a given number of participants.
 */
size_t
sts_estimate(int participants)
{
	return offsetof(SharedTuplestore, participants) +
		sizeof(SharedTuplestoreParticipant) * participants;
}

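/*
 * Usage sketch (illustrative; names other than sts_* are hypothetical): a
 * caller reserves this much space in shared memory before initializing.
 * Parallel Hash, for example, carves its SharedTuplestores out of DSA
 * memory along these lines:
 *
 *		size_t	size = sts_estimate(nparticipants);
 *		SharedTuplestore *sts = (SharedTuplestore *)
 *			dsa_get_address(area, dsa_allocate(area, size));
 */
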
/*
 * Initialize a SharedTuplestore in existing shared memory.  There must be
 * space for sts_estimate(participants) bytes.  If flags includes the value
 * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
 * eagerly (but this isn't yet implemented).
 *
 * Tuples that are stored may optionally carry a piece of fixed-size
 * meta-data which will be retrieved along with the tuple.  This is useful
 * for the hash values used in multi-batch hash joins, but could have other
 * applications.
 *
 * The caller must supply a SharedFileSet, which is essentially a directory
 * that will be cleaned up automatically, and a name which must be unique
 * across all SharedTuplestores created in the same SharedFileSet.
 */
SharedTuplestoreAccessor *
sts_initialize(SharedTuplestore *sts, int participants,
			   int my_participant_number,
			   size_t meta_data_size,
			   int flags,
			   SharedFileSet *fileset,
			   const char *name)
{
	SharedTuplestoreAccessor *accessor;
	int			i;

	Assert(my_participant_number < participants);

	sts->nparticipants = participants;
	sts->meta_data_size = meta_data_size;
	sts->flags = flags;

	if (strlen(name) > sizeof(sts->name) - 1)
		elog(ERROR, "SharedTuplestore name too long");
	strcpy(sts->name, name);

	/*
	 * Limit meta-data so that it plus the tuple size always fits into a
	 * single chunk.  sts_puttuple() and sts_read_tuple() could be made to
	 * support scenarios where that's not the case, but it's not currently
	 * required.  If so, the meta-data size probably should be made variable,
	 * too.
	 */
	if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
		elog(ERROR, "meta-data too long");

	for (i = 0; i < participants; ++i)
	{
		LWLockInitialize(&sts->participants[i].lock,
						 LWTRANCHE_SHARED_TUPLESTORE);
		sts->participants[i].read_page = 0;
		sts->participants[i].npages = 0;
		sts->participants[i].writing = false;
	}

	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
	accessor->participant = my_participant_number;
	accessor->sts = sts;
	accessor->fileset = fileset;
	accessor->context = CurrentMemoryContext;

	return accessor;
}

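/*
 * Usage sketch (illustrative, not part of the upstream file; "seg_mem",
 * "fileset", "hashvalue" and "mintuple" are hypothetical): the initializing
 * backend might write tuples like this, reserving sizeof(uint32) of
 * per-tuple meta-data for a hash value as Parallel Hash does:
 *
 *		SharedTuplestoreAccessor *accessor;
 *
 *		accessor = sts_initialize((SharedTuplestore *) seg_mem, nparticipants,
 *								  0, sizeof(uint32), 0, fileset, "batch0");
 *		sts_puttuple(accessor, &hashvalue, mintuple);
 *		sts_end_write(accessor);
 */
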
/*
 * Attach to a SharedTuplestore that has been initialized by another backend,
 * so that this backend can read and write tuples.
 */
SharedTuplestoreAccessor *
sts_attach(SharedTuplestore *sts,
		   int my_participant_number,
		   SharedFileSet *fileset)
{
	SharedTuplestoreAccessor *accessor;

	Assert(my_participant_number < sts->nparticipants);

	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
	accessor->participant = my_participant_number;
	accessor->sts = sts;
	accessor->fileset = fileset;
	accessor->context = CurrentMemoryContext;

	return accessor;
}

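/*
 * Usage sketch (illustrative): a parallel worker that finds the same
 * SharedTuplestore in shared memory attaches under its own participant
 * number, e.g. following the Parallel Hash convention of leader = 0 and
 * workers = ParallelWorkerNumber + 1:
 *
 *		accessor = sts_attach(sts, ParallelWorkerNumber + 1, fileset);
 *
 * Any scheme that gives each backend a distinct number below nparticipants
 * works.
 */
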
/*
 * Write out the current in-memory chunk and reset it so that more tuples
 * can be appended, advancing this participant's shared page count.
 */
static void
sts_flush_chunk(SharedTuplestoreAccessor *accessor)
{
	size_t		size;

	size = STS_CHUNK_PAGES * BLCKSZ;
	BufFileWrite(accessor->write_file, accessor->write_chunk, size);
	memset(accessor->write_chunk, 0, size);
	accessor->write_pointer = &accessor->write_chunk->data[0];
	accessor->sts->participants[accessor->participant].npages +=
		STS_CHUNK_PAGES;
}

/*
 * Finish writing tuples.  This must be called by all backends that have
 * written data before any backend begins reading it.
 */
void
sts_end_write(SharedTuplestoreAccessor *accessor)
{
	if (accessor->write_file != NULL)
	{
		sts_flush_chunk(accessor);
		BufFileClose(accessor->write_file);
		pfree(accessor->write_chunk);
		accessor->write_chunk = NULL;
		accessor->write_file = NULL;
		accessor->sts->participants[accessor->participant].writing = false;
	}
}

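/*
 * Editorial note: the write-then-read ordering above is the caller's
 * responsibility.  A sketch using a Barrier, in the style of Parallel Hash
 * ("sync_barrier" is hypothetical):
 *
 *		sts_end_write(accessor);
 *		BarrierArriveAndWait(&sync_barrier, 0);
 *		sts_begin_parallel_scan(accessor);
 */
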
/*
 * Prepare to rescan.  Only one participant must call this.  After it
 * returns, all participants may call sts_begin_parallel_scan() and then
 * loop over sts_parallel_scan_next().  This function must not be called
 * concurrently with a scan, and synchronization to avoid that is the
 * caller's responsibility.
 */
void
sts_reinitialize(SharedTuplestoreAccessor *accessor)
{
	int			i;

	/* Reset the shared read head for all participants' files. */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
	{
		accessor->sts->participants[i].read_page = 0;
	}
}

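/*
 * Usage sketch (illustrative; "rescan_barrier" is hypothetical): since only
 * one participant may reinitialize and no scan may be in progress, callers
 * can elect one backend with a Barrier, relying on BarrierArriveAndWait()
 * returning true in exactly one participant:
 *
 *		if (BarrierArriveAndWait(&rescan_barrier, 0))
 *			sts_reinitialize(accessor);		(one elected participant)
 *		BarrierArriveAndWait(&rescan_barrier, 0);
 *		sts_begin_parallel_scan(accessor);
 */
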
/*
 * Begin scanning the contents in parallel.
 */
void
sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
{
	int			i PG_USED_FOR_ASSERTS_ONLY;

	/* End any existing scan that was in progress. */
	sts_end_parallel_scan(accessor);

	/*
	 * Any backend that might have written into this shared tuplestore must
	 * have called sts_end_write(), so that all buffers are flushed and the
	 * files have stopped growing.
	 */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
		Assert(!accessor->sts->participants[i].writing);

	/*
	 * We will start out reading the file that THIS backend wrote.  There may
	 * be some caching locality advantage to that.
	 */
	accessor->read_participant = accessor->participant;
	accessor->read_file = NULL;
	accessor->read_next_page = 0;
}

/*
 * Finish a parallel scan, freeing associated backend-local resources.
 */
void
sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
{
	/*
	 * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
	 * we'd probably need a reference count of current parallel scanners so
	 * we could safely do it only when the reference count reaches zero.
	 */
	if (accessor->read_file != NULL)
	{
		BufFileClose(accessor->read_file);
		accessor->read_file = NULL;
	}
}

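/*
 * Usage sketch (illustrative): a complete parallel read pass, assuming each
 * tuple was stored with a uint32 of meta-data as in the earlier examples:
 *
 *		uint32		hashvalue;
 *		MinimalTuple tuple;
 *
 *		sts_begin_parallel_scan(accessor);
 *		while ((tuple = sts_parallel_scan_next(accessor, &hashvalue)) != NULL)
 *		{
 *			(process tuple; it points into the accessor's read buffer and
 *			 remains valid only until the next call)
 *		}
 *		sts_end_parallel_scan(accessor);
 */
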
/*
 * Write a tuple.  If a meta-data size was provided to sts_initialize(),
 * then a pointer to meta-data of that size must be provided.
 */
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
			 MinimalTuple tuple)
{
	size_t		size;

	/* Do we have our own file yet? */
	if (accessor->write_file == NULL)
	{
		SharedTuplestoreParticipant *participant;
		char		name[MAXPGPATH];

		/* Create one.  Only this backend will write into it. */
		sts_filename(name, accessor, accessor->participant);
		accessor->write_file =
			BufFileCreateFileSet(&accessor->fileset->fs, name);

		/* Set up the shared state for this backend's file. */
		participant = &accessor->sts->participants[accessor->participant];
		participant->writing = true;	/* for assertions only */
	}

	/* Do we have space? */
	size = accessor->sts->meta_data_size + tuple->t_len;
	if (accessor->write_pointer + size > accessor->write_end)
	{
		if (accessor->write_chunk == NULL)
		{
			/* First time through.  Allocate chunk. */
			accessor->write_chunk = (SharedTuplestoreChunk *)
				MemoryContextAllocZero(accessor->context,
									   STS_CHUNK_PAGES * BLCKSZ);
			accessor->write_chunk->ntuples = 0;
			accessor->write_pointer = &accessor->write_chunk->data[0];
			accessor->write_end = (char *)
				accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
		}
		else
		{
			/* See if flushing helps. */
			sts_flush_chunk(accessor);
		}

		/* It may still not be enough in the case of a gigantic tuple. */
		if (accessor->write_pointer + size > accessor->write_end)
		{
			size_t		written;

			/*
			 * We'll write the beginning of the oversized tuple, and then
			 * write the rest in some number of 'overflow' chunks.
			 *
			 * sts_initialize() verifies that the size of the tuple +
			 * meta-data always fits into a chunk.  Because the chunk has
			 * been flushed above, we can be sure to have all of a chunk's
			 * usable space available.
			 */
			Assert(accessor->write_pointer + accessor->sts->meta_data_size +
				   sizeof(uint32) < accessor->write_end);

			/* Write the meta-data as one chunk. */
			if (accessor->sts->meta_data_size > 0)
				memcpy(accessor->write_pointer, meta_data,
					   accessor->sts->meta_data_size);

			/*
			 * Write as much of the tuple as we can fit.  This includes the
			 * tuple's size at the start.
			 */
			written = accessor->write_end - accessor->write_pointer -
				accessor->sts->meta_data_size;
			memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
				   tuple, written);
			++accessor->write_chunk->ntuples;
			size -= accessor->sts->meta_data_size;
			size -= written;
			/* Now write as many overflow chunks as we need for the rest. */
			while (size > 0)
			{
				size_t		written_this_chunk;

				sts_flush_chunk(accessor);

				/*
				 * How many overflow chunks to go?  This will allow readers
				 * to skip all of them at once instead of reading each one.
				 */
				accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
					STS_CHUNK_DATA_SIZE;
				written_this_chunk =
					Min(accessor->write_end - accessor->write_pointer, size);
				memcpy(accessor->write_pointer, (char *) tuple + written,
					   written_this_chunk);
				accessor->write_pointer += written_this_chunk;
				size -= written_this_chunk;
				written += written_this_chunk;
			}
			return;
		}
	}

	/* Copy meta-data and tuple into buffer. */
	if (accessor->sts->meta_data_size > 0)
		memcpy(accessor->write_pointer, meta_data,
			   accessor->sts->meta_data_size);
	memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
		   tuple->t_len);
	accessor->write_pointer += size;
	++accessor->write_chunk->ntuples;
}

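/*
 * Illustration (editorial note): after the overflow path above, an oversized
 * tuple that needs two extra chunks is laid out on disk as follows, each
 * chunk being STS_CHUNK_PAGES * BLCKSZ bytes:
 *
 *		chunk N:   overflow = 0; ntuples counts the big tuple; meta-data,
 *				   tuple size and the leading tuple bytes
 *		chunk N+1: overflow = 2; more tuple bytes
 *		chunk N+2: overflow = 1; final tuple bytes, then any regular tuples
 *				   appended later (counted by its ntuples)
 *
 * A scanner that happens to claim chunk N+1 or N+2 sees overflow > 0 and
 * skips forward overflow * STS_CHUNK_PAGES pages in one step.
 */
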
static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	MinimalTuple tuple;
	uint32		size;
	size_t		remaining_size;
	size_t		this_chunk_size;
	char	   *destination;

	/*
	 * We'll keep track of bytes read from this chunk so that we can detect
	 * an overflowing tuple and switch to reading overflow pages.
	 */
	if (accessor->sts->meta_data_size > 0)
	{
		BufFileReadExact(accessor->read_file, meta_data,
						 accessor->sts->meta_data_size);
		accessor->read_bytes += accessor->sts->meta_data_size;
	}
	BufFileReadExact(accessor->read_file, &size, sizeof(size));
	accessor->read_bytes += sizeof(size);
	if (size > accessor->read_buffer_size)
	{
		size_t		new_read_buffer_size;

		if (accessor->read_buffer != NULL)
			pfree(accessor->read_buffer);
		new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
		accessor->read_buffer =
			MemoryContextAlloc(accessor->context, new_read_buffer_size);
		accessor->read_buffer_size = new_read_buffer_size;
	}
	remaining_size = size - sizeof(uint32);
	this_chunk_size = Min(remaining_size,
						  BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
	destination = accessor->read_buffer + sizeof(uint32);
	BufFileReadExact(accessor->read_file, destination, this_chunk_size);
	accessor->read_bytes += this_chunk_size;
	remaining_size -= this_chunk_size;
	destination += this_chunk_size;
	++accessor->read_ntuples;

	/* Check if we need to read any overflow chunks. */
	while (remaining_size > 0)
	{
		/* We are now positioned at the start of an overflow chunk. */
		SharedTuplestoreChunk chunk_header;

		BufFileReadExact(accessor->read_file, &chunk_header,
						 STS_CHUNK_HEADER_SIZE);
		accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
		if (chunk_header.overflow == 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("unexpected chunk in shared tuplestore temporary file"),
					 errdetail_internal("Expected overflow chunk.")));
		accessor->read_next_page += STS_CHUNK_PAGES;
		this_chunk_size = Min(remaining_size,
							  BLCKSZ * STS_CHUNK_PAGES -
							  accessor->read_bytes);
		BufFileReadExact(accessor->read_file, destination, this_chunk_size);
		accessor->read_bytes += this_chunk_size;
		remaining_size -= this_chunk_size;
		destination += this_chunk_size;

		/*
		 * These will be used to count regular tuples following the
		 * oversized tuple that spilled into this overflow chunk.
		 */
		accessor->read_ntuples = 0;
		accessor->read_ntuples_available = chunk_header.ntuples;
	}

	tuple = (MinimalTuple) accessor->read_buffer;
	tuple->t_len = size;

	return tuple;
}

/*
 * Get the next tuple in the current parallel scan.
 */
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	SharedTuplestoreParticipant *p;
	BlockNumber read_page;
	bool		eof;

	for (;;)
	{
		/* Can we read more tuples from the current chunk? */
		if (accessor->read_ntuples < accessor->read_ntuples_available)
			return sts_read_tuple(accessor, meta_data);

		/* Find the location of a new chunk to read. */
		p = &accessor->sts->participants[accessor->read_participant];

		LWLockAcquire(&p->lock, LW_EXCLUSIVE);
		/* We can skip directly past overflow pages we know about. */
		if (p->read_page < accessor->read_next_page)
			p->read_page = accessor->read_next_page;
		eof = p->read_page >= p->npages;
		if (!eof)
		{
			/* Claim the next chunk. */
			read_page = p->read_page;
			/* Advance the read head for the next reader. */
			p->read_page += STS_CHUNK_PAGES;
			accessor->read_next_page = p->read_page;
		}
		LWLockRelease(&p->lock);

		if (!eof)
		{
			SharedTuplestoreChunk chunk_header;

			/* Make sure we have the file open. */
			if (accessor->read_file == NULL)
			{
				char		name[MAXPGPATH];

				sts_filename(name, accessor, accessor->read_participant);
				accessor->read_file =
					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
									   false);
			}

			/* Seek and load the chunk header. */
			if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek to block %u in shared tuplestore temporary file",
								read_page)));
			BufFileReadExact(accessor->read_file, &chunk_header,
							 STS_CHUNK_HEADER_SIZE);

			/*
			 * If this is an overflow chunk, we skip it and any following
			 * overflow chunks all at once.
			 */
			if (chunk_header.overflow > 0)
			{
				accessor->read_next_page = read_page +
					chunk_header.overflow * STS_CHUNK_PAGES;
				continue;
			}

			accessor->read_ntuples = 0;
			accessor->read_ntuples_available = chunk_header.ntuples;
			accessor->read_bytes = STS_CHUNK_HEADER_SIZE;

			/* Go around again, so we can get a tuple from this chunk. */
		}
		else
		{
			if (accessor->read_file != NULL)
			{
				BufFileClose(accessor->read_file);
				accessor->read_file = NULL;
			}

			/*
			 * Try the next participant's file.  If we've gone full circle,
			 * we're done.
			 */
			accessor->read_participant = (accessor->read_participant + 1) %
				accessor->sts->nparticipants;
			if (accessor->read_participant == accessor->participant)
				break;
			accessor->read_next_page = 0;

			/* Go around again, so we can get a chunk from this file. */
		}
	}

	return NULL;
}

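/*
 * Editorial note: the locked section above is the entire inter-backend
 * protocol.  Each reader holds the participant's lock only long enough to
 * advance read_page by STS_CHUNK_PAGES, so concurrent readers partition each
 * file into chunks among themselves.  For example, if a file has npages = 12,
 * three readers arriving in some order claim pages 0, 4 and 8 respectively,
 * then each reads its chunk without holding the lock.
 */
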
/*
 * Create the name used for the BufFile that a given participant will write.
 */
static void
sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
{
	snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
}

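/*
 * For example, a SharedTuplestore named "batch0" yields per-participant
 * files named "batch0.p0", "batch0.p1", and so on within the SharedFileSet.
 */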