212 #ifndef READ_STREAM_DISABLE_FAST_PATH
260 if (!suppress_advice &&
272 &stream->
buffers[buffer_index],
304 overflow = (buffer_index + nblocks) - stream->
queue_size;
308 sizeof(stream->
buffers[0]) * overflow);
311 buffer_index += nblocks;
314 Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
330 void *per_buffer_data;
335 suppress_advice =
false;
347 Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
369 suppress_advice =
false;
415 void *callback_private_data,
416 size_t per_buffer_data_size)
422 int strategy_pin_limit;
423 uint32 max_pinned_buffers;
463 max_pinned_buffers =
Min(max_pinned_buffers,
468 max_pinned_buffers =
Min(strategy_pin_limit, max_pinned_buffers);
475 Assert(max_pinned_buffers > 0);
483 queue_size = max_pinned_buffers + 1;
496 size += per_buffer_data_size * queue_size;
497 size += MAXIMUM_ALIGNOF * 2;
499 memset(stream, 0, offsetof(
ReadStream, buffers));
502 if (per_buffer_data_size > 0)
550 for (
int i = 0;
i < max_ios; ++
i)
575 int16 oldest_buffer_index;
577 #ifndef READ_STREAM_DISABLE_FAST_PATH
600 buffer = stream->
buffers[oldest_buffer_index];
617 &stream->
buffers[oldest_buffer_index],
673 Assert(oldest_buffer_index >= 0 &&
674 oldest_buffer_index < stream->queue_size);
675 buffer = stream->
buffers[oldest_buffer_index];
690 &stream->
buffers[oldest_buffer_index]);
723 #ifdef CLOBBER_FREED_MEMORY
735 oldest_buffer_index == 0 ?
737 oldest_buffer_index - 1),
753 #ifndef READ_STREAM_DISABLE_FAST_PATH
#define InvalidBlockNumber
bool StartReadBuffers(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, int flags)
void ReleaseBuffer(Buffer buffer)
void WaitReadBuffers(ReadBuffersOperation *operation)
void LimitAdditionalPins(uint32 *additional_pins)
int effective_io_concurrency
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
@ READ_BUFFERS_ISSUE_ADVICE
static bool BufferIsValid(Buffer bufnum)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
#define OidIsValid(objectId)
bool IsCatalogRelation(Relation relation)
bool IsCatalogRelationOid(Oid relid)
int GetAccessStrategyPinLimit(BufferAccessStrategy strategy)
if(TABLE==NULL||TABLE_index==NULL)
void LimitAdditionalLocalPins(uint32 *additional_pins)
void pfree(void *pointer)
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
ReadStream * read_stream_begin_relation(int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
static void read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
void read_stream_reset(ReadStream *stream)
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
static void read_stream_fill_blocknums(ReadStream *stream)
struct InProgressIO InProgressIO
void read_stream_end(ReadStream *stream)
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
static void read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
static void read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
#define READ_STREAM_MAINTENANCE
BlockNumber(* ReadStreamBlockNumberCB)(ReadStream *stream, void *callback_private_data, void *per_buffer_data)
#define READ_STREAM_SEQUENTIAL
static SMgrRelation RelationGetSmgr(Relation rel)
static pg_noinline void Size size
int get_tablespace_io_concurrency(Oid spcid)
int get_tablespace_maintenance_io_concurrency(Oid spcid)
BufferAccessStrategy strategy
struct SMgrRelationData * smgr
int16 oldest_buffer_index
BlockNumber pending_read_blocknum
BlockNumber blocknums[16]
size_t per_buffer_data_size
ReadStreamBlockNumberCB callback
int16 pending_read_nblocks
void * callback_private_data
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
RelFileLocatorBackend smgr_rlocator
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)