PostgreSQL Source Code git master
Loading...
Searching...
No Matches
read_stream.h File Reference
#include "storage/bufmgr.h"
#include "storage/smgr.h"
Include dependency graph for read_stream.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  BlockRangeReadStreamPrivate
 

Macros

#define READ_STREAM_DEFAULT   0x00
 
#define READ_STREAM_MAINTENANCE   0x01
 
#define READ_STREAM_SEQUENTIAL   0x02
 
#define READ_STREAM_FULL   0x04
 
#define READ_STREAM_USE_BATCHING   0x08
 

Typedefs

typedef struct ReadStream ReadStream
 
typedef struct BlockRangeReadStreamPrivate BlockRangeReadStreamPrivate
 
typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 

Functions

BlockNumber block_range_read_stream_cb (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 
ReadStream * read_stream_begin_relation (int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
Buffer read_stream_next_buffer (ReadStream *stream, void **per_buffer_data)
 
BlockNumber read_stream_next_block (ReadStream *stream, BufferAccessStrategy *strategy)
 
ReadStream * read_stream_begin_smgr_relation (int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
BlockNumber read_stream_pause (ReadStream *stream)
 
void read_stream_resume (ReadStream *stream)
 
void read_stream_reset (ReadStream *stream)
 
void read_stream_end (ReadStream *stream)
 

Macro Definition Documentation

◆ READ_STREAM_DEFAULT

#define READ_STREAM_DEFAULT   0x00

Definition at line 21 of file read_stream.h.

◆ READ_STREAM_FULL

#define READ_STREAM_FULL   0x04

Definition at line 43 of file read_stream.h.

◆ READ_STREAM_MAINTENANCE

#define READ_STREAM_MAINTENANCE   0x01

Definition at line 28 of file read_stream.h.

◆ READ_STREAM_SEQUENTIAL

#define READ_STREAM_SEQUENTIAL   0x02

Definition at line 36 of file read_stream.h.

◆ READ_STREAM_USE_BATCHING

#define READ_STREAM_USE_BATCHING   0x08

Definition at line 64 of file read_stream.h.

Typedef Documentation

◆ BlockRangeReadStreamPrivate

◆ ReadStream

typedef struct ReadStream ReadStream

Definition at line 67 of file read_stream.h.

◆ ReadStreamBlockNumberCB

typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)

Definition at line 77 of file read_stream.h.

Function Documentation

◆ block_range_read_stream_cb()

◆ read_stream_begin_relation()

ReadStream * read_stream_begin_relation ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void * callback_private_data,
size_t  per_buffer_data_size 
)
extern

Definition at line 739 of file read_stream.c.

746{
747 return read_stream_begin_impl(flags,
748 strategy,
749 rel,
750 RelationGetSmgr(rel),
751 rel->rd_rel->relpersistence,
752 forknum,
753 callback,
754 callback_private_data,
755 per_buffer_data_size);
756}
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
static SMgrRelation RelationGetSmgr(Relation rel)
Definition rel.h:576
Form_pg_class rd_rel
Definition rel.h:111
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)

References callback(), RelationData::rd_rel, read_stream_begin_impl(), and RelationGetSmgr().

Referenced by acquire_sample_rows(), autoprewarm_database_main(), blbulkdelete(), blgetbitmap(), blvacuumcleanup(), brin_vacuum_scan(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), ginvacuumcleanup(), gistvacuumscan(), hashbulkdelete(), heap_beginscan(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), pgstathashindex(), pgstatindex_impl(), spgvacuumscan(), statapprox_heap(), and verify_heapam().

◆ read_stream_begin_smgr_relation()

ReadStream * read_stream_begin_smgr_relation ( int  flags,
BufferAccessStrategy  strategy,
SMgrRelation  smgr,
char  smgr_persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void * callback_private_data,
size_t  per_buffer_data_size 
)
extern

Definition at line 763 of file read_stream.c.

771{
772 return read_stream_begin_impl(flags,
773 strategy,
774 NULL,
775 smgr,
777 forknum,
778 callback,
779 callback_private_data,
780 per_buffer_data_size);
781}
static int fb(int x)

References callback(), fb(), and read_stream_begin_impl().

Referenced by RelationCopyStorageUsingBuffer().

◆ read_stream_end()

◆ read_stream_next_block()

BlockNumber read_stream_next_block ( ReadStream * stream,
BufferAccessStrategy * strategy 
)
extern

Definition at line 1033 of file read_stream.c.

1034{
1035 *strategy = stream->ios[0].op.strategy;
1036 return read_stream_get_block(stream, NULL);
1037}
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
ReadBuffersOperation op
Definition read_stream.c:86
BufferAccessStrategy strategy
Definition bufmgr.h:138
InProgressIO * ios

