PostgreSQL Source Code git master
read_stream.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * read_stream.c
4 * Mechanism for accessing buffered relation data with look-ahead
5 *
6 * Code that needs to access relation data typically pins blocks one at a
7 * time, often in a predictable order that might be sequential or data-driven.
8 * Calling the simple ReadBuffer() function for each block is inefficient,
9 * because blocks that are not yet in the buffer pool require I/O operations
10 * that are small and might stall waiting for storage. This mechanism looks
11 * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 * distance.
14 *
15 * A user-provided callback generates a stream of block numbers that is used
16 * to form reads of up to io_combine_limit, by attempting to merge them with a
17 * pending read. When that isn't possible, the existing pending read is sent
18 * to StartReadBuffers() so that a new one can begin to form.
19 *
20 * The algorithm for controlling the look-ahead distance tries to classify the
21 * stream into three ideal behaviors:
22 *
23 * A) No I/O is necessary, because the requested blocks are fully cached
24 * already. There is no benefit to looking ahead more than one block, so
25 * distance is 1. This is the default initial assumption.
26 *
27 * B) I/O is necessary, but read-ahead advice is undesirable because the
28 * access is sequential and we can rely on the kernel's read-ahead heuristics,
29 * or impossible because direct I/O is enabled, or the system doesn't support
30 * read-ahead advice. There is no benefit in looking ahead more than
31 * io_combine_limit, because in this case the only goal is larger read system
32 * calls. Looking further ahead would pin many buffers and perform
33 * speculative work for no benefit.
34 *
35 * C) I/O is necessary, it appears to be random, and this system supports
36 * read-ahead advice. We'll look further ahead in order to reach the
37 * configured level of I/O concurrency.
38 *
39 * The distance increases rapidly and decays slowly, so that it moves towards
40 * those levels as different I/O patterns are discovered. For example, a
41 * sequential scan of fully cached data doesn't bother looking ahead, but a
42 * sequential scan that hits a region of uncached blocks will start issuing
43 * increasingly wide read calls until it plateaus at io_combine_limit.
44 *
45 * The main data structure is a circular queue of buffers of size
46 * max_pinned_buffers plus some extra space for technical reasons, ready to be
47 * returned by read_stream_next_buffer(). Each buffer also has an optional
48 * variable sized object that is passed from the callback to the consumer of
49 * buffers.
50 *
51 * Parallel to the queue of buffers, there is a circular queue of in-progress
52 * I/Os that have been started with StartReadBuffers(), and for which
53 * WaitReadBuffers() must be called before returning the buffer.
54 *
55 * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
56 * successive calls, then these data structures might appear as follows:
57 *
 *                        buffers buf/data         ios
 *
 *                        +----+  +-----+       +--------+
 *                        |    |  |     |  +----+ 42..44 | <- oldest_io_index
 *                        +----+  +-----+  |    +--------+
 * oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
 *                        +----+  +-----+  | |  +--------+
 *                        | 42 |  |  ?  |<-+ |  |        | <- next_io_index
 *                        +----+  +-----+    |  +--------+
 *                        | 43 |  |  ?  |    |  |        |
 *                        +----+  +-----+    |  +--------+
 *                        | 44 |  |  ?  |    |  |        |
 *                        +----+  +-----+    |  +--------+
 *                        | 60 |  |  ?  |<---+
 *                        +----+  +-----+
 *   next_buffer_index -> |    |  |     |
 *                        +----+  +-----+
