PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
read_stream.h File Reference
#include "storage/bufmgr.h"
#include "storage/smgr.h"
Include dependency graph for read_stream.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  BlockRangeReadStreamPrivate
 

Macros

#define READ_STREAM_DEFAULT   0x00
 
#define READ_STREAM_MAINTENANCE   0x01
 
#define READ_STREAM_SEQUENTIAL   0x02
 
#define READ_STREAM_FULL   0x04
 
#define READ_STREAM_USE_BATCHING   0x08
 

Typedefs

typedef struct ReadStream ReadStream
 
typedef struct BlockRangeReadStreamPrivate BlockRangeReadStreamPrivate
 
typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 

Functions

BlockNumber block_range_read_stream_cb (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 
ReadStream * read_stream_begin_relation (int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
Buffer read_stream_next_buffer (ReadStream *stream, void **per_buffer_data)
 
BlockNumber read_stream_next_block (ReadStream *stream, BufferAccessStrategy *strategy)
 
ReadStream * read_stream_begin_smgr_relation (int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
void read_stream_reset (ReadStream *stream)
 
void read_stream_end (ReadStream *stream)
 

Macro Definition Documentation

◆ READ_STREAM_DEFAULT

#define READ_STREAM_DEFAULT   0x00

Definition at line 21 of file read_stream.h.

◆ READ_STREAM_FULL

#define READ_STREAM_FULL   0x04

Definition at line 43 of file read_stream.h.

◆ READ_STREAM_MAINTENANCE

#define READ_STREAM_MAINTENANCE   0x01

Definition at line 28 of file read_stream.h.

◆ READ_STREAM_SEQUENTIAL

#define READ_STREAM_SEQUENTIAL   0x02

Definition at line 36 of file read_stream.h.

◆ READ_STREAM_USE_BATCHING

#define READ_STREAM_USE_BATCHING   0x08

Definition at line 64 of file read_stream.h.

Typedef Documentation

◆ BlockRangeReadStreamPrivate

typedef struct BlockRangeReadStreamPrivate BlockRangeReadStreamPrivate

◆ ReadStream

typedef struct ReadStream ReadStream

Definition at line 67 of file read_stream.h.

◆ ReadStreamBlockNumberCB

typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)

Definition at line 77 of file read_stream.h.

Function Documentation

◆ block_range_read_stream_cb()

BlockNumber block_range_read_stream_cb ( ReadStream *  stream,
void *  callback_private_data,
void *  per_buffer_data 
)

◆ read_stream_begin_relation()

ReadStream * read_stream_begin_relation ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 716 of file read_stream.c.

723{
724 return read_stream_begin_impl(flags,
725 strategy,
726 rel,
727 RelationGetSmgr(rel),
728 rel->rd_rel->relpersistence,
729 forknum,
730 callback,
731 callback_private_data,
732 per_buffer_data_size);
733}
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
Definition: read_stream.c:517
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:578
Form_pg_class rd_rel
Definition: rel.h:111
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46

References callback(), RelationData::rd_rel, read_stream_begin_impl(), and RelationGetSmgr().

Referenced by acquire_sample_rows(), autoprewarm_database_main(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), gistvacuumscan(), heap_beginscan(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), spgvacuumscan(), and verify_heapam().

◆ read_stream_begin_smgr_relation()

ReadStream * read_stream_begin_smgr_relation ( int  flags,
BufferAccessStrategy  strategy,
SMgrRelation  smgr,
char  smgr_persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 740 of file read_stream.c.

748{
749 return read_stream_begin_impl(flags,
750 strategy,
751 NULL,
752 smgr,
753 smgr_persistence,
754 forknum,
755 callback,
756 callback_private_data,
757 per_buffer_data_size);
758}

References callback(), and read_stream_begin_impl().

Referenced by RelationCopyStorageUsingBuffer().

◆ read_stream_end()

void read_stream_end ( ReadStream *  stream)

◆ read_stream_next_block()

BlockNumber read_stream_next_block ( ReadStream *  stream,
BufferAccessStrategy *  strategy 
)

Definition at line 997 of file read_stream.c.

998{
999 *strategy = stream->ios[0].op.strategy;
1000 return read_stream_get_block(stream, NULL);
1001}
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
Definition: read_stream.c:179
ReadBuffersOperation op
Definition: read_stream.c:86
BufferAccessStrategy strategy
Definition: bufmgr.h:128
InProgressIO * ios
Definition: read_stream.c:135

References ReadStream::ios, InProgressIO::op, read_stream_get_block(), and ReadBuffersOperation::strategy.

◆ read_stream_next_buffer()

Buffer read_stream_next_buffer ( ReadStream *  stream,
void **  per_buffer_data 
)

Definition at line 770 of file read_stream.c.

771{
772 Buffer buffer;
773 int16 oldest_buffer_index;
774
775#ifndef READ_STREAM_DISABLE_FAST_PATH
776
777 /*
778 * A fast path for all-cached scans. This is the same as the usual
779 * algorithm, but it is specialized for no I/O and no per-buffer data, so
780 * we can skip the queue management code, stay in the same buffer slot and
781 * use singular StartReadBuffer().
782 */
783 if (likely(stream->fast_path))
784 {
785 BlockNumber next_blocknum;
786
787 /* Fast path assumptions. */
788 Assert(stream->ios_in_progress == 0);
789 Assert(stream->forwarded_buffers == 0);
790 Assert(stream->pinned_buffers == 1);
791 Assert(stream->distance == 1);
792 Assert(stream->pending_read_nblocks == 0);
793 Assert(stream->per_buffer_data_size == 0);
794 Assert(stream->initialized_buffers > stream->oldest_buffer_index);
795
796 /* We're going to return the buffer we pinned last time. */
797 oldest_buffer_index = stream->oldest_buffer_index;
798 Assert((oldest_buffer_index + 1) % stream->queue_size ==
799 stream->next_buffer_index);
800 buffer = stream->buffers[oldest_buffer_index];
801 Assert(buffer != InvalidBuffer);
802
803 /* Choose the next block to pin. */
804 next_blocknum = read_stream_get_block(stream, NULL);
805
806 if (likely(next_blocknum != InvalidBlockNumber))
807 {
808 int flags = stream->read_buffers_flags;
809
810 if (stream->advice_enabled)
811 flags |= READ_BUFFERS_ISSUE_ADVICE;
812
813 /*
814 * Pin a buffer for the next call. Same buffer entry, and
815 * arbitrary I/O entry (they're all free). We don't have to
816 * adjust pinned_buffers because we're transferring one to caller
817 * but pinning one more.
818 *
819 * In the fast path we don't need to check the pin limit. We're
820 * always allowed at least one pin so that progress can be made,
821 * and that's all we need here. Although two pins are momentarily
822 * held at the same time, the model used here is that the stream
823 * holds only one, and the other now belongs to the caller.
824 */
825 if (likely(!StartReadBuffer(&stream->ios[0].op,
826 &stream->buffers[oldest_buffer_index],
827 next_blocknum,
828 flags)))
829 {
830 /* Fast return. */
831 return buffer;
832 }
833
834 /* Next call must wait for I/O for the newly pinned buffer. */
835 stream->oldest_io_index = 0;
836 stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
837 stream->ios_in_progress = 1;
838 stream->ios[0].buffer_index = oldest_buffer_index;
839 stream->seq_blocknum = next_blocknum + 1;
840 }
841 else
842 {
843 /* No more blocks, end of stream. */
844 stream->distance = 0;
845 stream->oldest_buffer_index = stream->next_buffer_index;
846 stream->pinned_buffers = 0;
847 stream->buffers[oldest_buffer_index] = InvalidBuffer;
848 }
849
850 stream->fast_path = false;
851 return buffer;
852 }
853#endif
854
855 if (unlikely(stream->pinned_buffers == 0))
856 {
857 Assert(stream->oldest_buffer_index == stream->next_buffer_index);
858
859 /* End of stream reached? */
860 if (stream->distance == 0)
861 return InvalidBuffer;
862
863 /*
864 * The usual order of operations is that we look ahead at the bottom
865 * of this function after potentially finishing an I/O and making
866 * space for more, but if we're just starting up we'll need to crank
867 * the handle to get started.
868 */
869 read_stream_look_ahead(stream);
870
871 /* End of stream reached? */
872 if (stream->pinned_buffers == 0)
873 {
874 Assert(stream->distance == 0);
875 return InvalidBuffer;
876 }
877 }
878
879 /* Grab the oldest pinned buffer and associated per-buffer data. */
880 Assert(stream->pinned_buffers > 0);
881 oldest_buffer_index = stream->oldest_buffer_index;
882 Assert(oldest_buffer_index >= 0 &&
883 oldest_buffer_index < stream->queue_size);
884 buffer = stream->buffers[oldest_buffer_index];
885 if (per_buffer_data)
886 *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
887
888 Assert(BufferIsValid(buffer));
889
890 /* Do we have to wait for an associated I/O first? */
891 if (stream->ios_in_progress > 0 &&
892 stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
893 {
894 int16 io_index = stream->oldest_io_index;
895 int32 distance; /* wider temporary value, clamped below */
896
897 /* Sanity check that we still agree on the buffers. */
898 Assert(stream->ios[io_index].op.buffers ==
899 &stream->buffers[oldest_buffer_index]);
900
901 WaitReadBuffers(&stream->ios[io_index].op);
902
903 Assert(stream->ios_in_progress > 0);
904 stream->ios_in_progress--;
905 if (++stream->oldest_io_index == stream->max_ios)
906 stream->oldest_io_index = 0;
907
908 /* Look-ahead distance ramps up rapidly after we do I/O. */
909 distance = stream->distance * 2;
910 distance = Min(distance, stream->max_pinned_buffers);
911 stream->distance = distance;
912
913 /*
914 * If we've reached the first block of a sequential region we're
915 * issuing advice for, cancel that until the next jump. The kernel
916 * will see the sequential preadv() pattern starting here.
917 */
918 if (stream->advice_enabled &&
919 stream->ios[io_index].op.blocknum == stream->seq_until_processed)
920 stream->seq_until_processed = InvalidBlockNumber;
921 }
922
923 /*
924 * We must zap this queue entry, or else it would appear as a forwarded
925 * buffer. If it's potentially in the overflow zone (ie from a
926 * multi-block I/O that wrapped around the queue), also zap the copy.
927 */
928 stream->buffers[oldest_buffer_index] = InvalidBuffer;
929 if (oldest_buffer_index < stream->io_combine_limit - 1)
930 stream->buffers[stream->queue_size + oldest_buffer_index] =
931 InvalidBuffer;
932
933#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
934
935 /*
936 * The caller will get access to the per-buffer data, until the next call.
937 * We wipe the one before, which is never occupied because queue_size
938 * allowed one extra element. This will hopefully trip up client code
939 * that is holding a dangling pointer to it.
940 */
941 if (stream->per_buffer_data)
942 {
943 void *per_buffer_data;
944
945 per_buffer_data = get_per_buffer_data(stream,
946 oldest_buffer_index == 0 ?
947 stream->queue_size - 1 :
948 oldest_buffer_index - 1);
949
950#if defined(CLOBBER_FREED_MEMORY)
951 /* This also tells Valgrind the memory is "noaccess". */
952 wipe_mem(per_buffer_data, stream->per_buffer_data_size);
953#elif defined(USE_VALGRIND)
954 /* Tell it ourselves. */
955 VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
956 stream->per_buffer_data_size);
957#endif
958 }
959#endif
960
961 /* Pin transferred to caller. */
962 Assert(stream->pinned_buffers > 0);
963 stream->pinned_buffers--;
964
965 /* Advance oldest buffer, with wrap-around. */
966 stream->oldest_buffer_index++;
967 if (stream->oldest_buffer_index == stream->queue_size)
968 stream->oldest_buffer_index = 0;
969
970 /* Prepare for the next call. */
971 read_stream_look_ahead(stream);
972
973#ifndef READ_STREAM_DISABLE_FAST_PATH
974 /* See if we can take the fast path for all-cached scans next time. */
975 if (stream->ios_in_progress == 0 &&
976 stream->forwarded_buffers == 0 &&
977 stream->pinned_buffers == 1 &&
978 stream->distance == 1 &&
979 stream->pending_read_nblocks == 0 &&
980 stream->per_buffer_data_size == 0)
981 {
982 stream->fast_path = true;
983 }
984#endif
985
986 return buffer;
987}
uint32 BlockNumber
Definition: block.h:31
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition: bufmgr.c:1637
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition: bufmgr.c:1513
int io_combine_limit
Definition: bufmgr.c:170
#define READ_BUFFERS_ISSUE_ADVICE
Definition: bufmgr.h:114
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:368
#define Min(x, y)
Definition: c.h:975
#define likely(x)
Definition: c.h:346
int16_t int16
Definition: c.h:497
int32_t int32
Definition: c.h:498
#define unlikely(x)
Definition: c.h:347
Assert(PointerIsAligned(start, uint64))
#define VALGRIND_MAKE_MEM_NOACCESS(addr, size)
Definition: memdebug.h:27
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
Definition: read_stream.c:151
static void read_stream_look_ahead(ReadStream *stream)
Definition: read_stream.c:408
int16 buffer_index
Definition: read_stream.c:85
Buffer * buffers
Definition: bufmgr.h:135
BlockNumber blocknum
Definition: bufmgr.h:136
int16 distance
Definition: read_stream.c:101
int16 ios_in_progress
Definition: read_stream.c:96
void * per_buffer_data
Definition: read_stream.c:132
BlockNumber seq_until_processed
Definition: read_stream.c:124
int16 pinned_buffers
Definition: read_stream.c:100
int16 max_ios
Definition: read_stream.c:94
int16 oldest_buffer_index
Definition: read_stream.c:142
BlockNumber seq_blocknum
Definition: read_stream.c:123
bool advice_enabled
Definition: read_stream.c:106
int16 max_pinned_buffers
Definition: read_stream.c:98
int16 oldest_io_index
Definition: read_stream.c:136
int read_buffers_flags
Definition: read_stream.c:103
int16 queue_size
Definition: read_stream.c:97
int16 next_buffer_index
Definition: read_stream.c:143
int16 initialized_buffers
Definition: read_stream.c:102
size_t per_buffer_data_size
Definition: read_stream.c:131
int16 forwarded_buffers
Definition: read_stream.c:99
int16 next_io_index
Definition: read_stream.c:137
bool fast_path
Definition: read_stream.c:139
int16 pending_read_nblocks
Definition: read_stream.c:128
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
Definition: read_stream.c:144

References ReadStream::advice_enabled, Assert(), ReadBuffersOperation::blocknum, InProgressIO::buffer_index, BufferIsValid(), ReadStream::buffers, ReadBuffersOperation::buffers, ReadStream::distance, ReadStream::fast_path, ReadStream::forwarded_buffers, get_per_buffer_data(), ReadStream::initialized_buffers, InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, likely, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, ReadStream::oldest_io_index, InProgressIO::op, ReadStream::pending_read_nblocks, ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadStream::pinned_buffers, ReadStream::queue_size, ReadStream::read_buffers_flags, READ_BUFFERS_ISSUE_ADVICE, read_stream_get_block(), read_stream_look_ahead(), ReadStream::seq_blocknum, ReadStream::seq_until_processed, StartReadBuffer(), unlikely, VALGRIND_MAKE_MEM_NOACCESS, and WaitReadBuffers().

Referenced by autoprewarm_database_main(), BitmapHeapScanNextBlock(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), gistvacuumscan(), heap_fetch_next_buffer(), heapam_scan_analyze_next_block(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), read_stream_reset(), RelationCopyStorageUsingBuffer(), spgvacuumscan(), and verify_heapam().

◆ read_stream_reset()

void read_stream_reset ( ReadStream *  stream)

Definition at line 1010 of file read_stream.c.

1011{
1012 int16 index;
1013 Buffer buffer;
1014
1015 /* Stop looking ahead. */
1016 stream->distance = 0;
1017
1018 /* Forget buffered block number and fast path state. */
1019 stream->buffered_blocknum = InvalidBlockNumber;
1020 stream->fast_path = false;
1021
1022 /* Unpin anything that wasn't consumed. */
1023 while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1024 ReleaseBuffer(buffer);
1025
1026 /* Unpin any unused forwarded buffers. */
1027 index = stream->next_buffer_index;
1028 while (index < stream->initialized_buffers &&
1029 (buffer = stream->buffers[index]) != InvalidBuffer)
1030 {
1031 Assert(stream->forwarded_buffers > 0);
1032 stream->forwarded_buffers--;
1033 ReleaseBuffer(buffer);
1034
1035 stream->buffers[index] = InvalidBuffer;
1036 if (index < stream->io_combine_limit - 1)
1037 stream->buffers[stream->queue_size + index] = InvalidBuffer;
1038
1039 if (++index == stream->queue_size)
1040 index = 0;
1041 }
1042
1043 Assert(stream->forwarded_buffers == 0);
1044 Assert(stream->pinned_buffers == 0);
1045 Assert(stream->ios_in_progress == 0);
1046
1047 /* Start off assuming data is cached. */
1048 stream->distance = 1;
1049}
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:5373
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Definition: read_stream.c:770
BlockNumber buffered_blocknum
Definition: read_stream.c:113
Definition: type.h:96

References Assert(), ReadStream::buffered_blocknum, ReadStream::buffers, ReadStream::distance, ReadStream::fast_path, ReadStream::forwarded_buffers, InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios_in_progress, ReadStream::next_buffer_index, ReadStream::pinned_buffers, ReadStream::queue_size, read_stream_next_buffer(), and ReleaseBuffer().

Referenced by btvacuumscan(), gistvacuumscan(), heap_fetch_next_buffer(), heap_rescan(), read_stream_end(), and spgvacuumscan().