References fb(), ReadStream::ios, InProgressIO::op, read_stream_get_block(), and ReadBuffersOperation::strategy.

◆ read_stream_next_buffer()

Buffer read_stream_next_buffer ( ReadStream * stream,
void **  per_buffer_data 
)
extern

Definition at line 793 of file read_stream.c.

794{
795 Buffer buffer;
796 int16 oldest_buffer_index;
797
798#ifndef READ_STREAM_DISABLE_FAST_PATH
799
800 /*
801 * A fast path for all-cached scans. This is the same as the usual
802 * algorithm, but it is specialized for no I/O and no per-buffer data, so
803 * we can skip the queue management code, stay in the same buffer slot and
804 * use singular StartReadBuffer().
805 */
806 if (likely(stream->fast_path))
807 {
809
810 /* Fast path assumptions. */
811 Assert(stream->ios_in_progress == 0);
812 Assert(stream->forwarded_buffers == 0);
813 Assert(stream->pinned_buffers == 1);
814 Assert(stream->distance == 1);
815 Assert(stream->pending_read_nblocks == 0);
816 Assert(stream->per_buffer_data_size == 0);
818
819 /* We're going to return the buffer we pinned last time. */
820 oldest_buffer_index = stream->oldest_buffer_index;
821 Assert((oldest_buffer_index + 1) % stream->queue_size ==
822 stream->next_buffer_index);
823 buffer = stream->buffers[oldest_buffer_index];
824 Assert(buffer != InvalidBuffer);
825
826 /* Choose the next block to pin. */
828
830 {
831 int flags = stream->read_buffers_flags;
832
833 if (stream->advice_enabled)
835
836 /*
837 * Pin a buffer for the next call. Same buffer entry, and
838 * arbitrary I/O entry (they're all free). We don't have to
839 * adjust pinned_buffers because we're transferring one to caller
840 * but pinning one more.
841 *
842 * In the fast path we don't need to check the pin limit. We're
843 * always allowed at least one pin so that progress can be made,
844 * and that's all we need here. Although two pins are momentarily
845 * held at the same time, the model used here is that the stream
846 * holds only one, and the other now belongs to the caller.
847 */
848 if (likely(!StartReadBuffer(&stream->ios[0].op,
849 &stream->buffers[oldest_buffer_index],
851 flags)))
852 {
853 /* Fast return. */
854 return buffer;
855 }
856
857 /* Next call must wait for I/O for the newly pinned buffer. */
858 stream->oldest_io_index = 0;
859 stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
860 stream->ios_in_progress = 1;
861 stream->ios[0].buffer_index = oldest_buffer_index;
862 stream->seq_blocknum = next_blocknum + 1;
863 }
864 else
865 {
866 /* No more blocks, end of stream. */
867 stream->distance = 0;
868 stream->oldest_buffer_index = stream->next_buffer_index;
869 stream->pinned_buffers = 0;
870 stream->buffers[oldest_buffer_index] = InvalidBuffer;
871 }
872
873 stream->fast_path = false;
874 return buffer;
875 }
876#endif
877
878 if (unlikely(stream->pinned_buffers == 0))
879 {
880 Assert(stream->oldest_buffer_index == stream->next_buffer_index);
881
882 /* End of stream reached? */
883 if (stream->distance == 0)
884 return InvalidBuffer;
885
886 /*
887 * The usual order of operations is that we look ahead at the bottom
888 * of this function after potentially finishing an I/O and making
889 * space for more, but if we're just starting up we'll need to crank
890 * the handle to get started.
891 */
893
894 /* End of stream reached? */
895 if (stream->pinned_buffers == 0)
896 {
897 Assert(stream->distance == 0);
898 return InvalidBuffer;
899 }
900 }
901
902 /* Grab the oldest pinned buffer and associated per-buffer data. */
903 Assert(stream->pinned_buffers > 0);
904 oldest_buffer_index = stream->oldest_buffer_index;
905 Assert(oldest_buffer_index >= 0 &&
907 buffer = stream->buffers[oldest_buffer_index];
908 if (per_buffer_data)
909 *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
910
911 Assert(BufferIsValid(buffer));
912
913 /* Do we have to wait for an associated I/O first? */
914 if (stream->ios_in_progress > 0 &&
915 stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
916 {
918 int32 distance; /* wider temporary value, clamped below */
919
920 /* Sanity check that we still agree on the buffers. */
921 Assert(stream->ios[io_index].op.buffers ==
922 &stream->buffers[oldest_buffer_index]);
923
924 WaitReadBuffers(&stream->ios[io_index].op);
925
926 Assert(stream->ios_in_progress > 0);
927 stream->ios_in_progress--;
928 if (++stream->oldest_io_index == stream->max_ios)
929 stream->oldest_io_index = 0;
930
931 /* Look-ahead distance ramps up rapidly after we do I/O. */
932 distance = stream->distance * 2;
933 distance = Min(distance, stream->max_pinned_buffers);
934 stream->distance = distance;
935
936 /*
937 * If we've reached the first block of a sequential region we're
938 * issuing advice for, cancel that until the next jump. The kernel
939 * will see the sequential preadv() pattern starting here.
940 */
941 if (stream->advice_enabled &&
942 stream->ios[io_index].op.blocknum == stream->seq_until_processed)
944 }
945
946 /*
947 * We must zap this queue entry, or else it would appear as a forwarded
948 * buffer. If it's potentially in the overflow zone (ie from a
949 * multi-block I/O that wrapped around the queue), also zap the copy.
950 */
951 stream->buffers[oldest_buffer_index] = InvalidBuffer;
953 stream->buffers[stream->queue_size + oldest_buffer_index] =
955
956#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
957
958 /*
959 * The caller will get access to the per-buffer data, until the next call.
960 * We wipe the one before, which is never occupied because queue_size
961 * allowed one extra element. This will hopefully trip up client code
962 * that is holding a dangling pointer to it.
963 */
964 if (stream->per_buffer_data)
965 {
966 void *per_buffer_data;
967
968 per_buffer_data = get_per_buffer_data(stream,
969 oldest_buffer_index == 0 ?
970 stream->queue_size - 1 :
971 oldest_buffer_index - 1);
972
973#if defined(CLOBBER_FREED_MEMORY)
974 /* This also tells Valgrind the memory is "noaccess". */
975 wipe_mem(per_buffer_data, stream->per_buffer_data_size);
976#elif defined(USE_VALGRIND)
977 /* Tell it ourselves. */
978 VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
979 stream->per_buffer_data_size);
980#endif
981 }
982#endif
983
984 /* Pin transferred to caller. */
985 Assert(stream->pinned_buffers > 0);
986 stream->pinned_buffers--;
987
988 /* Advance oldest buffer, with wrap-around. */
989 stream->oldest_buffer_index++;
990 if (stream->oldest_buffer_index == stream->queue_size)
991 stream->oldest_buffer_index = 0;
992
993 /* Prepare for the next call. */
995
996#ifndef READ_STREAM_DISABLE_FAST_PATH
997 /* See if we can take the fast path for all-cached scans next time. */
998 if (stream->ios_in_progress == 0 &&
999 stream->forwarded_buffers == 0 &&
1000 stream->pinned_buffers == 1 &&
1001 stream->distance == 1 &&
1002 stream->pending_read_nblocks == 0 &&
1003 stream->per_buffer_data_size == 0)
1004 {
1005 /*
1006 * The fast path spins on one buffer entry repeatedly instead of
1007 * rotating through the whole queue and clearing the entries behind
1008 * it. If the buffer it starts with happened to be forwarded between
1009 * StartReadBuffers() calls and also wrapped around the circular queue
1010 * partway through, then a copy also exists in the overflow zone, and
1011 * it won't clear it out as the regular path would. Do that now, so
1012 * it doesn't need code for that.
1013 */
1014 if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
1015 stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
1017
1018 stream->fast_path = true;
1019 }
1020#endif
1021
1022 return buffer;
1023}
uint32 BlockNumber
Definition block.h:31
int Buffer
Definition buf.h:23
#define InvalidBuffer
Definition buf.h:25
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition bufmgr.c:1742
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition bufmgr.c:1618
int io_combine_limit
Definition bufmgr.c:215
#define READ_BUFFERS_ISSUE_ADVICE
Definition bufmgr.h:124
static bool BufferIsValid(Buffer bufnum)
Definition bufmgr.h:421
#define Min(x, y)
Definition c.h:1093
#define likely(x)
Definition c.h:431
#define Assert(condition)
Definition c.h:945
int16_t int16
Definition c.h:613
int32_t int32
Definition c.h:614
#define unlikely(x)
Definition c.h:432
#define VALGRIND_MAKE_MEM_NOACCESS(addr, size)
Definition memdebug.h:27
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
static void read_stream_look_ahead(ReadStream *stream)
int16 buffer_index
Definition read_stream.c:85
BlockNumber blocknum
Definition bufmgr.h:146
int16 io_combine_limit
Definition read_stream.c:95
int16 distance
int16 ios_in_progress
Definition read_stream.c:96
void * per_buffer_data
BlockNumber seq_until_processed
int16 pinned_buffers
int16 max_ios
Definition read_stream.c:94
int16 oldest_buffer_index
BlockNumber seq_blocknum
bool advice_enabled
int16 max_pinned_buffers
Definition read_stream.c:98
int16 oldest_io_index
int read_buffers_flags
int16 queue_size
Definition read_stream.c:97
int16 next_buffer_index
int16 initialized_buffers
size_t per_buffer_data_size
int16 forwarded_buffers
Definition read_stream.c:99
int16 next_io_index
int16 pending_read_nblocks
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]

