PostgreSQL Source Code git master
read_stream.h File Reference
#include "storage/bufmgr.h"
#include "storage/smgr.h"
Include dependency graph for read_stream.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  BlockRangeReadStreamPrivate
 

Macros

#define READ_STREAM_DEFAULT   0x00
 
#define READ_STREAM_MAINTENANCE   0x01
 
#define READ_STREAM_SEQUENTIAL   0x02
 
#define READ_STREAM_FULL   0x04
 

Typedefs

typedef struct ReadStream ReadStream
 
typedef struct BlockRangeReadStreamPrivate BlockRangeReadStreamPrivate
 
typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 

Functions

BlockNumber block_range_read_stream_cb (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 
ReadStream * read_stream_begin_relation (int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
Buffer read_stream_next_buffer (ReadStream *stream, void **per_buffer_data)
 
BlockNumber read_stream_next_block (ReadStream *stream, BufferAccessStrategy *strategy)
 
ReadStream * read_stream_begin_smgr_relation (int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
void read_stream_reset (ReadStream *stream)
 
void read_stream_end (ReadStream *stream)
 

Macro Definition Documentation

◆ READ_STREAM_DEFAULT

#define READ_STREAM_DEFAULT   0x00

Definition at line 21 of file read_stream.h.

◆ READ_STREAM_FULL

#define READ_STREAM_FULL   0x04

Definition at line 43 of file read_stream.h.

◆ READ_STREAM_MAINTENANCE

#define READ_STREAM_MAINTENANCE   0x01

Definition at line 28 of file read_stream.h.

◆ READ_STREAM_SEQUENTIAL

#define READ_STREAM_SEQUENTIAL   0x02

Definition at line 36 of file read_stream.h.

Typedef Documentation

◆ BlockRangeReadStreamPrivate

◆ ReadStream

typedef struct ReadStream ReadStream

Definition at line 46 of file read_stream.h.

◆ ReadStreamBlockNumberCB

typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)

Definition at line 56 of file read_stream.h.

Function Documentation

◆ block_range_read_stream_cb()

BlockNumber block_range_read_stream_cb ( ReadStream *  stream,
void *  callback_private_data,
void *  per_buffer_data 
)

◆ read_stream_begin_relation()

ReadStream * read_stream_begin_relation ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 551 of file read_stream.c.

558{
559 return read_stream_begin_impl(flags,
560 strategy,
561 rel,
562 RelationGetSmgr(rel),
563 rel->rd_rel->relpersistence,
564 forknum,
565 callback,
566 callback_private_data,
567 per_buffer_data_size);
568}
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
Definition: read_stream.c:394
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:567
Form_pg_class rd_rel
Definition: rel.h:111
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46

References callback(), RelationData::rd_rel, read_stream_begin_impl(), and RelationGetSmgr().

Referenced by acquire_sample_rows(), collect_corrupt_items(), collect_visibility_data(), heap_beginscan(), and pg_prewarm().

◆ read_stream_begin_smgr_relation()

ReadStream * read_stream_begin_smgr_relation ( int  flags,
BufferAccessStrategy  strategy,
SMgrRelation  smgr,
char  smgr_persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 575 of file read_stream.c.

583{
584 return read_stream_begin_impl(flags,
585 strategy,
586 NULL,
587 smgr,
588 smgr_persistence,
589 forknum,
590 callback,
591 callback_private_data,
592 per_buffer_data_size);
593}

References callback(), and read_stream_begin_impl().

Referenced by RelationCopyStorageUsingBuffer().

◆ read_stream_end()

void read_stream_end ( ReadStream * stream)

Definition at line 846 of file read_stream.c.

847{
848 read_stream_reset(stream);
849 pfree(stream);
850}
void pfree(void *pointer)
Definition: mcxt.c:1521
void read_stream_reset(ReadStream *stream)
Definition: read_stream.c:820

References pfree(), and read_stream_reset().

Referenced by acquire_sample_rows(), collect_corrupt_items(), collect_visibility_data(), heap_endscan(), pg_prewarm(), and RelationCopyStorageUsingBuffer().

◆ read_stream_next_block()

BlockNumber read_stream_next_block ( ReadStream *  stream,
BufferAccessStrategy *  strategy 
)

Definition at line 807 of file read_stream.c.

808{
809 *strategy = stream->ios[0].op.strategy;
810 return read_stream_get_block(stream, NULL);
811}
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
Definition: read_stream.c:188
ReadBuffersOperation op
Definition: read_stream.c:103
BufferAccessStrategy strategy
Definition: bufmgr.h:122
InProgressIO * ios
Definition: read_stream.c:144

References ReadStream::ios, InProgressIO::op, read_stream_get_block(), and ReadBuffersOperation::strategy.

◆ read_stream_next_buffer()

Buffer read_stream_next_buffer ( ReadStream *  stream,
void **  per_buffer_data 
)

Definition at line 605 of file read_stream.c.

606{
607 Buffer buffer;
608 int16 oldest_buffer_index;
609
610#ifndef READ_STREAM_DISABLE_FAST_PATH
611
612 /*
613 * A fast path for all-cached scans (behavior A). This is the same as the
614 * usual algorithm, but it is specialized for no I/O and no per-buffer
615 * data, so we can skip the queue management code, stay in the same buffer
616 * slot and use singular StartReadBuffer().
617 */
618 if (likely(stream->fast_path))
619 {
620 BlockNumber next_blocknum;
621
622 /* Fast path assumptions. */
623 Assert(stream->ios_in_progress == 0);
624 Assert(stream->pinned_buffers == 1);
625 Assert(stream->distance == 1);
626 Assert(stream->pending_read_nblocks == 0);
627 Assert(stream->per_buffer_data_size == 0);
628
629 /* We're going to return the buffer we pinned last time. */
630 oldest_buffer_index = stream->oldest_buffer_index;
631 Assert((oldest_buffer_index + 1) % stream->queue_size ==
632 stream->next_buffer_index);
633 buffer = stream->buffers[oldest_buffer_index];
634 Assert(buffer != InvalidBuffer);
635
636 /* Choose the next block to pin. */
637 next_blocknum = read_stream_get_block(stream, NULL);
638
639 if (likely(next_blocknum != InvalidBlockNumber))
640 {
641 /*
642 * Pin a buffer for the next call. Same buffer entry, and
643 * arbitrary I/O entry (they're all free). We don't have to
644 * adjust pinned_buffers because we're transferring one to caller
645 * but pinning one more.
646 */
647 if (likely(!StartReadBuffer(&stream->ios[0].op,
648 &stream->buffers[oldest_buffer_index],
649 next_blocknum,
650 stream->advice_enabled ?
651 READ_BUFFERS_ISSUE_ADVICE : 0)))
652 {
653 /* Fast return. */
654 return buffer;
655 }
656
657 /* Next call must wait for I/O for the newly pinned buffer. */
658 stream->oldest_io_index = 0;
659 stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
660 stream->ios_in_progress = 1;
661 stream->ios[0].buffer_index = oldest_buffer_index;
662 stream->seq_blocknum = next_blocknum + 1;
663 }
664 else
665 {
666 /* No more blocks, end of stream. */
667 stream->distance = 0;
668 stream->oldest_buffer_index = stream->next_buffer_index;
669 stream->pinned_buffers = 0;
670 }
671
672 stream->fast_path = false;
673 return buffer;
674 }
675#endif
676
677 if (unlikely(stream->pinned_buffers == 0))
678 {
679 Assert(stream->oldest_buffer_index == stream->next_buffer_index);
680
681 /* End of stream reached? */
682 if (stream->distance == 0)
683 return InvalidBuffer;
684
685 /*
686 * The usual order of operations is that we look ahead at the bottom
687 * of this function after potentially finishing an I/O and making
688 * space for more, but if we're just starting up we'll need to crank
689 * the handle to get started.
690 */
691 read_stream_look_ahead(stream, true);
692
693 /* End of stream reached? */
694 if (stream->pinned_buffers == 0)
695 {
696 Assert(stream->distance == 0);
697 return InvalidBuffer;
698 }
699 }
700
701 /* Grab the oldest pinned buffer and associated per-buffer data. */
702 Assert(stream->pinned_buffers > 0);
703 oldest_buffer_index = stream->oldest_buffer_index;
704 Assert(oldest_buffer_index >= 0 &&
705 oldest_buffer_index < stream->queue_size);
706 buffer = stream->buffers[oldest_buffer_index];
707 if (per_buffer_data)
708 *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
709
710 Assert(BufferIsValid(buffer));
711
712 /* Do we have to wait for an associated I/O first? */
713 if (stream->ios_in_progress > 0 &&
714 stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
715 {
716 int16 io_index = stream->oldest_io_index;
717 int16 distance;
718
719 /* Sanity check that we still agree on the buffers. */
720 Assert(stream->ios[io_index].op.buffers ==
721 &stream->buffers[oldest_buffer_index]);
722
723 WaitReadBuffers(&stream->ios[io_index].op);
724
725 Assert(stream->ios_in_progress > 0);
726 stream->ios_in_progress--;
727 if (++stream->oldest_io_index == stream->max_ios)
728 stream->oldest_io_index = 0;
729
730 if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
731 {
732 /* Distance ramps up fast (behavior C). */
733 distance = stream->distance * 2;
734 distance = Min(distance, stream->max_pinned_buffers);
735 stream->distance = distance;
736 }
737 else
738 {
739 /* No advice; move towards io_combine_limit (behavior B). */
740 if (stream->distance > io_combine_limit)
741 {
742 stream->distance--;
743 }
744 else
745 {
746 distance = stream->distance * 2;
747 distance = Min(distance, io_combine_limit);
748 distance = Min(distance, stream->max_pinned_buffers);
749 stream->distance = distance;
750 }
751 }
752 }
753
754#ifdef CLOBBER_FREED_MEMORY
755 /* Clobber old buffer and per-buffer data for debugging purposes. */
756 stream->buffers[oldest_buffer_index] = InvalidBuffer;
757
758 /*
759 * The caller will get access to the per-buffer data, until the next call.
760 * We wipe the one before, which is never occupied because queue_size
761 * allowed one extra element. This will hopefully trip up client code
762 * that is holding a dangling pointer to it.
763 */
764 if (stream->per_buffer_data)
765 wipe_mem(get_per_buffer_data(stream,
766 oldest_buffer_index == 0 ?
767 stream->queue_size - 1 :
768 oldest_buffer_index - 1),
769 stream->per_buffer_data_size);
770#endif
771
772 /* Pin transferred to caller. */
773 Assert(stream->pinned_buffers > 0);
774 stream->pinned_buffers--;
775
776 /* Advance oldest buffer, with wrap-around. */
777 stream->oldest_buffer_index++;
778 if (stream->oldest_buffer_index == stream->queue_size)
779 stream->oldest_buffer_index = 0;
780
781 /* Prepare for the next call. */
782 read_stream_look_ahead(stream, false);
783
784#ifndef READ_STREAM_DISABLE_FAST_PATH
785 /* See if we can take the fast path for all-cached scans next time. */
786 if (stream->ios_in_progress == 0 &&
787 stream->pinned_buffers == 1 &&
788 stream->distance == 1 &&
789 stream->pending_read_nblocks == 0 &&
790 stream->per_buffer_data_size == 0)
791 {
792 stream->fast_path = true;
793 }
794#endif
795
796 return buffer;
797}
uint32 BlockNumber
Definition: block.h:31
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition: bufmgr.c:1410
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition: bufmgr.c:1382
int io_combine_limit
Definition: bufmgr.c:165
#define READ_BUFFERS_ISSUE_ADVICE
Definition: bufmgr.h:113
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:351
#define Min(x, y)
Definition: c.h:961
#define likely(x)
Definition: c.h:332
#define Assert(condition)
Definition: c.h:815
int16_t int16
Definition: c.h:483
#define unlikely(x)
Definition: c.h:333
static void read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
Definition: read_stream.c:307
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
Definition: read_stream.c:160
int16 buffer_index
Definition: read_stream.c:102
Buffer * buffers
Definition: bufmgr.h:129
int16 distance
Definition: read_stream.c:116
int16 ios_in_progress
Definition: read_stream.c:112
void * per_buffer_data
Definition: read_stream.c:141
int16 pinned_buffers
Definition: read_stream.c:115
int16 max_ios
Definition: read_stream.c:111
int16 oldest_buffer_index
Definition: read_stream.c:151
BlockNumber seq_blocknum
Definition: read_stream.c:133
bool advice_enabled
Definition: read_stream.c:117
int16 max_pinned_buffers
Definition: read_stream.c:114
int16 oldest_io_index
Definition: read_stream.c:145
int16 queue_size
Definition: read_stream.c:113
int16 next_buffer_index
Definition: read_stream.c:152
size_t per_buffer_data_size
Definition: read_stream.c:140
int16 next_io_index
Definition: read_stream.c:146
bool fast_path
Definition: read_stream.c:148
int16 pending_read_nblocks
Definition: read_stream.c:137
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
Definition: read_stream.c:153

References ReadStream::advice_enabled, Assert, InProgressIO::buffer_index, BufferIsValid(), ReadStream::buffers, ReadBuffersOperation::buffers, ReadStream::distance, ReadStream::fast_path, ReadBuffersOperation::flags, get_per_buffer_data(), InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, likely, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, ReadStream::oldest_io_index, InProgressIO::op, ReadStream::pending_read_nblocks, ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadStream::pinned_buffers, ReadStream::queue_size, READ_BUFFERS_ISSUE_ADVICE, read_stream_get_block(), read_stream_look_ahead(), ReadStream::seq_blocknum, StartReadBuffer(), unlikely, and WaitReadBuffers().

Referenced by collect_corrupt_items(), collect_visibility_data(), heap_fetch_next_buffer(), heapam_scan_analyze_next_block(), pg_prewarm(), read_stream_reset(), and RelationCopyStorageUsingBuffer().

◆ read_stream_reset()

void read_stream_reset ( ReadStream * stream)

Definition at line 820 of file read_stream.c.

821{
822 Buffer buffer;
823
824 /* Stop looking ahead. */
825 stream->distance = 0;
826
827 /* Forget buffered block number and fast path state. */
828 stream->buffered_blocknum = InvalidBlockNumber;
829 stream->fast_path = false;
830
831 /* Unpin anything that wasn't consumed. */
832 while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
833 ReleaseBuffer(buffer);
834
835 Assert(stream->pinned_buffers == 0);
836 Assert(stream->ios_in_progress == 0);
837
838 /* Start off assuming data is cached. */
839 stream->distance = 1;
840}
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4924
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Definition: read_stream.c:605
BlockNumber buffered_blocknum
Definition: read_stream.c:123

References Assert, ReadStream::buffered_blocknum, ReadStream::distance, ReadStream::fast_path, InvalidBlockNumber, InvalidBuffer, ReadStream::ios_in_progress, ReadStream::pinned_buffers, read_stream_next_buffer(), and ReleaseBuffer().

Referenced by heap_fetch_next_buffer(), heap_rescan(), and read_stream_end().