37#define STS_CHUNK_PAGES 4
38#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
39#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
127 int my_participant_number,
128 size_t meta_data_size,
136 Assert(my_participant_number < participants);
142 if (strlen(
name) >
sizeof(sts->
name) - 1)
143 elog(
ERROR,
"SharedTuplestore name too long");
155 for (
i = 0;
i < participants; ++
i)
179 int my_participant_number,
184 Assert(my_participant_number < sts->nparticipants);
382 size_t written_this_chunk;
397 size -= written_this_chunk;
398 written += written_this_chunk;
419 size_t remaining_size;
420 size_t this_chunk_size;
436 size_t new_read_buffer_size;
446 this_chunk_size =
Min(remaining_size,
451 remaining_size -= this_chunk_size;
452 destination += this_chunk_size;
456 while (remaining_size > 0)
466 errmsg(
"unexpected chunk in shared tuplestore temporary file"),
469 this_chunk_size =
Min(remaining_size,
474 remaining_size -= this_chunk_size;
475 destination += this_chunk_size;
548 errmsg(
"could not seek to block %u in shared tuplestore temporary file",
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
int BufFileSeekBlock(BufFile *file, int64 blknum)
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
void BufFileClose(BufFile *file)
#define PG_USED_FOR_ASSERTS_ONLY
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
MinimalTupleData * MinimalTuple
if(TABLE==NULL||TABLE_index==NULL)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
@ LWTRANCHE_SHARED_TUPLESTORE
void * MemoryContextAlloc(MemoryContext context, Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
void sts_reinitialize(SharedTuplestoreAccessor *accessor)
static MinimalTuple sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
static void sts_flush_chunk(SharedTuplestoreAccessor *accessor)
#define STS_CHUNK_HEADER_SIZE
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
struct SharedTuplestoreChunk SharedTuplestoreChunk
struct SharedTuplestoreParticipant SharedTuplestoreParticipant
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
size_t sts_estimate(int participants)
#define STS_CHUNK_DATA_SIZE
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
static pg_noinline void Size size
SharedTuplestoreChunk * write_chunk
int read_ntuples_available
BlockNumber read_next_page
char data[FLEXIBLE_ARRAY_MEMBER]
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]