References ReadStream::advice_enabled, Assert, ReadBuffersOperation::blocknum, InProgressIO::buffer_index, BufferIsValid(), ReadStream::buffers, ReadBuffersOperation::buffers, ReadStream::distance, ReadStream::fast_path, fb(), ReadStream::forwarded_buffers, get_per_buffer_data(), ReadStream::initialized_buffers, InvalidBlockNumber, InvalidBuffer, ReadStream::io_combine_limit, io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, likely, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, ReadStream::oldest_io_index, InProgressIO::op, ReadStream::pending_read_nblocks, ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadStream::pinned_buffers, ReadStream::queue_size, ReadStream::read_buffers_flags, READ_BUFFERS_ISSUE_ADVICE, read_stream_get_block(), read_stream_look_ahead(), ReadStream::seq_blocknum, ReadStream::seq_until_processed, StartReadBuffer(), unlikely, VALGRIND_MAKE_MEM_NOACCESS, and WaitReadBuffers().

Referenced by autoprewarm_database_main(), BitmapHeapScanNextBlock(), blbulkdelete(), blgetbitmap(), blvacuumcleanup(), brin_vacuum_scan(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), ginvacuumcleanup(), gistvacuumscan(), hashbulkdelete(), heap_fetch_next_buffer(), heapam_scan_analyze_next_block(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), pgstathashindex(), pgstatindex_impl(), read_stream_reset(), RelationCopyStorageUsingBuffer(), spgvacuumscan(), statapprox_heap(), and verify_heapam().

