sharedtuplestore.c
1/*-------------------------------------------------------------------------
2 *
3 * sharedtuplestore.c
4 * Simple mechanism for sharing tuples between backends.
5 *
6 * This module contains a shared temporary tuple storage mechanism providing
7 * a parallel-aware subset of the features of tuplestore.c. Multiple backends
8 * can write to a SharedTuplestore, and then multiple backends can later scan
9 * the stored tuples. Currently, the only scan type supported is a parallel
10 * scan where each backend reads an arbitrary subset of the tuples that were
11 * written.
12 *
13 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
14 * Portions Copyright (c) 1994, Regents of the University of California
15 *
16 * IDENTIFICATION
17 * src/backend/utils/sort/sharedtuplestore.c
18 *
19 *-------------------------------------------------------------------------
20 */
21
22#include "postgres.h"
23
24#include "access/htup.h"
25#include "access/htup_details.h"
26#include "storage/buffile.h"
27#include "storage/lwlock.h"
28#include "storage/sharedfileset.h"
29#include "utils/sharedtuplestore.h"
30
31/*
32 * The size of chunks, in pages. This is somewhat arbitrarily set to match
33 * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
34 * at approximately the same rate as it allocates new chunks of memory to
35 * insert them into.
36 */
37#define STS_CHUNK_PAGES 4
38#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
39#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
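/*
 * Editor's note (not part of the original file): with the default BLCKSZ of
 * 8192, each chunk occupies STS_CHUNK_PAGES * BLCKSZ = 4 * 8192 = 32768
 * bytes on disk.  STS_CHUNK_HEADER_SIZE covers the two int fields of
 * SharedTuplestoreChunk (8 bytes on typical platforms), so
 * STS_CHUNK_DATA_SIZE leaves 32760 bytes per chunk for meta-data and tuple
 * payloads.
 */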
40
41/* Chunk written to disk. */
42typedef struct SharedTuplestoreChunk
43{
44 int ntuples; /* Number of tuples in this chunk. */
45 int overflow; /* If overflow, how many including this one? */
46 char data[FLEXIBLE_ARRAY_MEMBER];
47} SharedTuplestoreChunk;
48
49/* Per-participant shared state. */
50typedef struct SharedTuplestoreParticipant
51{
52 LWLock lock;
53 BlockNumber read_page; /* Page number for next read. */
54 BlockNumber npages; /* Number of pages written. */
55 bool writing; /* Used only for assertions. */
56} SharedTuplestoreParticipant;
57
58/* The control object that lives in shared memory. */
59struct SharedTuplestore
60{
61 int nparticipants; /* Number of participants that can write. */
62 int flags; /* Flag bits from SHARED_TUPLESTORE_XXX */
63 size_t meta_data_size; /* Size of per-tuple header. */
64 char name[NAMEDATALEN]; /* A name for this tuplestore. */
65
66 /* Followed by per-participant shared state. */
67 SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
68};
69
70/* Per-participant state that lives in backend-local memory. */
71struct SharedTuplestoreAccessor
72{
73 int participant; /* My participant number. */
74 SharedTuplestore *sts; /* The shared state. */
75 SharedFileSet *fileset; /* The SharedFileSet holding files. */
76 MemoryContext context; /* Memory context for buffers. */
77
78 /* State for reading. */
79 int read_participant; /* The current participant to read from. */
80 BufFile *read_file; /* The current file to read from. */
81 int read_ntuples_available; /* The number of tuples in chunk. */
82 int read_ntuples; /* How many tuples have we read from chunk? */
83 size_t read_bytes; /* How many bytes have we read from chunk? */
84 char *read_buffer; /* A buffer for loading tuples. */
85 size_t read_buffer_size; /* Allocated size of read_buffer. */
86 BlockNumber read_next_page; /* Lowest block we'll consider reading. */
87
88 /* State for writing. */
89 SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
90 BufFile *write_file; /* The current file to write to. */
91 BlockNumber write_page; /* The next page to write to. */
92 char *write_pointer; /* Current write pointer within chunk. */
93 char *write_end; /* One past the end of the current chunk. */
94};
95
96static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
97 int participant);
98
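/*
 * Editor's illustrative sketch, not part of the original file: the overall
 * lifecycle of the public functions defined below.  The shared memory at
 * 'sts' (sized with sts_estimate()), the participant numbering, and the
 * synchronization between the write and read phases (for example a Barrier)
 * are assumed to be handled by the caller and are not shown.
 */
#ifdef STS_USAGE_SKETCH
static void
sketch_lifecycle(SharedTuplestore *sts, int nparticipants, int my_number,
                 bool am_initializer, SharedFileSet *fileset)
{
    SharedTuplestoreAccessor *acc;
    MinimalTuple tuple;
    uint32      hashvalue = 0;

    /* One backend initializes the shared state; the others attach. */
    if (am_initializer)
        acc = sts_initialize(sts, nparticipants, my_number,
                             sizeof(uint32), 0, fileset, "sketch");
    else
        acc = sts_attach(sts, my_number, fileset);

    /* Write phase: append tuples (omitted here), then stop writing. */
    sts_end_write(acc);

    /* ...all writers must call sts_end_write() before anyone reads... */

    /* Read phase: each backend consumes an arbitrary subset of tuples. */
    sts_begin_parallel_scan(acc);
    while ((tuple = sts_parallel_scan_next(acc, &hashvalue)) != NULL)
    {
        /* process one tuple and its meta-data here */
    }
    sts_end_parallel_scan(acc);
}
#endif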
99/*
100 * Return the amount of shared memory required to hold SharedTuplestore for a
101 * given number of participants.
102 */
103size_t
104sts_estimate(int participants)
105{
106 return offsetof(SharedTuplestore, participants) +
107 sizeof(SharedTuplestoreParticipant) * participants;
108}
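/*
 * Editor's illustrative sketch, not part of the original file: how a
 * parallel executor node might reserve DSM space for a SharedTuplestore.
 * The ParallelContext/shm_toc machinery from access/parallel.h and
 * storage/shm_toc.h is an assumption of this sketch, not something this
 * file requires; the leader also takes a participant slot, hence
 * nworkers + 1.
 */
#ifdef STS_USAGE_SKETCH
static void
sketch_estimate_space(ParallelContext *pcxt)
{
    size_t      sts_size = sts_estimate(pcxt->nworkers + 1);

    shm_toc_estimate_chunk(&pcxt->estimator, sts_size);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}
#endif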
109
110/*
111 * Initialize a SharedTuplestore in existing shared memory. There must be
112 * space for sts_estimate(participants) bytes. If flags includes the value
113 * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
114 * eagerly (but this isn't yet implemented).
115 *
116 * Tuples that are stored may optionally carry a piece of fixed sized
117 * meta-data which will be retrieved along with the tuple. This is useful for
118 * the hash values used in multi-batch hash joins, but could have other
119 * applications.
120 *
121 * The caller must supply a SharedFileSet, which is essentially a directory
122 * that will be cleaned up automatically, and a name which must be unique
123 * across all SharedTuplestores created in the same SharedFileSet.
124 */
125SharedTuplestoreAccessor *
126sts_initialize(SharedTuplestore *sts, int participants,
127 int my_participant_number,
128 size_t meta_data_size,
129 int flags,
130 SharedFileSet *fileset,
131 const char *name)
132{
133 SharedTuplestoreAccessor *accessor;
134 int i;
135
136 Assert(my_participant_number < participants);
137
138 sts->nparticipants = participants;
139 sts->meta_data_size = meta_data_size;
140 sts->flags = flags;
141
142 if (strlen(name) > sizeof(sts->name) - 1)
143 elog(ERROR, "SharedTuplestore name too long");
144 strcpy(sts->name, name);
145
146 /*
147 * Limit meta-data so it + tuple size always fits into a single chunk.
148 * sts_puttuple() and sts_read_tuple() could be made to support scenarios
149 * where that's not the case, but it's not currently required. If so,
150 * meta-data size probably should be made variable, too.
151 */
152 if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
153 elog(ERROR, "meta-data too long");
154
155 for (i = 0; i < participants; ++i)
156 {
157 LWLockInitialize(&sts->participants[i].lock,
158 LWTRANCHE_SHARED_TUPLESTORE);
159 sts->participants[i].read_page = 0;
160 sts->participants[i].npages = 0;
161 sts->participants[i].writing = false;
162 }
163
164 accessor = palloc0(sizeof(SharedTuplestoreAccessor));
165 accessor->participant = my_participant_number;
166 accessor->sts = sts;
167 accessor->fileset = fileset;
168 accessor->context = CurrentMemoryContext;
169
170 return accessor;
171}
172
173/*
174 * Attach to a SharedTuplestore that has been initialized by another backend,
175 * so that this backend can read and write tuples.
176 */
177SharedTuplestoreAccessor *
178sts_attach(SharedTuplestore *sts,
179 int my_participant_number,
180 SharedFileSet *fileset)
181{
182 SharedTuplestoreAccessor *accessor;
183
184 Assert(my_participant_number < sts->nparticipants);
185
186 accessor = palloc0(sizeof(SharedTuplestoreAccessor));
187 accessor->participant = my_participant_number;
188 accessor->sts = sts;
189 accessor->fileset = fileset;
190 accessor->context = CurrentMemoryContext;
191
192 return accessor;
193}
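/*
 * Editor's illustrative sketch, not part of the original file: a worker
 * locating the SharedTuplestore that another backend placed in the DSM
 * segment and attaching to it.  The shm_toc lookup and the toc key are
 * assumptions of this sketch (see storage/shm_toc.h), not part of this
 * module's API.
 */
#ifdef STS_USAGE_SKETCH
#define SKETCH_STS_KEY UINT64CONST(0xF001)  /* hypothetical toc key */

static SharedTuplestoreAccessor *
sketch_worker_attach(shm_toc *toc, int my_participant_number,
                     SharedFileSet *fileset)
{
    SharedTuplestore *sts = shm_toc_lookup(toc, SKETCH_STS_KEY, false);

    return sts_attach(sts, my_participant_number, fileset);
}
#endif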
194
195static void
196sts_flush_chunk(SharedTuplestoreAccessor *accessor)
197{
198 size_t size;
199
200 size = STS_CHUNK_PAGES * BLCKSZ;
201 BufFileWrite(accessor->write_file, accessor->write_chunk, size);
202 memset(accessor->write_chunk, 0, size);
203 accessor->write_pointer = &accessor->write_chunk->data[0];
204 accessor->sts->participants[accessor->participant].npages +=
205 STS_CHUNK_PAGES;
206}
207
208/*
209 * Finish writing tuples. This must be called by all backends that have
210 * written data before any backend begins reading it.
211 */
212void
213sts_end_write(SharedTuplestoreAccessor *accessor)
214{
215 if (accessor->write_file != NULL)
216 {
217 sts_flush_chunk(accessor);
218 BufFileClose(accessor->write_file);
219 pfree(accessor->write_chunk);
220 accessor->write_chunk = NULL;
221 accessor->write_file = NULL;
222 accessor->sts->participants[accessor->participant].writing = false;
223 }
224}
225
226/*
227 * Prepare to rescan. Only one participant must call this. After it returns,
228 * all participants may call sts_begin_parallel_scan() and then loop over
229 * sts_parallel_scan_next(). This function must not be called concurrently
230 * with a scan, and synchronization to avoid that is the caller's
231 * responsibility.
232 */
233void
234sts_reinitialize(SharedTuplestoreAccessor *accessor)
235{
236 int i;
237
238 /*
239 * Reset the shared read head for all participants' files, so that a
240 * subsequent scan started with sts_begin_parallel_scan() will see all
241 * of the stored tuples again.
242 */
243 for (i = 0; i < accessor->sts->nparticipants; ++i)
244 {
245 accessor->sts->participants[i].read_page = 0;
246 }
247}
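/*
 * Editor's illustrative sketch, not part of the original file: a rescan.
 * Exactly one participant resets the shared read heads; how the others wait
 * for that reset (for example a Barrier) is the caller's business and is
 * not shown.
 */
#ifdef STS_USAGE_SKETCH
static void
sketch_rescan(SharedTuplestoreAccessor *accessor, bool chosen_to_reset)
{
    if (chosen_to_reset)
        sts_reinitialize(accessor);

    /* ...synchronize here so the reset completes before anyone scans... */

    sts_begin_parallel_scan(accessor);
}
#endif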
248
249/*
250 * Begin scanning the contents in parallel.
251 */
252void
253sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
254{
255 int i PG_USED_FOR_ASSERTS_ONLY;
256
257 /* End any existing scan that was in progress. */
258 sts_end_parallel_scan(accessor);
259
260 /*
261 * Any backend that might have written into this shared tuplestore must
262 * have called sts_end_write(), so that all buffers are flushed and the
263 * files have stopped growing.
264 */
265 for (i = 0; i < accessor->sts->nparticipants; ++i)
266 Assert(!accessor->sts->participants[i].writing);
267
268 /*
269 * We will start out reading the file that THIS backend wrote. There may
270 * be some caching locality advantage to that.
271 */
272 accessor->read_participant = accessor->participant;
273 accessor->read_file = NULL;
274 accessor->read_next_page = 0;
275}
276
277/*
278 * Finish a parallel scan, freeing associated backend-local resources.
279 */
280void
281sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
282{
283 /*
284 * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
285 * we'd probably need a reference count of current parallel scanners so we
286 * could safely do it only when the reference count reaches zero.
287 */
288 if (accessor->read_file != NULL)
289 {
290 BufFileClose(accessor->read_file);
291 accessor->read_file = NULL;
292 }
293}
294
295/*
296 * Write a tuple. If a meta-data size was provided to sts_initialize, then a
297 * pointer to meta data of that size must be provided.
298 */
299void
300sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
301 MinimalTuple tuple)
302{
303 size_t size;
304
305 /* Do we have our own file yet? */
306 if (accessor->write_file == NULL)
307 {
308 SharedTuplestoreParticipant *participant;
309 char name[MAXPGPATH];
310 MemoryContext oldcxt;
311
312 /* Create one. Only this backend will write into it. */
313 sts_filename(name, accessor, accessor->participant);
314
315 oldcxt = MemoryContextSwitchTo(accessor->context);
316 accessor->write_file =
317 BufFileCreateFileSet(&accessor->fileset->fs, name);
318 MemoryContextSwitchTo(oldcxt);
319
320 /* Set up the shared state for this backend's file. */
321 participant = &accessor->sts->participants[accessor->participant];
322 participant->writing = true; /* for assertions only */
323 }
324
325 /* Do we have space? */
326 size = accessor->sts->meta_data_size + tuple->t_len;
327 if (accessor->write_pointer + size > accessor->write_end)
328 {
329 if (accessor->write_chunk == NULL)
330 {
331 /* First time through. Allocate chunk. */
332 accessor->write_chunk = (SharedTuplestoreChunk *)
333 MemoryContextAllocZero(accessor->context,
334 STS_CHUNK_PAGES * BLCKSZ);
335 accessor->write_chunk->ntuples = 0;
336 accessor->write_pointer = &accessor->write_chunk->data[0];
337 accessor->write_end = (char *)
338 accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
339 }
340 else
341 {
342 /* See if flushing helps. */
343 sts_flush_chunk(accessor);
344 }
345
346 /* It may still not be enough in the case of a gigantic tuple. */
347 if (accessor->write_pointer + size > accessor->write_end)
348 {
349 size_t written;
350
351 /*
352 * We'll write the beginning of the oversized tuple, and then
353 * write the rest in some number of 'overflow' chunks.
354 *
355 * sts_initialize() verifies that the meta-data plus the tuple's size
356 * field always fits into a chunk. Because the chunk has been
357 * flushed above, we can be sure to have all of a chunk's usable
358 * space available.
359 */
360 Assert(accessor->write_pointer + accessor->sts->meta_data_size +
361 sizeof(uint32) < accessor->write_end);
362
363 /* Write the meta-data as one chunk. */
364 if (accessor->sts->meta_data_size > 0)
365 memcpy(accessor->write_pointer, meta_data,
366 accessor->sts->meta_data_size);
367
368 /*
369 * Write as much of the tuple as we can fit. This includes the
370 * tuple's size at the start.
371 */
372 written = accessor->write_end - accessor->write_pointer -
373 accessor->sts->meta_data_size;
374 memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
375 tuple, written);
376 ++accessor->write_chunk->ntuples;
377 size -= accessor->sts->meta_data_size;
378 size -= written;
379 /* Now write as many overflow chunks as we need for the rest. */
380 while (size > 0)
381 {
382 size_t written_this_chunk;
383
384 sts_flush_chunk(accessor);
385
386 /*
387 * How many overflow chunks to go? This will allow readers to
388 * skip all of them at once instead of reading each one.
389 */
390 accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
391 STS_CHUNK_DATA_SIZE;
392 written_this_chunk =
393 Min(accessor->write_end - accessor->write_pointer, size);
394 memcpy(accessor->write_pointer, (char *) tuple + written,
395 written_this_chunk);
396 accessor->write_pointer += written_this_chunk;
397 size -= written_this_chunk;
398 written += written_this_chunk;
399 }
400 return;
401 }
402 }
403
404 /* Copy meta-data and tuple into buffer. */
405 if (accessor->sts->meta_data_size > 0)
406 memcpy(accessor->write_pointer, meta_data,
407 accessor->sts->meta_data_size);
408 memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
409 tuple->t_len);
410 accessor->write_pointer += size;
411 ++accessor->write_chunk->ntuples;
412}
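/*
 * Editor's illustrative sketch, not part of the original file: storing a
 * tuple together with its hash value in the fixed-size meta-data slot, the
 * way a multi-batch parallel hash join uses this module.  Assumes the
 * tuplestore was created with meta_data_size == sizeof(uint32).
 */
#ifdef STS_USAGE_SKETCH
static void
sketch_store_hashed_tuple(SharedTuplestoreAccessor *accessor,
                          MinimalTuple tuple, uint32 hashvalue)
{
    sts_puttuple(accessor, &hashvalue, tuple);
}
#endif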
413
414static MinimalTuple
415sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
416{
417 MinimalTuple tuple;
418 uint32 size;
419 size_t remaining_size;
420 size_t this_chunk_size;
421 char *destination;
422
423 /*
424 * We'll keep track of bytes read from this chunk so that we can detect an
425 * overflowing tuple and switch to reading overflow pages.
426 */
427 if (accessor->sts->meta_data_size > 0)
428 {
429 BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
430 accessor->read_bytes += accessor->sts->meta_data_size;
431 }
432 BufFileReadExact(accessor->read_file, &size, sizeof(size));
433 accessor->read_bytes += sizeof(size);
434 if (size > accessor->read_buffer_size)
435 {
436 size_t new_read_buffer_size;
437
438 if (accessor->read_buffer != NULL)
439 pfree(accessor->read_buffer);
440 new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
441 accessor->read_buffer =
442 MemoryContextAlloc(accessor->context, new_read_buffer_size);
443 accessor->read_buffer_size = new_read_buffer_size;
444 }
445 remaining_size = size - sizeof(uint32);
446 this_chunk_size = Min(remaining_size,
447 BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
448 destination = accessor->read_buffer + sizeof(uint32);
449 BufFileReadExact(accessor->read_file, destination, this_chunk_size);
450 accessor->read_bytes += this_chunk_size;
451 remaining_size -= this_chunk_size;
452 destination += this_chunk_size;
453 ++accessor->read_ntuples;
454
455 /* Check if we need to read any overflow chunks. */
456 while (remaining_size > 0)
457 {
458 /* We are now positioned at the start of an overflow chunk. */
459 SharedTuplestoreChunk chunk_header;
460
461 BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
462 accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
463 if (chunk_header.overflow == 0)
464 ereport(ERROR,
465 (errcode_for_file_access(),
466 errmsg("unexpected chunk in shared tuplestore temporary file"),
467 errdetail_internal("Expected overflow chunk.")));
468 accessor->read_next_page += STS_CHUNK_PAGES;
469 this_chunk_size = Min(remaining_size,
470 BLCKSZ * STS_CHUNK_PAGES -
471 STS_CHUNK_HEADER_SIZE);
472 BufFileReadExact(accessor->read_file, destination, this_chunk_size);
473 accessor->read_bytes += this_chunk_size;
474 remaining_size -= this_chunk_size;
475 destination += this_chunk_size;
476
477 /*
478 * These will be used to count regular tuples following the oversized
479 * tuple that spilled into this overflow chunk.
480 */
481 accessor->read_ntuples = 0;
482 accessor->read_ntuples_available = chunk_header.ntuples;
483 }
484
485 tuple = (MinimalTuple) accessor->read_buffer;
486 tuple->t_len = size;
487
488 return tuple;
489}
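/*
 * Editor's worked example (not part of the original file, assuming the
 * default BLCKSZ of 8192): a tuple of roughly 100kB with 4 bytes of
 * meta-data cannot fit in one 32kB chunk.  sts_puttuple() therefore flushes
 * the current chunk, writes the meta-data and the first part of the tuple
 * into a fresh chunk, and spills the remainder into overflow chunks; each
 * overflow chunk's header records how many overflow chunks remain
 * (including itself), which is what lets the loop above keep reading and
 * lets sts_parallel_scan_next() skip the whole group in one step.
 */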
490
491/*
492 * Get the next tuple in the current parallel scan.
493 */
494MinimalTuple
495sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
496{
497 SharedTuplestoreParticipant *p;
498 BlockNumber read_page;
499 bool eof;
500
501 for (;;)
502 {
503 /* Can we read more tuples from the current chunk? */
504 if (accessor->read_ntuples < accessor->read_ntuples_available)
505 return sts_read_tuple(accessor, meta_data);
506
507 /* Find the location of a new chunk to read. */
508 p = &accessor->sts->participants[accessor->read_participant];
509
510 LWLockAcquire(&p->lock, LW_EXCLUSIVE);
511 /* We can skip directly past overflow pages we know about. */
512 if (p->read_page < accessor->read_next_page)
513 p->read_page = accessor->read_next_page;
514 eof = p->read_page >= p->npages;
515 if (!eof)
516 {
517 /* Claim the next chunk. */
518 read_page = p->read_page;
519 /* Advance the read head for the next reader. */
520 p->read_page += STS_CHUNK_PAGES;
521 accessor->read_next_page = p->read_page;
522 }
523 LWLockRelease(&p->lock);
524
525 if (!eof)
526 {
527 SharedTuplestoreChunk chunk_header;
528
529 /* Make sure we have the file open. */
530 if (accessor->read_file == NULL)
531 {
532 char name[MAXPGPATH];
533 MemoryContext oldcxt;
534
535 sts_filename(name, accessor, accessor->read_participant);
536
537 oldcxt = MemoryContextSwitchTo(accessor->context);
538 accessor->read_file =
539 BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
540 false);
541 MemoryContextSwitchTo(oldcxt);
542 }
543
544 /* Seek and load the chunk header. */
545 if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
546 ereport(ERROR,
547 (errcode_for_file_access(),
548 errmsg("could not seek to block %u in shared tuplestore temporary file",
549 read_page)));
550 BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
551
552 /*
553 * If this is an overflow chunk, we skip it and any following
554 * overflow chunks all at once.
555 */
556 if (chunk_header.overflow > 0)
557 {
558 accessor->read_next_page = read_page +
559 chunk_header.overflow * STS_CHUNK_PAGES;
560 continue;
561 }
562
563 accessor->read_ntuples = 0;
564 accessor->read_ntuples_available = chunk_header.ntuples;
565 accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
566
567 /* Go around again, so we can get a tuple from this chunk. */
568 }
569 else
570 {
571 if (accessor->read_file != NULL)
572 {
573 BufFileClose(accessor->read_file);
574 accessor->read_file = NULL;
575 }
576
577 /*
578 * Try the next participant's file. If we've gone full circle,
579 * we're done.
580 */
581 accessor->read_participant = (accessor->read_participant + 1) %
582 accessor->sts->nparticipants;
583 if (accessor->read_participant == accessor->participant)
584 break;
585 accessor->read_next_page = 0;
586
587 /* Go around again, so we can get a chunk from this file. */
588 }
589 }
590
591 return NULL;
592}
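/*
 * Editor's illustrative sketch, not part of the original file: the read
 * side matching sketch_store_hashed_tuple() above.  Each participant that
 * runs this drains an arbitrary, non-overlapping subset of the stored
 * tuples.
 */
#ifdef STS_USAGE_SKETCH
static void
sketch_scan_all(SharedTuplestoreAccessor *accessor)
{
    MinimalTuple tuple;
    uint32      hashvalue;

    sts_begin_parallel_scan(accessor);
    while ((tuple = sts_parallel_scan_next(accessor, &hashvalue)) != NULL)
    {
        /* consume 'tuple' and 'hashvalue' here */
    }
    sts_end_parallel_scan(accessor);
}
#endif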
593
594/*
595 * Create the name used for the BufFile that a given participant will write.
596 */
597static void
598sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
599{
600 snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
601}