PostgreSQL Source Code git master
Loading...
Searching...
No Matches
read_stream.h File Reference
#include "storage/bufmgr.h"
#include "storage/smgr.h"
Include dependency graph for read_stream.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  BlockRangeReadStreamPrivate
 

Macros

#define READ_STREAM_DEFAULT   0x00
 
#define READ_STREAM_MAINTENANCE   0x01
 
#define READ_STREAM_SEQUENTIAL   0x02
 
#define READ_STREAM_FULL   0x04
 
#define READ_STREAM_USE_BATCHING   0x08
 

Typedefs

typedef struct ReadStream ReadStream
 
typedef struct BlockRangeReadStreamPrivate BlockRangeReadStreamPrivate
 
typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 

Functions

BlockNumber block_range_read_stream_cb (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 
ReadStreamread_stream_begin_relation (int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
Buffer read_stream_next_buffer (ReadStream *stream, void **per_buffer_data)
 
BlockNumber read_stream_next_block (ReadStream *stream, BufferAccessStrategy *strategy)
 
ReadStreamread_stream_begin_smgr_relation (int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
BlockNumber read_stream_pause (ReadStream *stream)
 
void read_stream_resume (ReadStream *stream)
 
void read_stream_reset (ReadStream *stream)
 
void read_stream_end (ReadStream *stream)
 
void read_stream_enable_stats (ReadStream *stream, struct IOStats *stats)
 

Macro Definition Documentation

◆ READ_STREAM_DEFAULT

#define READ_STREAM_DEFAULT   0x00

Definition at line 21 of file read_stream.h.

◆ READ_STREAM_FULL

#define READ_STREAM_FULL   0x04

Definition at line 43 of file read_stream.h.

◆ READ_STREAM_MAINTENANCE

#define READ_STREAM_MAINTENANCE   0x01

Definition at line 28 of file read_stream.h.

◆ READ_STREAM_SEQUENTIAL

#define READ_STREAM_SEQUENTIAL   0x02

Definition at line 36 of file read_stream.h.

◆ READ_STREAM_USE_BATCHING

#define READ_STREAM_USE_BATCHING   0x08

Definition at line 64 of file read_stream.h.

Typedef Documentation

◆ BlockRangeReadStreamPrivate

◆ ReadStream

Definition at line 67 of file read_stream.h.

◆ ReadStreamBlockNumberCB

typedef BlockNumber(* ReadStreamBlockNumberCB) (ReadStream *stream, void *callback_private_data, void *per_buffer_data)

Definition at line 78 of file read_stream.h.

Function Documentation

◆ block_range_read_stream_cb()

◆ read_stream_begin_relation()

ReadStream * read_stream_begin_relation ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void callback_private_data,
size_t  per_buffer_data_size 
)
extern

Definition at line 976 of file read_stream.c.

983{
984 return read_stream_begin_impl(flags,
985 strategy,
986 rel,
987 RelationGetSmgr(rel),
988 rel->rd_rel->relpersistence,
989 forknum,
990 callback,
991 callback_private_data,
992 per_buffer_data_size);
993}
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
static SMgrRelation RelationGetSmgr(Relation rel)
Definition rel.h:578
Form_pg_class rd_rel
Definition rel.h:111
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)

References callback(), RelationData::rd_rel, read_stream_begin_impl(), and RelationGetSmgr().

Referenced by acquire_sample_rows(), autoprewarm_database_main(), blbulkdelete(), blgetbitmap(), blvacuumcleanup(), brin_vacuum_scan(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), ginvacuumcleanup(), gistvacuumscan(), hashbulkdelete(), heap_beginscan(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), pgstathashindex(), pgstatindex_impl(), read_stream_for_blocks(), spgvacuumscan(), statapprox_heap(), and verify_heapam().

◆ read_stream_begin_smgr_relation()

ReadStream * read_stream_begin_smgr_relation ( int  flags,
BufferAccessStrategy  strategy,
SMgrRelation  smgr,
char  smgr_persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void callback_private_data,
size_t  per_buffer_data_size 
)
extern

Definition at line 1000 of file read_stream.c.

1008{
1009 return read_stream_begin_impl(flags,
1010 strategy,
1011 NULL,
1012 smgr,
1014 forknum,
1015 callback,
1016 callback_private_data,
1017 per_buffer_data_size);
1018}
static int fb(int x)

References callback(), fb(), and read_stream_begin_impl().

Referenced by RelationCopyStorageUsingBuffer().

◆ read_stream_enable_stats()

void read_stream_enable_stats ( ReadStream stream,
struct IOStats stats 
)
extern

Definition at line 255 of file read_stream.c.

256{
257 stream->stats = stats;
258 if (stream->stats)
259 stream->stats->distance_capacity = stream->max_pinned_buffers;
260}
int16 distance_capacity
int16 max_pinned_buffers
IOStats * stats

References IOStats::distance_capacity, ReadStream::max_pinned_buffers, and ReadStream::stats.

Referenced by heap_beginscan().

◆ read_stream_end()

◆ read_stream_next_block()

BlockNumber read_stream_next_block ( ReadStream stream,
BufferAccessStrategy strategy 
)
extern

Definition at line 1377 of file read_stream.c.

1378{
1379 *strategy = stream->ios[0].op.strategy;
1380 return read_stream_get_block(stream, NULL);
1381}
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
ReadBuffersOperation op
Definition read_stream.c:89
BufferAccessStrategy strategy
Definition bufmgr.h:138
InProgressIO * ios

References fb(), ReadStream::ios, InProgressIO::op, read_stream_get_block(), and ReadBuffersOperation::strategy.

◆ read_stream_next_buffer()

Buffer read_stream_next_buffer ( ReadStream stream,
void **  per_buffer_data 
)
extern

Definition at line 1030 of file read_stream.c.

1031{
1032 Buffer buffer;
1033 int16 oldest_buffer_index;
1034
1035#ifndef READ_STREAM_DISABLE_FAST_PATH
1036
1037 /*
1038 * A fast path for all-cached scans. This is the same as the usual
1039 * algorithm, but it is specialized for no I/O and no per-buffer data, so
1040 * we can skip the queue management code, stay in the same buffer slot and
1041 * use singular StartReadBuffer().
1042 */
1043 if (likely(stream->fast_path))
1044 {
1046
1047 /* Fast path assumptions. */
1048 Assert(stream->ios_in_progress == 0);
1049 Assert(stream->forwarded_buffers == 0);
1050 Assert(stream->pinned_buffers == 1);
1051 Assert(stream->readahead_distance == 1);
1052 Assert(stream->combine_distance == 1);
1053 Assert(stream->pending_read_nblocks == 0);
1054 Assert(stream->per_buffer_data_size == 0);
1056
1057 /* We're going to return the buffer we pinned last time. */
1058 oldest_buffer_index = stream->oldest_buffer_index;
1059 Assert((oldest_buffer_index + 1) % stream->queue_size ==
1060 stream->next_buffer_index);
1061 buffer = stream->buffers[oldest_buffer_index];
1062 Assert(buffer != InvalidBuffer);
1063
1064 /* Choose the next block to pin. */
1066
1068 {
1069 int flags = stream->read_buffers_flags;
1070
1071 if (stream->advice_enabled)
1073
1074 /*
1075 * While in fast-path, execute any IO that we might encounter
1076 * synchronously. Because we are, right now, only looking one
1077 * block ahead, dispatching any occasional IO to workers would
1078 * have the overhead of dispatching to workers, without any
1079 * realistic chance of the IO completing before we need it. We
1080 * will switch to non-synchronous IO after this.
1081 *
1082 * Arguably we should do so only for worker, as there's far less
1083 * dispatch overhead with io_uring. However, tests so far have not
1084 * shown a clear downside and additional io_method awareness here
1085 * seems not great from an abstraction POV.
1086 */
1088
1089 /*
1090 * Pin a buffer for the next call. Same buffer entry, and
1091 * arbitrary I/O entry (they're all free). We don't have to
1092 * adjust pinned_buffers because we're transferring one to caller
1093 * but pinning one more.
1094 *
1095 * In the fast path we don't need to check the pin limit. We're
1096 * always allowed at least one pin so that progress can be made,
1097 * and that's all we need here. Although two pins are momentarily
1098 * held at the same time, the model used here is that the stream
1099 * holds only one, and the other now belongs to the caller.
1100 */
1101 if (likely(!StartReadBuffer(&stream->ios[0].op,
1102 &stream->buffers[oldest_buffer_index],
1104 flags)))
1105 {
1106 /* Fast return. */
1108 return buffer;
1109 }
1110
1111 /* Next call must wait for I/O for the newly pinned buffer. */
1112 stream->oldest_io_index = 0;
1113 stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
1114 stream->ios_in_progress = 1;
1115 stream->ios[0].buffer_index = oldest_buffer_index;
1116 stream->seq_blocknum = next_blocknum + 1;
1117
1118 /*
1119 * XXX: It might be worth triggering additional read-ahead here,
1120 * to avoid having to effectively do another synchronous IO for
1121 * the next block (if it were also a miss).
1122 */
1123
1124 /* update I/O stats */
1125 read_stream_count_io(stream, 1, stream->ios_in_progress);
1126
1127 /* update prefetch distance */
1129 }
1130 else
1131 {
1132 /* No more blocks, end of stream. */
1133 stream->readahead_distance = 0;
1134 stream->combine_distance = 0;
1135 stream->oldest_buffer_index = stream->next_buffer_index;
1136 stream->pinned_buffers = 0;
1137 stream->buffers[oldest_buffer_index] = InvalidBuffer;
1138 }
1139
1140 stream->fast_path = false;
1141 return buffer;
1142 }
1143#endif
1144
1145 if (unlikely(stream->pinned_buffers == 0))
1146 {
1147 Assert(stream->oldest_buffer_index == stream->next_buffer_index);
1148
1149 /* End of stream reached? */
1150 if (stream->readahead_distance == 0)
1151 return InvalidBuffer;
1152
1153 /*
1154 * The usual order of operations is that we look ahead at the bottom
1155 * of this function after potentially finishing an I/O and making
1156 * space for more, but if we're just starting up we'll need to crank
1157 * the handle to get started.
1158 */
1159 read_stream_look_ahead(stream);
1160
1161 /* End of stream reached? */
1162 if (stream->pinned_buffers == 0)
1163 {
1164 Assert(stream->readahead_distance == 0);
1165 return InvalidBuffer;
1166 }
1167 }
1168
1169 /* Grab the oldest pinned buffer and associated per-buffer data. */
1170 Assert(stream->pinned_buffers > 0);
1171 oldest_buffer_index = stream->oldest_buffer_index;
1172 Assert(oldest_buffer_index >= 0 &&
1174 buffer = stream->buffers[oldest_buffer_index];
1175 if (per_buffer_data)
1176 *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
1177
1178 Assert(BufferIsValid(buffer));
1179
1180 /* Do we have to wait for an associated I/O first? */
1181 if (stream->ios_in_progress > 0 &&
1182 stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
1183 {
1184 int16 io_index = stream->oldest_io_index;
1185 bool needed_wait;
1186
1187 /* Sanity check that we still agree on the buffers. */
1188 Assert(stream->ios[io_index].op.buffers ==
1189 &stream->buffers[oldest_buffer_index]);
1190
1192
1193 Assert(stream->ios_in_progress > 0);
1194 stream->ios_in_progress--;
1195 if (++stream->oldest_io_index == stream->max_ios)
1196 stream->oldest_io_index = 0;
1197
1198 /*
1199 * If the IO was executed synchronously, we will never see
1200 * WaitReadBuffers() block. Treat it as if it did block. This is
1201 * particularly crucial when effective_io_concurrency=0 is used, as
1202 * all IO will be synchronous. Without treating synchronous IO as
1203 * having waited, we'd never allow the distance to get large enough to
1204 * allow for IO combining, resulting in bad performance.
1205 */
1207 needed_wait = true;
1208
1209 /* Count it as a wait if we need to wait for IO */
1210 if (needed_wait)
1211 read_stream_count_wait(stream);
1212
1213 /*
1214 * Have the read-ahead distance ramp up rapidly after we needed to
1215 * wait for IO. We only increase the read-ahead-distance when we
1216 * needed to wait, to avoid increasing the distance further than
1217 * necessary, as looking ahead too far can be costly, both due to the
1218 * cost of unnecessarily pinning many buffers and due to doing IOs
1219 * that may never be consumed if the stream is ended/reset before
1220 * completion.
1221 *
1222 * If we did not need to wait, the current distance was evidently
1223 * sufficient.
1224 *
1225 * NB: Must not increase the distance if we already reached the end of
1226 * the stream, as stream->readahead_distance == 0 is used to keep
1227 * track of having reached the end.
1228 */
1229 if (stream->readahead_distance > 0 && needed_wait)
1230 {
1231 /* wider temporary value, due to overflow risk */
1232 int32 readahead_distance;
1233
1234 readahead_distance = stream->readahead_distance * 2;
1235 readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
1236 stream->readahead_distance = readahead_distance;
1237 }
1238
1239 /*
1240 * As we needed IO, prevent distances from being reduced within our
1241 * maximum look-ahead window. This avoids collapsing distances too
1242 * quickly in workloads where most of the required blocks are cached,
1243 * but where the remaining IOs are a sufficient enough factor to cause
1244 * a substantial slowdown if executed synchronously.
1245 *
1246 * There are valid arguments for preventing decay for max_ios or for
1247 * max_pinned_buffers. But the argument for max_pinned_buffers seems
1248 * clearer - if we can't see any misses within the maximum look-ahead
1249 * distance, we can't do any useful read-ahead.
1250 */
1252
1253 /*
1254 * Whether we needed to wait or not, allow for more IO combining if we
1255 * needed to do IO. The reason to do so independent of needing to wait
1256 * is that when the data is resident in the kernel page cache, IO
1257 * combining reduces the syscall / dispatch overhead, making it
1258 * worthwhile regardless of needing to wait.
1259 *
1260 * It is also important with io_uring as it will never signal the need
1261 * to wait for reads if all the data is in the page cache. There are
1262 * heuristics to deal with that in method_io_uring.c, but they only
1263 * work when the IO gets large enough.
1264 */
1265 if (stream->combine_distance > 0 &&
1266 stream->combine_distance < stream->io_combine_limit)
1267 {
1268 /* wider temporary value, due to overflow risk */
1269 int32 combine_distance;
1270
1271 combine_distance = stream->combine_distance * 2;
1272 combine_distance = Min(combine_distance, stream->io_combine_limit);
1273 combine_distance = Min(combine_distance, stream->max_pinned_buffers);
1274 stream->combine_distance = combine_distance;
1275 }
1276
1277 /*
1278 * If we've reached the first block of a sequential region we're
1279 * issuing advice for, cancel that until the next jump. The kernel
1280 * will see the sequential preadv() pattern starting here.
1281 */
1282 if (stream->advice_enabled &&
1283 stream->ios[io_index].op.blocknum == stream->seq_until_processed)
1285 }
1286
1287 /*
1288 * We must zap this queue entry, or else it would appear as a forwarded
1289 * buffer. If it's potentially in the overflow zone (ie from a
1290 * multi-block I/O that wrapped around the queue), also zap the copy.
1291 */
1292 stream->buffers[oldest_buffer_index] = InvalidBuffer;
1294 stream->buffers[stream->queue_size + oldest_buffer_index] =
1296
1297#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
1298
1299 /*
1300 * The caller will get access to the per-buffer data, until the next call.
1301 * We wipe the one before, which is never occupied because queue_size
1302 * allowed one extra element. This will hopefully trip up client code
1303 * that is holding a dangling pointer to it.
1304 */
1305 if (stream->per_buffer_data)
1306 {
1307 void *per_buffer_data;
1308
1309 per_buffer_data = get_per_buffer_data(stream,
1310 oldest_buffer_index == 0 ?
1311 stream->queue_size - 1 :
1312 oldest_buffer_index - 1);
1313
1314#if defined(CLOBBER_FREED_MEMORY)
1315 /* This also tells Valgrind the memory is "noaccess". */
1316 wipe_mem(per_buffer_data, stream->per_buffer_data_size);
1317#elif defined(USE_VALGRIND)
1318 /* Tell it ourselves. */
1319 VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
1320 stream->per_buffer_data_size);
1321#endif
1322 }
1323#endif
1324
1326
1327 /* Pin transferred to caller. */
1328 Assert(stream->pinned_buffers > 0);
1329 stream->pinned_buffers--;
1330
1331 /* Advance oldest buffer, with wrap-around. */
1332 stream->oldest_buffer_index++;
1333 if (stream->oldest_buffer_index == stream->queue_size)
1334 stream->oldest_buffer_index = 0;
1335
1336 /* Prepare for the next call. */
1337 read_stream_look_ahead(stream);
1338
1339#ifndef READ_STREAM_DISABLE_FAST_PATH
1340 /* See if we can take the fast path for all-cached scans next time. */
1341 if (stream->ios_in_progress == 0 &&
1342 stream->forwarded_buffers == 0 &&
1343 stream->pinned_buffers == 1 &&
1344 stream->readahead_distance == 1 &&
1345 stream->combine_distance == 1 &&
1346 stream->pending_read_nblocks == 0 &&
1347 stream->per_buffer_data_size == 0)
1348 {
1349 /*
1350 * The fast path spins on one buffer entry repeatedly instead of
1351 * rotating through the whole queue and clearing the entries behind
1352 * it. If the buffer it starts with happened to be forwarded between
1353 * StartReadBuffers() calls and also wrapped around the circular queue
1354 * partway through, then a copy also exists in the overflow zone, and
1355 * it won't clear it out as the regular path would. Do that now, so
1356 * it doesn't need code for that.
1357 */
1358 if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
1359 stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
1361
1362 stream->fast_path = true;
1363 }
1364#endif
1365
1366 return buffer;
1367}
uint32 BlockNumber
Definition block.h:31
int Buffer
Definition buf.h:23
#define InvalidBuffer
Definition buf.h:25
bool WaitReadBuffers(ReadBuffersOperation *operation)
Definition bufmgr.c:1759
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition bufmgr.c:1637
int io_combine_limit
Definition bufmgr.c:215
#define READ_BUFFERS_ISSUE_ADVICE
Definition bufmgr.h:124
#define READ_BUFFERS_SYNCHRONOUSLY
Definition bufmgr.h:128
static bool BufferIsValid(Buffer bufnum)
Definition bufmgr.h:419
#define Min(x, y)
Definition c.h:1091
#define likely(x)
Definition c.h:437
#define Assert(condition)
Definition c.h:943
int16_t int16
Definition c.h:619
int32_t int32
Definition c.h:620
#define unlikely(x)
Definition c.h:438
#define VALGRIND_MAKE_MEM_NOACCESS(addr, size)
Definition memdebug.h:27
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
static void read_stream_look_ahead(ReadStream *stream)
static void read_stream_count_wait(ReadStream *stream)
static void read_stream_count_io(ReadStream *stream, int nblocks, int in_progress)
static void read_stream_count_prefetch(ReadStream *stream)
int16 buffer_index
Definition read_stream.c:88
BlockNumber blocknum
Definition bufmgr.h:146
int16 io_combine_limit
Definition read_stream.c:98
uint16 distance_decay_holdoff
int16 ios_in_progress
Definition read_stream.c:99
void * per_buffer_data
BlockNumber seq_until_processed
int16 pinned_buffers
int16 max_ios
Definition read_stream.c:97
int16 oldest_buffer_index
BlockNumber seq_blocknum
bool advice_enabled
int16 oldest_io_index
int16 combine_distance
int16 readahead_distance
int read_buffers_flags
int16 queue_size
int16 next_buffer_index
int16 initialized_buffers
size_t per_buffer_data_size
int16 forwarded_buffers
int16 next_io_index
int16 pending_read_nblocks
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]

References ReadStream::advice_enabled, Assert, ReadBuffersOperation::blocknum, InProgressIO::buffer_index, BufferIsValid(), ReadStream::buffers, ReadBuffersOperation::buffers, ReadStream::combine_distance, ReadStream::distance_decay_holdoff, ReadStream::fast_path, fb(), ReadBuffersOperation::flags, ReadStream::forwarded_buffers, get_per_buffer_data(), ReadStream::initialized_buffers, InvalidBlockNumber, InvalidBuffer, ReadStream::io_combine_limit, io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, likely, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, ReadStream::oldest_io_index, InProgressIO::op, ReadStream::pending_read_nblocks, ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadStream::pinned_buffers, ReadStream::queue_size, ReadStream::read_buffers_flags, READ_BUFFERS_ISSUE_ADVICE, READ_BUFFERS_SYNCHRONOUSLY, read_stream_count_io(), read_stream_count_prefetch(), read_stream_count_wait(), read_stream_get_block(), read_stream_look_ahead(), ReadStream::readahead_distance, ReadStream::seq_blocknum, ReadStream::seq_until_processed, StartReadBuffer(), unlikely, VALGRIND_MAKE_MEM_NOACCESS, and WaitReadBuffers().

Referenced by autoprewarm_database_main(), BitmapHeapScanNextBlock(), blbulkdelete(), blgetbitmap(), blvacuumcleanup(), brin_vacuum_scan(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), ginvacuumcleanup(), gistvacuumscan(), hashbulkdelete(), heap_fetch_next_buffer(), heapam_scan_analyze_next_block(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), pgstathashindex(), pgstatindex_impl(), read_stream_for_blocks(), read_stream_reset(), RelationCopyStorageUsingBuffer(), spgvacuumscan(), statapprox_heap(), and verify_heapam().

◆ read_stream_pause()

BlockNumber read_stream_pause ( ReadStream stream)
extern

◆ read_stream_reset()

void read_stream_reset ( ReadStream stream)
extern

Definition at line 1417 of file read_stream.c.

1418{
1419 int16 index;
1420 Buffer buffer;
1421
1422 /* Stop looking ahead. */
1423 stream->readahead_distance = 0;
1424 stream->combine_distance = 0;
1425
1426 /* Forget buffered block number and fast path state. */
1428 stream->fast_path = false;
1429
1430 /* Unpin anything that wasn't consumed. */
1431 while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1432 ReleaseBuffer(buffer);
1433
1434 /* Unpin any unused forwarded buffers. */
1435 index = stream->next_buffer_index;
1436 while (index < stream->initialized_buffers &&
1437 (buffer = stream->buffers[index]) != InvalidBuffer)
1438 {
1439 Assert(stream->forwarded_buffers > 0);
1440 stream->forwarded_buffers--;
1441 ReleaseBuffer(buffer);
1442
1443 stream->buffers[index] = InvalidBuffer;
1445 stream->buffers[stream->queue_size + index] = InvalidBuffer;
1446
1447 if (++index == stream->queue_size)
1448 index = 0;
1449 }
1450
1451 Assert(stream->forwarded_buffers == 0);
1452 Assert(stream->pinned_buffers == 0);
1453 Assert(stream->ios_in_progress == 0);
1454
1455 /* Start off assuming data is cached. */
1456 stream->readahead_distance = 1;
1457 stream->combine_distance = 1;
1459 stream->resume_combine_distance = stream->combine_distance;
1460 stream->distance_decay_holdoff = 0;
1461}
void ReleaseBuffer(Buffer buffer)
Definition bufmgr.c:5595
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
BlockNumber buffered_blocknum
Definition type.h:97

References Assert, ReadStream::buffered_blocknum, ReadStream::buffers, ReadStream::combine_distance, ReadStream::distance_decay_holdoff, ReadStream::fast_path, fb(), ReadStream::forwarded_buffers, InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios_in_progress, ReadStream::next_buffer_index, ReadStream::pinned_buffers, ReadStream::queue_size, read_stream_next_buffer(), ReadStream::readahead_distance, ReleaseBuffer(), ReadStream::resume_combine_distance, and ReadStream::resume_readahead_distance.

Referenced by btvacuumscan(), gistvacuumscan(), hashbulkdelete(), heap_fetch_next_buffer(), heap_rescan(), read_stream_end(), and spgvacuumscan().

◆ read_stream_resume()