◆ read_stream_pause()

BlockNumber read_stream_pause ( ReadStream * stream)
extern

Definition at line 1045 of file read_stream.c.

1046{
1047 stream->resume_distance = stream->distance;
1048 stream->distance = 0;
1049 return InvalidBlockNumber;
1050}
int16 resume_distance

References ReadStream::distance, InvalidBlockNumber, and ReadStream::resume_distance.

◆ read_stream_reset()

void read_stream_reset ( ReadStream * stream)
extern

Definition at line 1070 of file read_stream.c.

1071{
1072 int16 index;
1073 Buffer buffer;
1074
1075 /* Stop looking ahead. */
1076 stream->distance = 0;
1077
1078 /* Forget buffered block number and fast path state. */
1080 stream->fast_path = false;
1081
1082 /* Unpin anything that wasn't consumed. */
1083 while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1084 ReleaseBuffer(buffer);
1085
1086 /* Unpin any unused forwarded buffers. */
1087 index = stream->next_buffer_index;
1088 while (index < stream->initialized_buffers &&
1089 (buffer = stream->buffers[index]) != InvalidBuffer)
1090 {
1091 Assert(stream->forwarded_buffers > 0);
1092 stream->forwarded_buffers--;
1093 ReleaseBuffer(buffer);
1094
1095 stream->buffers[index] = InvalidBuffer;
1097 stream->buffers[stream->queue_size + index] = InvalidBuffer;
1098
1099 if (++index == stream->queue_size)
1100 index = 0;
1101 }
1102
1103 Assert(stream->forwarded_buffers == 0);
1104 Assert(stream->pinned_buffers == 0);
1105 Assert(stream->ios_in_progress == 0);
1106
1107 /* Start off assuming data is cached. */
1108 stream->distance = 1;
1109 stream->resume_distance = stream->distance;
1110}
void ReleaseBuffer(Buffer buffer)
Definition bufmgr.c:5505
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
BlockNumber buffered_blocknum
Definition type.h:96

References Assert, ReadStream::buffered_blocknum, ReadStream::buffers, ReadStream::distance, ReadStream::fast_path, fb(), ReadStream::forwarded_buffers, InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios_in_progress, ReadStream::next_buffer_index, ReadStream::pinned_buffers, ReadStream::queue_size, read_stream_next_buffer(), ReleaseBuffer(), and ReadStream::resume_distance.

Referenced by btvacuumscan(), gistvacuumscan(), hashbulkdelete(), heap_fetch_next_buffer(), heap_rescan(), read_stream_end(), and spgvacuumscan().

◆ read_stream_resume()

void read_stream_resume ( ReadStream * stream)
extern

Definition at line 1058 of file read_stream.c.

1059{
1060 stream->distance = stream->resume_distance;
1061}

References ReadStream::distance, and ReadStream::resume_distance.