PostgreSQL Source Code git master
sharedtuplestore.c File Reference
#include "postgres.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"

Go to the source code of this file.

Data Structures

struct  SharedTuplestoreChunk
 
struct  SharedTuplestoreParticipant
 
struct  SharedTuplestore
 
struct  SharedTuplestoreAccessor
 

Macros

#define STS_CHUNK_PAGES   4
 
#define STS_CHUNK_HEADER_SIZE   offsetof(SharedTuplestoreChunk, data)
 
#define STS_CHUNK_DATA_SIZE   (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
 

Typedefs

typedef struct SharedTuplestoreChunk SharedTuplestoreChunk
 
typedef struct SharedTuplestoreParticipant SharedTuplestoreParticipant
 

Functions

static void sts_filename (char *name, SharedTuplestoreAccessor *accessor, int participant)
 
size_t sts_estimate (int participants)
 
SharedTuplestoreAccessor * sts_initialize (SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
 
SharedTuplestoreAccessor * sts_attach (SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
 
static void sts_flush_chunk (SharedTuplestoreAccessor *accessor)
 
void sts_end_write (SharedTuplestoreAccessor *accessor)
 
void sts_reinitialize (SharedTuplestoreAccessor *accessor)
 
void sts_begin_parallel_scan (SharedTuplestoreAccessor *accessor)
 
void sts_end_parallel_scan (SharedTuplestoreAccessor *accessor)
 
void sts_puttuple (SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
 
static MinimalTuple sts_read_tuple (SharedTuplestoreAccessor *accessor, void *meta_data)
 
MinimalTuple sts_parallel_scan_next (SharedTuplestoreAccessor *accessor, void *meta_data)
 

Macro Definition Documentation

◆ STS_CHUNK_DATA_SIZE

#define STS_CHUNK_DATA_SIZE   (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)

Definition at line 39 of file sharedtuplestore.c.

◆ STS_CHUNK_HEADER_SIZE

#define STS_CHUNK_HEADER_SIZE   offsetof(SharedTuplestoreChunk, data)

Definition at line 38 of file sharedtuplestore.c.

◆ STS_CHUNK_PAGES

#define STS_CHUNK_PAGES   4

Definition at line 37 of file sharedtuplestore.c.
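
Together these macros fix the store's I/O granularity: tuples are buffered and written in chunks of STS_CHUNK_PAGES blocks, each with a small header at the front. A minimal sketch of the arithmetic, assuming the default BLCKSZ of 8192:

    /* Assuming the default BLCKSZ of 8192 (a configure-time setting). */
    size_t chunk_bytes  = STS_CHUNK_PAGES * BLCKSZ;  /* 4 * 8192 = 32768 bytes per chunk */
    size_t header_bytes = STS_CHUNK_HEADER_SIZE;     /* offsetof(SharedTuplestoreChunk, data) */
    size_t data_bytes   = STS_CHUNK_DATA_SIZE;       /* chunk_bytes - header_bytes */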

Typedef Documentation

◆ SharedTuplestoreChunk

◆ SharedTuplestoreParticipant

Function Documentation

◆ sts_attach()

SharedTuplestoreAccessor * sts_attach ( SharedTuplestore *  sts,
int  my_participant_number,
SharedFileSet *  fileset 
)

Definition at line 177 of file sharedtuplestore.c.

180{
181 SharedTuplestoreAccessor *accessor;
182
183 Assert(my_participant_number < sts->nparticipants);
184
185 accessor = palloc0(sizeof(SharedTuplestoreAccessor));
186 accessor->participant = my_participant_number;
187 accessor->sts = sts;
188 accessor->fileset = fileset;
189 accessor->context = CurrentMemoryContext;
190
191 return accessor;
192}

References Assert(), SharedTuplestoreAccessor::context, CurrentMemoryContext, SharedTuplestoreAccessor::fileset, palloc0(), SharedTuplestoreAccessor::participant, and SharedTuplestoreAccessor::sts.

Referenced by ExecParallelHashEnsureBatchAccessors(), and ExecParallelHashRepartitionRest().
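
A hedged worker-side sketch; shared_sts, my_number and fileset are placeholders for state the caller received through its parallel shared-memory setup:

    /* Hypothetical worker-side attach; shared_sts, my_number and fileset
     * stand in for state handed over via the parallel DSM segment. */
    SharedTuplestoreAccessor *accessor;

    accessor = sts_attach(shared_sts, my_number, fileset);
    /* The accessor is palloc0'd in CurrentMemoryContext, which it remembers
     * for later buffer-file allocations. */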

◆ sts_begin_parallel_scan()

void sts_begin_parallel_scan ( SharedTuplestoreAccessor *  accessor)

Definition at line 252 of file sharedtuplestore.c.

253{
254 int i PG_USED_FOR_ASSERTS_ONLY;
255
256 /* End any existing scan that was in progress. */
257 sts_end_parallel_scan(accessor);
258
259 /*
260 * Any backend that might have written into this shared tuplestore must
261 * have called sts_end_write(), so that all buffers are flushed and the
262 * files have stopped growing.
263 */
264 for (i = 0; i < accessor->sts->nparticipants; ++i)
265 Assert(!accessor->sts->participants[i].writing);
266
267 /*
268 * We will start out reading the file that THIS backend wrote. There may
269 * be some caching locality advantage to that.
270 */
271 accessor->read_participant = accessor->participant;
272 accessor->read_file = NULL;
273 accessor->read_next_page = 0;
274}

References Assert(), i, SharedTuplestore::nparticipants, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, PG_USED_FOR_ASSERTS_ONLY, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_participant, SharedTuplestoreAccessor::sts, sts_end_parallel_scan(), and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinNewBatch(), and ExecParallelHashRepartitionRest().
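
Since every writer must already have called sts_end_write(), callers typically synchronize before starting the scan. A hedged sketch; build_barrier and the wait-event argument are placeholder assumptions, not part of this API:

    /* Each participant, after it has finished writing: */
    sts_end_write(accessor);
    BarrierArriveAndWait(build_barrier, WAIT_EVENT_PLACEHOLDER); /* hypothetical sync point */
    sts_begin_parallel_scan(accessor);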

◆ sts_end_parallel_scan()

void sts_end_parallel_scan ( SharedTuplestoreAccessor *  accessor)

Definition at line 280 of file sharedtuplestore.c.

281{
282 /*
283 * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
284 * we'd probably need a reference count of current parallel scanners so we
285 * could safely do it only when the reference count reaches zero.
286 */
287 if (accessor->read_file != NULL)
288 {
289 BufFileClose(accessor->read_file);
290 accessor->read_file = NULL;
291 }
292}

References BufFileClose(), and SharedTuplestoreAccessor::read_file.

Referenced by ExecHashTableDetach(), ExecHashTableDetachBatch(), ExecParallelHashCloseBatchAccessors(), ExecParallelHashJoinNewBatch(), ExecParallelHashRepartitionRest(), ExecParallelPrepHashTableForUnmatched(), and sts_begin_parallel_scan().

◆ sts_end_write()

void sts_end_write ( SharedTuplestoreAccessor *  accessor)

Definition at line 212 of file sharedtuplestore.c.

213{
214 if (accessor->write_file != NULL)
215 {
216 sts_flush_chunk(accessor);
217 BufFileClose(accessor->write_file);
218 pfree(accessor->write_chunk);
219 accessor->write_chunk = NULL;
220 accessor->write_file = NULL;
221 accessor->sts->participants[accessor->participant].writing = false;
222 }
223}

References BufFileClose(), SharedTuplestoreAccessor::participant, SharedTuplestore::participants, pfree(), SharedTuplestoreAccessor::sts, sts_flush_chunk(), SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_file, and SharedTuplestoreParticipant::writing.

Referenced by ExecHashTableDetach(), ExecParallelHashCloseBatchAccessors(), ExecParallelHashJoinPartitionOuter(), and MultiExecParallelHash().

◆ sts_estimate()

size_t sts_estimate ( int  participants)

Definition at line 103 of file sharedtuplestore.c.

104{
105 return offsetof(SharedTuplestore, participants) +
106 sizeof(SharedTuplestoreParticipant) * participants;
107}
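
The return value is the number of bytes of shared memory the caller must reserve for the SharedTuplestore itself, including its flexible array of per-participant states. A minimal sketch, assuming a shm_toc-based allocation (toc and nparticipants are placeholders):

    /* toc and nparticipants are placeholders supplied by the caller. */
    SharedTuplestore *sts;

    sts = (SharedTuplestore *) shm_toc_allocate(toc, sts_estimate(nparticipants));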

◆ sts_filename()

static void sts_filename ( char *  name,
SharedTuplestoreAccessor *  accessor,
int  participant 
)
static

Definition at line 597 of file sharedtuplestore.c.

598{
599 snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
600}
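
For a store created with the (hypothetical) name "foo", this yields "foo.p0", "foo.p1", and so on, one file per writing participant.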

References MAXPGPATH, name, SharedTuplestore::name, snprintf, and SharedTuplestoreAccessor::sts.

Referenced by sts_parallel_scan_next(), and sts_puttuple().

◆ sts_flush_chunk()

static void sts_flush_chunk ( SharedTuplestoreAccessor *  accessor)
static

Definition at line 195 of file sharedtuplestore.c.

196{
197 size_t size;
198
199 size = STS_CHUNK_PAGES * BLCKSZ;
200 BufFileWrite(accessor->write_file, accessor->write_chunk, size);
201 memset(accessor->write_chunk, 0, size);
202 accessor->write_pointer = &accessor->write_chunk->data[0];
203 accessor->sts->participants[accessor->participant].npages +=
204 STS_CHUNK_PAGES;
205}

References BufFileWrite(), SharedTuplestoreChunk::data, SharedTuplestoreParticipant::npages, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::sts, STS_CHUNK_PAGES, SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_file, and SharedTuplestoreAccessor::write_pointer.

Referenced by sts_end_write(), and sts_puttuple().

◆ sts_initialize()

SharedTuplestoreAccessor * sts_initialize ( SharedTuplestore *  sts,
int  participants,
int  my_participant_number,
size_t  meta_data_size,
int  flags,
SharedFileSet *  fileset,
const char *  name 
)

Definition at line 125 of file sharedtuplestore.c.

131{
132 SharedTuplestoreAccessor *accessor;
133 int i;
134
135 Assert(my_participant_number < participants);
136
137 sts->nparticipants = participants;
138 sts->meta_data_size = meta_data_size;
139 sts->flags = flags;
140
141 if (strlen(name) > sizeof(sts->name) - 1)
142 elog(ERROR, "SharedTuplestore name too long");
143 strcpy(sts->name, name);
144
145 /*
146 * Limit meta-data so it + tuple size always fits into a single chunk.
147 * sts_puttuple() and sts_read_tuple() could be made to support scenarios
148 * where that's not the case, but it's not currently required. If so,
149 * meta-data size probably should be made variable, too.
150 */
151 if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
152 elog(ERROR, "meta-data too long");
153
154 for (i = 0; i < participants; ++i)
155 {
156 LWLockInitialize(&sts->participants[i].lock,
157 LWTRANCHE_SHARED_TUPLESTORE);
158 sts->participants[i].read_page = 0;
159 sts->participants[i].npages = 0;
160 sts->participants[i].writing = false;
161 }
162
163 accessor = palloc0(sizeof(SharedTuplestoreAccessor));
164 accessor->participant = my_participant_number;
165 accessor->sts = sts;
166 accessor->fileset = fileset;
167 accessor->context = CurrentMemoryContext;
168
169 return accessor;
170}

References Assert(), SharedTuplestoreAccessor::context, CurrentMemoryContext, elog, ERROR, SharedTuplestoreAccessor::fileset, SharedTuplestore::flags, i, SharedTuplestoreParticipant::lock, LWLockInitialize(), SharedTuplestore::meta_data_size, name, SharedTuplestore::name, SharedTuplestoreParticipant::npages, SharedTuplestore::nparticipants, palloc0(), SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreParticipant::read_page, SharedTuplestoreAccessor::sts, STS_CHUNK_DATA_SIZE, and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinSetUpBatches().
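
A hedged leader-side sketch; sts points at shared memory sized with sts_estimate(), fileset is an already-initialized SharedFileSet, and the name, flag and meta-data size are illustrative choices only (SHARED_TUPLESTORE_SINGLE_PASS is the flag mentioned in sts_end_parallel_scan()'s comment):

    /* Hypothetical leader-side creation; sts, fileset and nparticipants
     * come from the caller's parallel setup. */
    SharedTuplestoreAccessor *accessor;

    accessor = sts_initialize(sts, nparticipants,
                              0,              /* leader's participant number */
                              sizeof(uint32), /* fixed per-tuple meta-data size */
                              SHARED_TUPLESTORE_SINGLE_PASS,
                              fileset, "mystore");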

◆ sts_parallel_scan_next()

MinimalTuple sts_parallel_scan_next ( SharedTuplestoreAccessor *  accessor,
void *  meta_data 
)

Definition at line 494 of file sharedtuplestore.c.

495{
496 SharedTuplestoreParticipant *p;
497 BlockNumber read_page;
498 bool eof;
499
500 for (;;)
501 {
502 /* Can we read more tuples from the current chunk? */
503 if (accessor->read_ntuples < accessor->read_ntuples_available)
504 return sts_read_tuple(accessor, meta_data);
505
506 /* Find the location of a new chunk to read. */
507 p = &accessor->sts->participants[accessor->read_participant];
508
509 LWLockAcquire(&p->lock, LW_EXCLUSIVE);
510 /* We can skip directly past overflow pages we know about. */
511 if (p->read_page < accessor->read_next_page)
512 p->read_page = accessor->read_next_page;
513 eof = p->read_page >= p->npages;
514 if (!eof)
515 {
516 /* Claim the next chunk. */
517 read_page = p->read_page;
518 /* Advance the read head for the next reader. */
519 p->read_page += STS_CHUNK_PAGES;
520 accessor->read_next_page = p->read_page;
521 }
522 LWLockRelease(&p->lock);
523
524 if (!eof)
525 {
526 SharedTuplestoreChunk chunk_header;
527
528 /* Make sure we have the file open. */
529 if (accessor->read_file == NULL)
530 {
531 char name[MAXPGPATH];
532 MemoryContext oldcxt;
533
534 sts_filename(name, accessor, accessor->read_participant);
535
536 oldcxt = MemoryContextSwitchTo(accessor->context);
537 accessor->read_file =
538 BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
539 false);
540 MemoryContextSwitchTo(oldcxt);
541 }
542
543 /* Seek and load the chunk header. */
544 if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
545 ereport(ERROR,
546 (errcode_for_file_access(),
547 errmsg("could not seek to block %u in shared tuplestore temporary file",
548 read_page)));
549 BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
550
551 /*
552 * If this is an overflow chunk, we skip it and any following
553 * overflow chunks all at once.
554 */
555 if (chunk_header.overflow > 0)
556 {
557 accessor->read_next_page = read_page +
558 chunk_header.overflow * STS_CHUNK_PAGES;
559 continue;
560 }
561
562 accessor->read_ntuples = 0;
563 accessor->read_ntuples_available = chunk_header.ntuples;
564 accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
565
566 /* Go around again, so we can get a tuple from this chunk. */
567 }
568 else
569 {
570 if (accessor->read_file != NULL)
571 {
572 BufFileClose(accessor->read_file);
573 accessor->read_file = NULL;
574 }
575
576 /*
577 * Try the next participant's file. If we've gone full circle,
578 * we're done.
579 */
580 accessor->read_participant = (accessor->read_participant + 1) %
581 accessor->sts->nparticipants;
582 if (accessor->read_participant == accessor->participant)
583 break;
584 accessor->read_next_page = 0;
585
586 /* Go around again, so we can get a chunk from this file. */
587 }
588 }
589
590 return NULL;
591}

References BufFileClose(), BufFileOpenFileSet(), BufFileReadExact(), BufFileSeekBlock(), SharedTuplestoreAccessor::context, ereport, errcode_for_file_access(), errmsg(), ERROR, SharedTuplestoreAccessor::fileset, SharedFileSet::fs, if(), SharedTuplestoreParticipant::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, MemoryContextSwitchTo(), name, SharedTuplestoreParticipant::npages, SharedTuplestore::nparticipants, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::read_bytes, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_ntuples, SharedTuplestoreAccessor::read_ntuples_available, SharedTuplestoreParticipant::read_page, SharedTuplestoreAccessor::read_participant, SharedTuplestoreAccessor::sts, STS_CHUNK_HEADER_SIZE, STS_CHUNK_PAGES, sts_filename(), and sts_read_tuple().

Referenced by ExecParallelHashJoinNewBatch(), ExecParallelHashJoinOuterGetTuple(), and ExecParallelHashRepartitionRest().
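
A hedged read-side sketch of the intended loop; process_tuple() is a placeholder consumer, and the uint32 meta-data buffer assumes the store was created with meta_data_size = sizeof(uint32):

    /* Hypothetical scan loop; process_tuple() is a placeholder. */
    MinimalTuple tuple;
    uint32 hashvalue;   /* assumes meta_data_size == sizeof(uint32) */

    sts_begin_parallel_scan(accessor);
    while ((tuple = sts_parallel_scan_next(accessor, &hashvalue)) != NULL)
        process_tuple(tuple, hashvalue);
    sts_end_parallel_scan(accessor);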

◆ sts_puttuple()

void sts_puttuple ( SharedTuplestoreAccessor *  accessor,
void *  meta_data,
MinimalTuple  tuple 
)

Definition at line 299 of file sharedtuplestore.c.

301{
302 size_t size;
303
304 /* Do we have our own file yet? */
305 if (accessor->write_file == NULL)
306 {
307 SharedTuplestoreParticipant *participant;
308 char name[MAXPGPATH];
309 MemoryContext oldcxt;
310
311 /* Create one. Only this backend will write into it. */
312 sts_filename(name, accessor, accessor->participant);
313
314 oldcxt = MemoryContextSwitchTo(accessor->context);
315 accessor->write_file =
316 BufFileCreateFileSet(&accessor->fileset->fs, name);
317 MemoryContextSwitchTo(oldcxt);
318
319 /* Set up the shared state for this backend's file. */
320 participant = &accessor->sts->participants[accessor->participant];
321 participant->writing = true; /* for assertions only */
322 }
323
324 /* Do we have space? */
325 size = accessor->sts->meta_data_size + tuple->t_len;
326 if (accessor->write_pointer + size > accessor->write_end)
327 {
328 if (accessor->write_chunk == NULL)
329 {
330 /* First time through. Allocate chunk. */
331 accessor->write_chunk = (SharedTuplestoreChunk *)
332 MemoryContextAllocZero(accessor->context,
333 STS_CHUNK_PAGES * BLCKSZ);
334 accessor->write_chunk->ntuples = 0;
335 accessor->write_pointer = &accessor->write_chunk->data[0];
336 accessor->write_end = (char *)
337 accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
338 }
339 else
340 {
341 /* See if flushing helps. */
342 sts_flush_chunk(accessor);
343 }
344
345 /* It may still not be enough in the case of a gigantic tuple. */
346 if (accessor->write_pointer + size > accessor->write_end)
347 {
348 size_t written;
349
350 /*
351 * We'll write the beginning of the oversized tuple, and then
352 * write the rest in some number of 'overflow' chunks.
353 *
354 * sts_initialize() verifies that the size of the tuple +
355 * meta-data always fits into a chunk. Because the chunk has been
356 * flushed above, we can be sure to have all of a chunk's usable
357 * space available.
358 */
359 Assert(accessor->write_pointer + accessor->sts->meta_data_size +
360 sizeof(uint32) < accessor->write_end);
361
362 /* Write the meta-data as one chunk. */
363 if (accessor->sts->meta_data_size > 0)
364 memcpy(accessor->write_pointer, meta_data,
365 accessor->sts->meta_data_size);
366
367 /*
368 * Write as much of the tuple as we can fit. This includes the
369 * tuple's size at the start.
370 */
371 written = accessor->write_end - accessor->write_pointer -
372 accessor->sts->meta_data_size;
373 memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
374 tuple, written);
375 ++accessor->write_chunk->ntuples;
376 size -= accessor->sts->meta_data_size;
377 size -= written;
378 /* Now write as many overflow chunks as we need for the rest. */
379 while (size > 0)
380 {
381 size_t written_this_chunk;
382
383 sts_flush_chunk(accessor);
384
385 /*
386 * How many overflow chunks to go? This will allow readers to
387 * skip all of them at once instead of reading each one.
388 */
389 accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
390 STS_CHUNK_DATA_SIZE;
391 written_this_chunk =
392 Min(accessor->write_end - accessor->write_pointer, size);
393 memcpy(accessor->write_pointer, (char *) tuple + written,
394 written_this_chunk);
395 accessor->write_pointer += written_this_chunk;
396 size -= written_this_chunk;
397 written += written_this_chunk;
398 }
399 return;
400 }
401 }
402
403 /* Copy meta-data and tuple into buffer. */
404 if (accessor->sts->meta_data_size > 0)
405 memcpy(accessor->write_pointer, meta_data,
406 accessor->sts->meta_data_size);
407 memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
408 tuple->t_len);
409 accessor->write_pointer += size;
410 ++accessor->write_chunk->ntuples;
411}

References Assert(), BufFileCreateFileSet(), SharedTuplestoreAccessor::context, SharedTuplestoreChunk::data, SharedTuplestoreAccessor::fileset, SharedFileSet::fs, MAXPGPATH, MemoryContextAllocZero(), MemoryContextSwitchTo(), SharedTuplestore::meta_data_size, Min, name, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, SharedTuplestoreAccessor::participant, SharedTuplestore::participants, SharedTuplestoreAccessor::sts, STS_CHUNK_DATA_SIZE, STS_CHUNK_PAGES, sts_filename(), sts_flush_chunk(), MinimalTupleData::t_len, SharedTuplestoreAccessor::write_chunk, SharedTuplestoreAccessor::write_end, SharedTuplestoreAccessor::write_file, SharedTuplestoreAccessor::write_pointer, and SharedTuplestoreParticipant::writing.

Referenced by ExecParallelHashJoinPartitionOuter(), ExecParallelHashRepartitionFirst(), ExecParallelHashRepartitionRest(), and ExecParallelHashTableInsert().
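
A hedged write-side sketch; next_tuple() is a placeholder producer, and the uint32 meta-data again assumes meta_data_size = sizeof(uint32). Every writer must finish with sts_end_write() before any participant scans:

    /* Hypothetical write loop; next_tuple() is a placeholder producer. */
    MinimalTuple mintuple;
    uint32 hashvalue;

    while ((mintuple = next_tuple(&hashvalue)) != NULL)
        sts_puttuple(accessor, &hashvalue, mintuple);

    /* Flush the final partial chunk and clear the 'writing' flag. */
    sts_end_write(accessor);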

◆ sts_read_tuple()

static MinimalTuple sts_read_tuple ( SharedTuplestoreAccessor *  accessor,
void *  meta_data 
)
static

Definition at line 414 of file sharedtuplestore.c.

415{
416 MinimalTuple tuple;
417 uint32 size;
418 size_t remaining_size;
419 size_t this_chunk_size;
420 char *destination;
421
422 /*
423 * We'll keep track of bytes read from this chunk so that we can detect an
424 * overflowing tuple and switch to reading overflow pages.
425 */
426 if (accessor->sts->meta_data_size > 0)
427 {
428 BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
429 accessor->read_bytes += accessor->sts->meta_data_size;
430 }
431 BufFileReadExact(accessor->read_file, &size, sizeof(size));
432 accessor->read_bytes += sizeof(size);
433 if (size > accessor->read_buffer_size)
434 {
435 size_t new_read_buffer_size;
436
437 if (accessor->read_buffer != NULL)
438 pfree(accessor->read_buffer);
439 new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
440 accessor->read_buffer =
441 MemoryContextAlloc(accessor->context, new_read_buffer_size);
442 accessor->read_buffer_size = new_read_buffer_size;
443 }
444 remaining_size = size - sizeof(uint32);
445 this_chunk_size = Min(remaining_size,
446 BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
447 destination = accessor->read_buffer + sizeof(uint32);
448 BufFileReadExact(accessor->read_file, destination, this_chunk_size);
449 accessor->read_bytes += this_chunk_size;
450 remaining_size -= this_chunk_size;
451 destination += this_chunk_size;
452 ++accessor->read_ntuples;
453
454 /* Check if we need to read any overflow chunks. */
455 while (remaining_size > 0)
456 {
457 /* We are now positioned at the start of an overflow chunk. */
458 SharedTuplestoreChunk chunk_header;
459
460 BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
461 accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
462 if (chunk_header.overflow == 0)
463 ereport(ERROR,
464 (errcode_for_file_access(),
465 errmsg("unexpected chunk in shared tuplestore temporary file"),
466 errdetail_internal("Expected overflow chunk.")));
467 accessor->read_next_page += STS_CHUNK_PAGES;
468 this_chunk_size = Min(remaining_size,
469 BLCKSZ * STS_CHUNK_PAGES -
470 accessor->read_bytes);
471 BufFileReadExact(accessor->read_file, destination, this_chunk_size);
472 accessor->read_bytes += this_chunk_size;
473 remaining_size -= this_chunk_size;
474 destination += this_chunk_size;
475
476 /*
477 * These will be used to count regular tuples following the oversized
478 * tuple that spilled into this overflow chunk.
479 */
480 accessor->read_ntuples = 0;
481 accessor->read_ntuples_available = chunk_header.ntuples;
482 }
483
484 tuple = (MinimalTuple) accessor->read_buffer;
485 tuple->t_len = size;
486
487 return tuple;
488}

References BufFileReadExact(), SharedTuplestoreAccessor::context, ereport, errcode_for_file_access(), errdetail_internal(), errmsg(), ERROR, Max, MemoryContextAlloc(), SharedTuplestore::meta_data_size, Min, SharedTuplestoreChunk::ntuples, SharedTuplestoreChunk::overflow, pfree(), SharedTuplestoreAccessor::read_buffer, SharedTuplestoreAccessor::read_buffer_size, SharedTuplestoreAccessor::read_bytes, SharedTuplestoreAccessor::read_file, SharedTuplestoreAccessor::read_next_page, SharedTuplestoreAccessor::read_ntuples, SharedTuplestoreAccessor::read_ntuples_available, SharedTuplestoreAccessor::sts, STS_CHUNK_HEADER_SIZE, STS_CHUNK_PAGES, and MinimalTupleData::t_len.

Referenced by sts_parallel_scan_next().

◆ sts_reinitialize()

void sts_reinitialize ( SharedTuplestoreAccessor *  accessor)

Definition at line 233 of file sharedtuplestore.c.

234{
235 int i;
236
237 /*
238 * Reset the shared read head for all participants' files. Also set the
239 * initial chunk size to the minimum (any increases from that size will be
240 * recorded in chunk_expansion_log).
241 */
242 for (i = 0; i < accessor->sts->nparticipants; ++i)
243 {
244 accessor->sts->participants[i].read_page = 0;
245 }
246}

References i, SharedTuplestore::nparticipants, SharedTuplestore::participants, SharedTuplestoreParticipant::read_page, and SharedTuplestoreAccessor::sts.
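
A hedged rescan sketch: one participant resets the shared read heads while no scan is in progress, then all participants may begin a fresh scan (synchronization between the two steps is the caller's responsibility and is not shown):

    /* One participant only, with no scan in progress: */
    sts_reinitialize(accessor);

    /* Then each participant, as for a first scan: */
    sts_begin_parallel_scan(accessor);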