75 *
76 * In the example, 5 buffers are pinned, and the next buffer to be streamed to
77 * the client is block 10. Block 10 was a hit and has no associated I/O, but
78 * the range 42..44 requires an I/O wait before its buffers are returned, as
79 * does block 60.
80 *
81 *
82 * Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
83 * Portions Copyright (c) 1994, Regents of the University of California
84 *
85 * IDENTIFICATION
86 * src/backend/storage/aio/read_stream.c
87 *
88 *-------------------------------------------------------------------------
89 */
90#include "postgres.h"
91
92#include "miscadmin.h"
93#include "storage/fd.h"
94#include "storage/smgr.h"
95#include "storage/read_stream.h"
96#include "utils/memdebug.h"
97#include "utils/rel.h"
98#include "utils/spccache.h"
99
100typedef struct InProgressIO
101{
105
106/*
107 * State for managing a stream of reads.
108 */
110{
118
119 /*
120 * One-block buffer to support 'ungetting' a block number, to resolve flow
121 * control problems when I/Os are split.
122 */
124
125 /*
126 * The callback that will tell us which block numbers to read, and an
127 * opaque pointer that will be pass to it for its own purposes.
128 */
131
132 /* Next expected block, for detecting sequential access. */
134
135 /* The read operation we are currently preparing. */
138
139 /* Space for buffers and optional per-buffer private data. */
142
143 /* Read operations that have been started but not waited for yet. */
147
149
150 /* Circular queue of buffers. */
151 int16 oldest_buffer_index; /* Next pinned buffer to return */
152 int16 next_buffer_index; /* Index of next buffer to pin */
154};
155
156/*
157 * Return a pointer to the per-buffer data by index.
158 */
159static inline void *
161{
162 return (char *) stream->per_buffer_data +
163 stream->per_buffer_data_size * buffer_index;
164}
165
166/*
167 * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
168 * blocks [current_blocknum, last_exclusive).
169 */
172 void *callback_private_data,
173 void *per_buffer_data)
174{
175 BlockRangeReadStreamPrivate *p = callback_private_data;
176
178 return p->current_blocknum++;
179
180 return InvalidBlockNumber;
181}
182
183/*
184 * Ask the callback which block it would like us to read next, with a one block
185 * buffer in front to allow read_stream_unget_block() to work.
186 */
187static inline BlockNumber
188read_stream_get_block(ReadStream *stream, void *per_buffer_data)
189{
190 BlockNumber blocknum;
191
192 blocknum = stream->buffered_blocknum;
193 if (blocknum != InvalidBlockNumber)
195 else
196 blocknum = stream->callback(stream,
197 stream->callback_private_data,
198 per_buffer_data);
199
200 return blocknum;
201}
202
203/*
204 * In order to deal with short reads in StartReadBuffers(), we sometimes need
205 * to defer handling of a block until later.
206 */
207static inline void
209{
210 /* We shouldn't ever unget more than one block. */
212 Assert(blocknum != InvalidBlockNumber);
213 stream->buffered_blocknum = blocknum;
214}
215
216static void
217read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
218{
219 bool need_wait;
220 int nblocks;
221 int flags;
222 int16 io_index;
223 int16 overflow;
224 int16 buffer_index;
225
226 /* This should only be called with a pending read. */
227 Assert(stream->pending_read_nblocks > 0);
229
230 /* We had better not exceed the pin limit by starting this read. */
231 Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
232 stream->max_pinned_buffers);
233
234 /* We had better not be overwriting an existing pinned buffer. */
235 if (stream->pinned_buffers > 0)
236 Assert(stream->next_buffer_index != stream->oldest_buffer_index);
237 else
238 Assert(stream->next_buffer_index == stream->oldest_buffer_index);
239
240 /*
241 * If advice hasn't been suppressed, this system supports it, and this
242 * isn't a strictly sequential pattern, then we'll issue advice.
243 */
244 if (!suppress_advice &&
245 stream->advice_enabled &&
246 stream->pending_read_blocknum != stream->seq_blocknum)
248 else
249 flags = 0;
250
251 /* We say how many blocks we want to read, but may be smaller on return. */
252 buffer_index = stream->next_buffer_index;
253 io_index = stream->next_io_index;
254 nblocks = stream->pending_read_nblocks;
255 need_wait = StartReadBuffers(&stream->ios[io_index].op,
256 &stream->buffers[buffer_index],
257 stream->pending_read_blocknum,
258 &nblocks,
259 flags);
260 stream->pinned_buffers += nblocks;
261
262 /* Remember whether we need to wait before returning this buffer. */
263 if (!need_wait)
264 {
265 /* Look-ahead distance decays, no I/O necessary (behavior A). */
266 if (stream->distance > 1)
267 stream->distance--;
268 }
269 else
270 {
271 /*
272 * Remember to call WaitReadBuffers() before returning head buffer.
273 * Look-ahead distance will be adjusted after waiting.
274 */
275 stream->ios[io_index].buffer_index = buffer_index;
276 if (++stream->next_io_index == stream->max_ios)
277 stream->next_io_index = 0;
278 Assert(stream->ios_in_progress < stream->max_ios);
279 stream->ios_in_progress++;
280 stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
281 }
282
283 /*
284 * We gave a contiguous range of buffer space to StartReadBuffers(), but
285 * we want it to wrap around at queue_size. Slide overflowing buffers to
286 * the front of the array.
287 */
288 overflow = (buffer_index + nblocks) - stream->queue_size;
289 if (overflow > 0)
290 memmove(&stream->buffers[0],
291 &stream->buffers[stream->queue_size],
292 sizeof(stream->buffers[0]) * overflow);
293
294 /* Compute location of start of next read, without using % operator. */
295 buffer_index += nblocks;
296 if (buffer_index >= stream->queue_size)
297 buffer_index -= stream->queue_size;
298 Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
299 stream->next_buffer_index = buffer_index;
300
301 /* Adjust the pending read to cover the remaining portion, if any. */
302 stream->pending_read_blocknum += nblocks;
303 stream->pending_read_nblocks -= nblocks;
304}
305
306static void
307read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
308{
309 while (stream->ios_in_progress < stream->max_ios &&
310 stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
311 {
312 BlockNumber blocknum;
313 int16 buffer_index;
314 void *per_buffer_data;
315
317 {
318 read_stream_start_pending_read(stream, suppress_advice);
319 suppress_advice = false;
320 continue;
321 }
322
323 /*
324 * See which block the callback wants next in the stream. We need to
325 * compute the index of the Nth block of the pending read including
326 * wrap-around, but we don't want to use the expensive % operator.
327 */
328 buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
329 if (buffer_index >= stream->queue_size)
330 buffer_index -= stream->queue_size;
331 Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
332 per_buffer_data = get_per_buffer_data(stream, buffer_index);
333 blocknum = read_stream_get_block(stream, per_buffer_data);
334 if (blocknum == InvalidBlockNumber)
335 {
336 /* End of stream. */
337 stream->distance = 0;
338 break;
339 }
340
341 /* Can we merge it with the pending read? */
342 if (stream->pending_read_nblocks > 0 &&
343 stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
344 {
345 stream->pending_read_nblocks++;
346 continue;
347 }
348
349 /* We have to start the pending read before we can build another. */
350 while (stream->pending_read_nblocks > 0)
351 {
352 read_stream_start_pending_read(stream, suppress_advice);
353 suppress_advice = false;
354 if (stream->ios_in_progress == stream->max_ios)
355 {
356 /* And we've hit the limit. Rewind, and stop here. */
357 read_stream_unget_block(stream, blocknum);
358 return;
359 }
360 }
361
362 /* This is the start of a new pending read. */
363 stream->pending_read_blocknum = blocknum;
364 stream->pending_read_nblocks = 1;
365 }
366
367 /*
368 * We don't start the pending read just because we've hit the distance
369 * limit, preferring to give it another chance to grow to full
370 * io_combine_limit size once more buffers have been consumed. However,
371 * if we've already reached io_combine_limit, or we've reached the
372 * distance limit and there isn't anything pinned yet, or the callback has
373 * signaled end-of-stream, we start the read immediately.
374 */
375 if (stream->pending_read_nblocks > 0 &&
377 (stream->pending_read_nblocks == stream->distance &&
378 stream->pinned_buffers == 0) ||
379 stream->distance == 0) &&
380 stream->ios_in_progress < stream->max_ios)
381 read_stream_start_pending_read(stream, suppress_advice);
382}
383
384/*
385 * Create a new read stream object that can be used to perform the equivalent
386 * of a series of ReadBuffer() calls for one fork of one relation.
387 * Internally, it generates larger vectored reads where possible by looking
388 * ahead. The callback should return block numbers or InvalidBlockNumber to
389 * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
390 * write extra data for each block into the space provided to it. It will
391 * also receive callback_private_data for its own purposes.
392 */
393static ReadStream *
395 BufferAccessStrategy strategy,
396 Relation rel,
397 SMgrRelation smgr,
398 char persistence,
399 ForkNumber forknum,
401 void *callback_private_data,
402 size_t per_buffer_data_size)
403{
404 ReadStream *stream;
405 size_t size;
406 int16 queue_size;
407 int max_ios;
408 int strategy_pin_limit;
409 uint32 max_pinned_buffers;
410 Oid tablespace_id;
411
412 /*
413 * Decide how many I/Os we will allow to run at the same time. That
414 * currently means advice to the kernel to tell it that we will soon read.
415 * This number also affects how far we look ahead for opportunities to
416 * start more I/Os.
417 */
418 tablespace_id = smgr->smgr_rlocator.locator.spcOid;
419 if (!OidIsValid(MyDatabaseId) ||
420 (rel && IsCatalogRelation(rel)) ||
422 {
423 /*
424 * Avoid circularity while trying to look up tablespace settings or
425 * before spccache.c is ready.
426 */
427 max_ios = effective_io_concurrency;
428 }
429 else if (flags & READ_STREAM_MAINTENANCE)
430 max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
431 else
432 max_ios = get_tablespace_io_concurrency(tablespace_id);
433
434 /* Cap to INT16_MAX to avoid overflowing below */
435 max_ios = Min(max_ios, PG_INT16_MAX);
436
437 /*
438 * Choose the maximum number of buffers we're prepared to pin. We try to
439 * pin fewer if we can, though. We clamp it to at least io_combine_limit
440 * so that we can have a chance to build up a full io_combine_limit sized
441 * read, even when max_ios is zero. Be careful not to allow int16 to
442 * overflow (even though that's not possible with the current GUC range
443 * limits), allowing also for the spare entry and the overflow space.
444 */
445 max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
446 max_pinned_buffers = Min(max_pinned_buffers,
448
449 /* Give the strategy a chance to limit the number of buffers we pin. */
450 strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
451 max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
452
453 /* Don't allow this backend to pin more than its share of buffers. */
454 if (SmgrIsTemp(smgr))
455 LimitAdditionalLocalPins(&max_pinned_buffers);
456 else
457 LimitAdditionalPins(&max_pinned_buffers);
458 Assert(max_pinned_buffers > 0);
459
460 /*
461 * We need one extra entry for buffers and per-buffer data, because users
462 * of per-buffer data have access to the object until the next call to
463 * read_stream_next_buffer(), so we need a gap between the head and tail
464 * of the queue so that we don't clobber it.
465 */
466 queue_size = max_pinned_buffers + 1;
467
468 /*
469 * Allocate the object, the buffers, the ios and per_buffer_data space in
470 * one big chunk. Though we have queue_size buffers, we want to be able
471 * to assume that all the buffers for a single read are contiguous (i.e.
472 * don't wrap around halfway through), so we allow temporary overflows of
473 * up to the maximum possible read size by allocating an extra
474 * io_combine_limit - 1 elements.
475 */
476 size = offsetof(ReadStream, buffers);
477 size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
478 size += sizeof(InProgressIO) * Max(1, max_ios);
479 size += per_buffer_data_size * queue_size;
480 size += MAXIMUM_ALIGNOF * 2;
481 stream = (ReadStream *) palloc(size);
482 memset(stream, 0, offsetof(ReadStream, buffers));
483 stream->ios = (InProgressIO *)
484 MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
485 if (per_buffer_data_size > 0)
486 stream->per_buffer_data = (void *)
487 MAXALIGN(&stream->ios[Max(1, max_ios)]);
488
489#ifdef USE_PREFETCH
490
491 /*
492 * This system supports prefetching advice. We can use it as long as
493 * direct I/O isn't enabled, the caller hasn't promised sequential access
494 * (overriding our detection heuristics), and max_ios hasn't been set to
495 * zero.
496 */
497 if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
498 (flags & READ_STREAM_SEQUENTIAL) == 0 &&
499 max_ios > 0)
500 stream->advice_enabled = true;
501#endif
502
503 /*
504 * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
505 * above. If we had real asynchronous I/O we might need a slightly
506 * different definition.
507 */
508 if (max_ios == 0)
509 max_ios = 1;
510
511 stream->max_ios = max_ios;
512 stream->per_buffer_data_size = per_buffer_data_size;
513 stream->max_pinned_buffers = max_pinned_buffers;
514 stream->queue_size = queue_size;
515 stream->callback = callback;
516 stream->callback_private_data = callback_private_data;
518
519 /*
520 * Skip the initial ramp-up phase if the caller says we're going to be
521 * reading the whole relation. This way we start out assuming we'll be
522 * doing full io_combine_limit sized reads (behavior B).
523 */
524 if (flags & READ_STREAM_FULL)
525 stream->distance = Min(max_pinned_buffers, io_combine_limit);
526 else
527 stream->distance = 1;
528
529 /*
530 * Since we always access the same relation, we can initialize parts of
531 * the ReadBuffersOperation objects and leave them that way, to avoid
532 * wasting CPU cycles writing to them for each read.
533 */
534 for (int i = 0; i < max_ios; ++i)
535 {
536 stream->ios[i].op.rel = rel;
537 stream->ios[i].op.smgr = smgr;
538 stream->ios[i].op.persistence = persistence;
539 stream->ios[i].op.forknum = forknum;
540 stream->ios[i].op.strategy = strategy;
541 }
542
543 return stream;
544}
545
546/*
547 * Create a new read stream for reading a relation.
548 * See read_stream_begin_impl() for the detailed explanation.
549 */
552 BufferAccessStrategy strategy,
553 Relation rel,
554 ForkNumber forknum,
556 void *callback_private_data,
557 size_t per_buffer_data_size)
558{
559 return read_stream_begin_impl(flags,
560 strategy,
561 rel,
562 RelationGetSmgr(rel),
563 rel->rd_rel->relpersistence,
564 forknum,
565 callback,
566 callback_private_data,
567 per_buffer_data_size);
568}
569
570/*
571 * Create a new read stream for reading a SMgr relation.
572 * See read_stream_begin_impl() for the detailed explanation.
573 */
576 BufferAccessStrategy strategy,
577 SMgrRelation smgr,
578 char smgr_persistence,
579 ForkNumber forknum,
581 void *callback_private_data,
582 size_t per_buffer_data_size)
583{
584 return read_stream_begin_impl(flags,
585 strategy,
586 NULL,
587 smgr,
588 smgr_persistence,
589 forknum,
590 callback,
591 callback_private_data,
592 per_buffer_data_size);
593}
594
595/*
596 * Pull one pinned buffer out of a stream. Each call returns successive
597 * blocks in the order specified by the callback. If per_buffer_data_size was
598 * set to a non-zero size, *per_buffer_data receives a pointer to the extra
599 * per-buffer data that the callback had a chance to populate, which remains
600 * valid until the next call to read_stream_next_buffer(). When the stream
601 * runs out of data, InvalidBuffer is returned. The caller may decide to end
602 * the stream early at any time by calling read_stream_end().
603 */
604Buffer
605read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
606{
607 Buffer buffer;
608 int16 oldest_buffer_index;
609
610#ifndef READ_STREAM_DISABLE_FAST_PATH
611
612 /*
613 * A fast path for all-cached scans (behavior A). This is the same as the
614 * usual algorithm, but it is specialized for no I/O and no per-buffer
615 * data, so we can skip the queue management code, stay in the same buffer
616 * slot and use singular StartReadBuffer().
617 */
618 if (likely(stream->fast_path))
619 {
620 BlockNumber next_blocknum;
621
622 /* Fast path assumptions. */
623 Assert(stream->ios_in_progress == 0);
624 Assert(stream->pinned_buffers == 1);
625 Assert(stream->distance == 1);
626 Assert(stream->pending_read_nblocks == 0);
627 Assert(stream->per_buffer_data_size == 0);
628
629 /* We're going to return the buffer we pinned last time. */
630 oldest_buffer_index = stream->oldest_buffer_index;
631 Assert((oldest_buffer_index + 1) % stream->queue_size ==
632 stream->next_buffer_index);
633 buffer = stream->buffers[oldest_buffer_index];
634 Assert(buffer != InvalidBuffer);
635
636 /* Choose the next block to pin. */
637 next_blocknum = read_stream_get_block(stream, NULL);
638
639 if (likely(next_blocknum != InvalidBlockNumber))
640 {
641 /*
642 * Pin a buffer for the next call. Same buffer entry, and
643 * arbitrary I/O entry (they're all free). We don't have to
644 * adjust pinned_buffers because we're transferring one to caller
645 * but pinning one more.
646 */
647 if (likely(!StartReadBuffer(&stream->ios[0].op,
648 &stream->buffers[oldest_buffer_index],
649 next_blocknum,
650 stream->advice_enabled ?
652 {
653 /* Fast return. */
654 return buffer;
655 }
656
657 /* Next call must wait for I/O for the newly pinned buffer. */
658 stream->oldest_io_index = 0;
659 stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
660 stream->ios_in_progress = 1;
661 stream->ios[0].buffer_index = oldest_buffer_index;
662 stream->seq_blocknum = next_blocknum + 1;
663 }
664 else
665 {
666 /* No more blocks, end of stream. */
667 stream->distance = 0;
668 stream->oldest_buffer_index = stream->next_buffer_index;
669 stream->pinned_buffers = 0;
670 }
671
672 stream->fast_path = false;
673 return buffer;
674 }
675#endif
676
677 if (unlikely(stream->pinned_buffers == 0))
678 {
679 Assert(stream->oldest_buffer_index == stream->next_buffer_index);
680
681 /* End of stream reached? */
682 if (stream->distance == 0)
683 return InvalidBuffer;
684
685 /*
686 * The usual order of operations is that we look ahead at the bottom
687 * of this function after potentially finishing an I/O and making
688 * space for more, but if we're just starting up we'll need to crank
689 * the handle to get started.
690 */
691 read_stream_look_ahead(stream, true);
692
693 /* End of stream reached? */
694 if (stream->pinned_buffers == 0)
695 {
696 Assert(stream->distance == 0);
697 return InvalidBuffer;
698 }
699 }
700
701 /* Grab the oldest pinned buffer and associated per-buffer data. */
702 Assert(stream->pinned_buffers > 0);
703 oldest_buffer_index = stream->oldest_buffer_index;
704 Assert(oldest_buffer_index >= 0 &&
705 oldest_buffer_index < stream->queue_size);
706 buffer = stream->buffers[oldest_buffer_index];
707 if (per_buffer_data)
708 *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
709
710 Assert(BufferIsValid(buffer));
711
712 /* Do we have to wait for an associated I/O first? */
713 if (stream->ios_in_progress > 0 &&
714 stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
715 {
716 int16 io_index = stream->oldest_io_index;
717 int16 distance;
718
719 /* Sanity check that we still agree on the buffers. */
720 Assert(stream->ios[io_index].op.buffers ==
721 &stream->buffers[oldest_buffer_index]);
722
723 WaitReadBuffers(&stream->ios[io_index].op);
724
725 Assert(stream->ios_in_progress > 0);
726 stream->ios_in_progress--;
727 if (++stream->oldest_io_index == stream->max_ios)
728 stream->oldest_io_index = 0;
729
730 if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
731 {
732 /* Distance ramps up fast (behavior C). */
733 distance = stream->distance * 2;
734 distance = Min(distance, stream->max_pinned_buffers);
735 stream->distance = distance;
736 }
737 else
738 {
739 /* No advice; move towards io_combine_limit (behavior B). */
740 if (stream->distance > io_combine_limit)
741 {
742 stream->distance--;
743 }
744 else
745 {
746 distance = stream->distance * 2;
747 distance = Min(distance, io_combine_limit);
748 distance = Min(distance, stream->max_pinned_buffers);
749 stream->distance = distance;
750 }
751 }
752 }
753
754#ifdef CLOBBER_FREED_MEMORY
755 /* Clobber old buffer and per-buffer data for debugging purposes. */
756 stream->buffers[oldest_buffer_index] = InvalidBuffer;
757
758 /*
759 * The caller will get access to the per-buffer data, until the next call.
760 * We wipe the one before, which is never occupied because queue_size
761 * allowed one extra element. This will hopefully trip up client code
762 * that is holding a dangling pointer to it.
763 */
764 if (stream->per_buffer_data)
765 wipe_mem(get_per_buffer_data(stream,
766 oldest_buffer_index == 0 ?
767 stream->queue_size - 1 :
768 oldest_buffer_index - 1),
769 stream->per_buffer_data_size);
770#endif
771
772 /* Pin transferred to caller. */
773 Assert(stream->pinned_buffers > 0);
774 stream->pinned_buffers--;
775
776 /* Advance oldest buffer, with wrap-around. */
777 stream->oldest_buffer_index++;
778 if (stream->oldest_buffer_index == stream->queue_size)
779 stream->oldest_buffer_index = 0;
780
781 /* Prepare for the next call. */
782 read_stream_look_ahead(stream, false);
783
784#ifndef READ_STREAM_DISABLE_FAST_PATH
785 /* See if we can take the fast path for all-cached scans next time. */
786 if (stream->ios_in_progress == 0 &&
787 stream->pinned_buffers == 1 &&
788 stream->distance == 1 &&
789 stream->pending_read_nblocks == 0 &&
790 stream->per_buffer_data_size == 0)
791 {
792 stream->fast_path = true;
793 }
794#endif
795
796 return buffer;
797}
798
799/*
800 * Transitional support for code that would like to perform or skip reads
801 * itself, without using the stream. Returns, and consumes, the next block
802 * number that would be read by the stream's look-ahead algorithm, or
803 * InvalidBlockNumber if the end of the stream is reached. Also reports the
804 * strategy that would be used to read it.
805 */
808{
809 *strategy = stream->ios[0].op.strategy;
810 return read_stream_get_block(stream, NULL);
811}
812
813/*
814 * Reset a read stream by releasing any queued up buffers, allowing the stream
815 * to be used again for different blocks. This can be used to clear an
816 * end-of-stream condition and start again, or to throw away blocks that were
817 * speculatively read and read some different blocks instead.
818 */
819void
821{
822 Buffer buffer;
823
824 /* Stop looking ahead. */
825 stream->distance = 0;
826
827 /* Forget buffered block number and fast path state. */
829 stream->fast_path = false;
830
831 /* Unpin anything that wasn't consumed. */
832 while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
833 ReleaseBuffer(buffer);
834
835 Assert(stream->pinned_buffers == 0);
836 Assert(stream->ios_in_progress == 0);
837
838 /* Start off assuming data is cached. */
839 stream->distance = 1;
840}
841
842/*
843 * Release and free a read stream.
844 */
845void
847{
848 read_stream_reset(stream);
849 pfree(stream);
850}
uint32 BlockNumber
Definition: block.h:31
#define InvalidBlockNumber
Definition: block.h:33
int Buffer
Definition: buf.h:23
#define InvalidBuffer
Definition: buf.h:25
bool StartReadBuffers(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, int flags)
Definition: bufmgr.c:1367
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4866
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition: bufmgr.c:1410
void LimitAdditionalPins(uint32 *additional_pins)
Definition: bufmgr.c:2116
int effective_io_concurrency
Definition: bufmgr.c:151
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition: bufmgr.c:1382
int io_combine_limit
Definition: bufmgr.c:165
#define READ_BUFFERS_ISSUE_ADVICE
Definition: bufmgr.h:113
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:347
#define Min(x, y)
Definition: c.h:961
#define likely(x)
Definition: c.h:332
#define MAXALIGN(LEN)
Definition: c.h:768
#define Max(x, y)
Definition: c.h:955
#define Assert(condition)
Definition: c.h:815
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:420
int16_t int16
Definition: c.h:483
#define unlikely(x)
Definition: c.h:333
uint32_t uint32
Definition: c.h:488
#define PG_INT16_MAX
Definition: c.h:543
#define OidIsValid(objectId)
Definition: c.h:732
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:103
bool IsCatalogRelationOid(Oid relid)
Definition: catalog.c:120
int io_direct_flags
Definition: fd.c:167
#define IO_DIRECT_DATA
Definition: fd.h:54
int GetAccessStrategyPinLimit(BufferAccessStrategy strategy)
Definition: freelist.c:647
Oid MyDatabaseId
Definition: globals.c:93
int i
Definition: isn.c:72
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
void LimitAdditionalLocalPins(uint32 *additional_pins)
Definition: localbuf.c:291
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc(Size size)
Definition: mcxt.c:1317
unsigned int Oid
Definition: postgres_ext.h:32
ReadStream * read_stream_begin_smgr_relation(int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
Definition: read_stream.c:575
static void read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
Definition: read_stream.c:307
BlockNumber read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
Definition: read_stream.c:807
void read_stream_reset(ReadStream *stream)
Definition: read_stream.c:820
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Definition: read_stream.c:605
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
Definition: read_stream.c:160
ReadStream * read_stream_begin_relation(int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
Definition: read_stream.c:551
struct InProgressIO InProgressIO
void read_stream_end(ReadStream *stream)
Definition: read_stream.c:846
BlockNumber block_range_read_stream_cb(ReadStream *stream, void *callback_private_data, void *per_buffer_data)
Definition: read_stream.c:171
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
Definition: read_stream.c:188
static void read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
Definition: read_stream.c:208
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
Definition: read_stream.c:394
static void read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
Definition: read_stream.c:217
#define READ_STREAM_MAINTENANCE
Definition: read_stream.h:28
BlockNumber(* ReadStreamBlockNumberCB)(ReadStream *stream, void *callback_private_data, void *per_buffer_data)
Definition: read_stream.h:56
#define READ_STREAM_FULL
Definition: read_stream.h:43
#define READ_STREAM_SEQUENTIAL
Definition: read_stream.h:36
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:574
ForkNumber
Definition: relpath.h:56
static pg_noinline void Size size
Definition: slab.c:607
#define SmgrIsTemp(smgr)
Definition: smgr.h:73
int get_tablespace_io_concurrency(Oid spcid)
Definition: spccache.c:215
int get_tablespace_maintenance_io_concurrency(Oid spcid)
Definition: spccache.c:229
int16 buffer_index
Definition: read_stream.c:102
ReadBuffersOperation op
Definition: read_stream.c:103
ForkNumber forknum
Definition: bufmgr.h:121
Buffer * buffers
Definition: bufmgr.h:129
BufferAccessStrategy strategy
Definition: bufmgr.h:122
struct SMgrRelationData * smgr
Definition: bufmgr.h:119
int16 distance
Definition: read_stream.c:116
int16 ios_in_progress
Definition: read_stream.c:112
void * per_buffer_data
Definition: read_stream.c:141
int16 pinned_buffers
Definition: read_stream.c:115
int16 max_ios
Definition: read_stream.c:111
int16 oldest_buffer_index
Definition: read_stream.c:151
BlockNumber seq_blocknum
Definition: read_stream.c:133
bool advice_enabled
Definition: read_stream.c:117
BlockNumber pending_read_blocknum
Definition: read_stream.c:136
int16 max_pinned_buffers
Definition: read_stream.c:114
InProgressIO * ios
Definition: read_stream.c:144
int16 oldest_io_index
Definition: read_stream.c:145
BlockNumber buffered_blocknum
Definition: read_stream.c:123
int16 queue_size
Definition: read_stream.c:113
int16 next_buffer_index
Definition: read_stream.c:152
size_t per_buffer_data_size
Definition: read_stream.c:140
ReadStreamBlockNumberCB callback
Definition: read_stream.c:129
int16 next_io_index
Definition: read_stream.c:146
bool fast_path
Definition: read_stream.c:148
int16 pending_read_nblocks
Definition: read_stream.c:137
void * callback_private_data
Definition: read_stream.c:130
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
Definition: read_stream.c:153
RelFileLocator locator
RelFileNumber relNumber
Form_pg_class rd_rel
Definition: rel.h:111
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:37
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46