38 #define STS_CHUNK_PAGES 4
39 #define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
40 #define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
128 int my_participant_number,
129 size_t meta_data_size,
137 Assert(my_participant_number < participants);
143 if (strlen(
name) >
sizeof(sts->
name) - 1)
144 elog(
ERROR,
"SharedTuplestore name too long");
156 for (
i = 0;
i < participants; ++
i)
180 int my_participant_number,
185 Assert(my_participant_number < sts->nparticipants);
383 size_t written_this_chunk;
398 size -= written_this_chunk;
399 written += written_this_chunk;
420 size_t remaining_size;
421 size_t this_chunk_size;
437 size_t new_read_buffer_size;
446 remaining_size = size -
sizeof(
uint32);
447 this_chunk_size =
Min(remaining_size,
452 remaining_size -= this_chunk_size;
453 destination += this_chunk_size;
457 while (remaining_size > 0)
467 errmsg(
"unexpected chunk in shared tuplestore temporary file"),
470 this_chunk_size =
Min(remaining_size,
475 remaining_size -= this_chunk_size;
476 destination += this_chunk_size;
549 errmsg(
"could not seek to block %u in shared tuplestore temporary file",
int BufFileSeekBlock(BufFile *file, int64 blknum)
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
void BufFileClose(BufFile *file)
#define PG_USED_FOR_ASSERTS_ONLY
#define FLEXIBLE_ARRAY_MEMBER
elog(ERROR, "%s: %s", p2, msg)
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
MinimalTupleData * MinimalTuple
if(TABLE==NULL||TABLE_index==NULL)
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
@ LWTRANCHE_SHARED_TUPLESTORE
void pfree(void *pointer)
void * palloc0(Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
MemoryContext CurrentMemoryContext
void * MemoryContextAlloc(MemoryContext context, Size size)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
void sts_reinitialize(SharedTuplestoreAccessor *accessor)
static MinimalTuple sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
static void sts_flush_chunk(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_initialize(SharedTuplestore *sts, int participants, int my_participant_number, size_t meta_data_size, int flags, SharedFileSet *fileset, const char *name)
#define STS_CHUNK_HEADER_SIZE
MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
struct SharedTuplestoreChunk SharedTuplestoreChunk
struct SharedTuplestoreParticipant SharedTuplestoreParticipant
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
void sts_end_write(SharedTuplestoreAccessor *accessor)
SharedTuplestoreAccessor * sts_attach(SharedTuplestore *sts, int my_participant_number, SharedFileSet *fileset)
size_t sts_estimate(int participants)
#define STS_CHUNK_DATA_SIZE
void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, MinimalTuple tuple)
void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
SharedTuplestoreChunk * write_chunk
int read_ntuples_available
BlockNumber read_next_page
char data[FLEXIBLE_ARRAY_MEMBER]
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]