/* (doxygen web-viewer navigation text removed from this extracted copy) */
1/*-------------------------------------------------------------------------
2 *
3 * read_stream.c
4 * Mechanism for accessing buffered relation data with look-ahead
5 *
6 * Code that needs to access relation data typically pins blocks one at a
7 * time, often in a predictable order that might be sequential or data-driven.
8 * Calling the simple ReadBuffer() function for each block is inefficient,
9 * because blocks that are not yet in the buffer pool require I/O operations
10 * that are small and might stall waiting for storage. This mechanism looks
11 * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 * distance.
14 *
15 * A user-provided callback generates a stream of block numbers that is used
16 * to form reads of up to io_combine_limit, by attempting to merge them with a
17 * pending read. When that isn't possible, the existing pending read is sent
18 * to StartReadBuffers() so that a new one can begin to form.
19 *
20 * The algorithm for controlling the look-ahead distance is based on recent
21 * cache / miss history, as well as whether we need to wait for I/O completion
22 * after a miss. When no I/O is necessary, there is no benefit in looking
23 * ahead more than one block. This is the default initial assumption. When
24 * blocks needing I/O are streamed, the combine distance is increased to
25 * benefit from I/O combining and the read-ahead distance is increased
26 * whenever we need to wait for I/O to try to benefit from increased I/O
27 * concurrency. Both are reduced gradually when cached blocks are streamed.
28 *
29 * The main data structure is a circular queue of buffers of size
30 * max_pinned_buffers plus some extra space for technical reasons, ready to be
31 * returned by read_stream_next_buffer(). Each buffer also has an optional
32 * variable sized object that is passed from the callback to the consumer of
33 * buffers.
34 *
35 * Parallel to the queue of buffers, there is a circular queue of in-progress
36 * I/Os that have been started with StartReadBuffers(), and for which
37 * WaitReadBuffers() must be called before returning the buffer.
38 *
39 * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
40 * successive calls, then these data structures might appear as follows:
41 *
42 * buffers buf/data ios
43 *
44 * +----+ +-----+ +--------+
45 * | | | | +----+ 42..44 | <- oldest_io_index
46 * +----+ +-----+ | +--------+
47 * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
48 * +----+ +-----+ | | +--------+
49 * | 42 | | ? |<-+ | | | <- next_io_index
50 * +----+ +-----+ | +--------+
51 * | 43 | | ? | | | |
52 * +----+ +-----+ | +--------+
53 * | 44 | | ? | | | |
54 * +----+ +-----+ | +--------+
55 * | 60 | | ? |<---+
56 * +----+ +-----+
57 * next_buffer_index -> | | | |
58 * +----+ +-----+
59 *
60 * In the example, 5 buffers are pinned, and the next buffer to be streamed to
61 * the client is block 10. Block 10 was a hit and has no associated I/O, but
62 * the range 42..44 requires an I/O wait before its buffers are returned, as
63 * does block 60.
64 *
65 *
66 * Portions Copyright (c) 2024-2026, PostgreSQL Global Development Group
67 * Portions Copyright (c) 1994, Regents of the University of California
68 *
69 * IDENTIFICATION
70 * src/backend/storage/aio/read_stream.c
71 *
72 *-------------------------------------------------------------------------
73 */
74#include "postgres.h"
75
76#include "miscadmin.h"
78#include "storage/aio.h"
79#include "storage/fd.h"
80#include "storage/smgr.h"
81#include "storage/read_stream.h"
82#include "utils/memdebug.h"
83#include "utils/rel.h"
84#include "utils/spccache.h"
85
91
/*
 * State for managing a stream of reads.
 */
/*
 * NOTE(review): the struct tag line (presumably "struct ReadStream") and a
 * number of member declarations were lost when this copy of the file was
 * extracted.  The gaps are marked below — verify against the upstream file.
 */
{
	/* NOTE(review): several members elided here (pin/IO accounting?). */

	/*
	 * Limit of how far, in blocks, to look-ahead for IO combining and for
	 * read-ahead.
	 *
	 * The limits for read-ahead and combining are handled separately to allow
	 * for IO combining even in cases where the I/O subsystem can keep up at a
	 * low read-ahead distance, as doing larger IOs is more efficient.
	 *
	 * Set to 0 when the end of the stream is reached.
	 */
	/* NOTE(review): the distance members themselves are elided here. */
	bool		sync_mode;		/* using io_method=sync */
	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
	/* NOTE(review): member(s) elided here. */

	/* scan stats counters */
	/* NOTE(review): stats member (IOStats pointer?) elided here. */

	/*
	 * One-block buffer to support 'ungetting' a block number, to resolve flow
	 * control problems when I/Os are split.
	 */
	/* NOTE(review): buffered_blocknum member elided here. */

	/*
	 * The callback that will tell us which block numbers to read, and an
	 * opaque pointer that will be passed to it for its own purposes.
	 */
	/* NOTE(review): callback and callback_private_data members elided here. */

	/* Next expected block, for detecting sequential access. */
	/* NOTE(review): seq_blocknum member(s) elided here. */

	/* The read operation we are currently preparing. */
	/* NOTE(review): pending_read_* members elided here. */

	/* Space for buffers and optional per-buffer private data. */
	/* NOTE(review): per_buffer_data members elided here. */

	/* Read operations that have been started but not waited for yet. */
	/* NOTE(review): ios queue members elided here. */

	/* NOTE(review): member elided here. */

	/* Circular queue of buffers. */
	int16		oldest_buffer_index;	/* Next pinned buffer to return */
	int16		next_buffer_index;	/* Index of next buffer to pin */
	/* NOTE(review): trailing buffers[] flexible array member elided here. */
};
167
168/*
169 * Return a pointer to the per-buffer data by index.
170 */
171static inline void *
173{
174 return (char *) stream->per_buffer_data +
175 stream->per_buffer_data_size * buffer_index;
176}
177
178/*
179 * General-use ReadStreamBlockNumberCB for block range scans. Loops over the
180 * blocks [current_blocknum, last_exclusive).
181 */
184 void *callback_private_data,
185 void *per_buffer_data)
186{
187 BlockRangeReadStreamPrivate *p = callback_private_data;
188
190 return p->current_blocknum++;
191
192 return InvalidBlockNumber;
193}
194
/*
 * Update stream stats with current pinned buffer depth.
 *
 * Called once per buffer returned to the consumer in read_stream_next_buffer().
 * Records the number of pinned buffers at that moment, so we can compute the
 * average look-ahead depth.
 */
static inline void
/* NOTE(review): the function name/signature line was elided in this copy;
 * it takes the stream (see the stream->... references below) — verify. */
{
	IOStats    *stats = stream->stats;

	/* Stats collection is optional; a NULL pointer means "disabled". */
	if (stats == NULL)
		return;

	/* One sample per buffer handed to the consumer. */
	stats->prefetch_count++;
	stats->distance_sum += stream->pinned_buffers;
	if (stream->pinned_buffers > stats->distance_max)
		stats->distance_max = stream->pinned_buffers;
}
215
216/*
217 * Update stream stats about size of I/O requests.
218 *
219 * We count the number of I/O requests, size of requests (counted in blocks)
220 * and number of in-progress I/Os.
221 */
222static inline void
224{
225 IOStats *stats = stream->stats;
226
227 if (stats == NULL)
228 return;
229
230 stats->io_count++;
231 stats->io_nblocks += nblocks;
232 stats->io_in_progress += in_progress;
233}
234
/*
 * Update stream stats about waits for I/O when consuming buffers.
 *
 * We count the number of I/O waits while pulling buffers out of a stream.
 */
static inline void
/* NOTE(review): the function name/signature line was elided in this copy;
 * it takes the stream (see stream->stats below) — verify. */
{
	IOStats    *stats = stream->stats;

	/* Stats collection is optional; a NULL pointer means "disabled". */
	if (stats == NULL)
		return;

	stats->wait_count++;
}
250
/*
 * Enable collection of stats into the provided IOStats.
 */
void
/* NOTE(review): the function name/signature line was elided in this copy;
 * presumably read_stream_set_stats(ReadStream *stream, IOStats *stats) —
 * verify against the upstream file and the header declaration. */
{
	/* A NULL stats pointer disables collection (see the helpers above). */
	stream->stats = stats;
	if (stream->stats)
		stream->stats->distance_capacity = stream->max_pinned_buffers;
}
261
262/*
263 * Ask the callback which block it would like us to read next, with a one block
264 * buffer in front to allow read_stream_unget_block() to work.
265 */
266static inline BlockNumber
267read_stream_get_block(ReadStream *stream, void *per_buffer_data)
268{
269 BlockNumber blocknum;
270
271 blocknum = stream->buffered_blocknum;
272 if (blocknum != InvalidBlockNumber)
274 else
275 {
276 /*
277 * Tell Valgrind that the per-buffer data is undefined. That replaces
278 * the "noaccess" state that was set when the consumer moved past this
279 * entry last time around the queue, and should also catch callbacks
280 * that fail to initialize data that the buffer consumer later
281 * accesses. On the first go around, it is undefined already.
282 */
283 VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
284 stream->per_buffer_data_size);
285 blocknum = stream->callback(stream,
286 stream->callback_private_data,
287 per_buffer_data);
288 }
289
290 return blocknum;
291}
292
293/*
294 * In order to deal with buffer shortages and I/O limits after short reads, we
295 * sometimes need to defer handling of a block we've already consumed from the
296 * registered callback until later.
297 */
298static inline void
300{
301 /* We shouldn't ever unget more than one block. */
303 Assert(blocknum != InvalidBlockNumber);
304 stream->buffered_blocknum = blocknum;
305}
306
/*
 * Start as much of the current pending read as we can.  If we have to split it
 * because of the per-backend buffer limit, or the buffer manager decides to
 * split it, then the pending read is adjusted to hold the remaining portion.
 *
 * We can always start a read of at least size one if we have no progress yet.
 * Otherwise it's possible that we can't start a read at all because of a lack
 * of buffers, and then false is returned.  Buffer shortages also reduce the
 * distance to a level that prevents look-ahead until buffers are released.
 */
static bool
/* NOTE(review): the function name line was elided in this copy; judging by
 * its callers this is read_stream_start_pending_read(ReadStream *stream). */
{
	bool		need_wait;
	/* NOTE(review): a declaration was elided here (requested_nblocks?). */
	int			nblocks;
	int			flags;
	int			forwarded;
	/* NOTE(review): a declaration was elided here (io_index?). */
	int16		overflow;
	int16		buffer_index;
	int			buffer_limit;

	/* This should only be called with a pending read. */
	Assert(stream->pending_read_nblocks > 0);
	Assert(stream->pending_read_nblocks <= stream->io_combine_limit);

	/* We had better not exceed the per-stream buffer limit with this read. */
	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
		   stream->max_pinned_buffers);

#ifdef USE_ASSERT_CHECKING
	/* We had better not be overwriting an existing pinned buffer. */
	if (stream->pinned_buffers > 0)
		Assert(stream->next_buffer_index != stream->oldest_buffer_index);
	else
		Assert(stream->next_buffer_index == stream->oldest_buffer_index);

	/*
	 * Pinned buffers forwarded by a preceding StartReadBuffers() call that
	 * had to split the operation should match the leading blocks of this
	 * following StartReadBuffers() call.
	 */
	/* NOTE(review): a line was elided here. */
	for (int i = 0; i < stream->forwarded_buffers; ++i)
		/* NOTE(review): the Assert's opening line was elided here. */
			stream->pending_read_blocknum + i);

	/*
	 * Check that we've cleared the queue/overflow entries corresponding to
	 * the rest of the blocks covered by this read, unless it's the first go
	 * around and we haven't even initialized them yet.
	 */
	for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
		Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
			   stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
#endif

	/* Do we need to issue read-ahead advice? */
	flags = stream->read_buffers_flags;
	if (stream->advice_enabled)
	{
		if (stream->pending_read_blocknum == stream->seq_blocknum)
		{
			/*
			 * Sequential: Issue advice until the preadv() calls have caught
			 * up with the first advice issued for this sequential region, and
			 * then stay out of the way of the kernel's own read-ahead.
			 */
			/* NOTE(review): two lines elided in this copy. */
		}
		else
		{
			/*
			 * Random jump: Note the starting location of a new potential
			 * sequential region and start issuing advice.  Skip it this time
			 * if the preadv() follows immediately, eg first block in stream.
			 */
			/* NOTE(review): a line was elided here. */
			if (stream->pinned_buffers > 0)
				/* NOTE(review): a line was elided here (advice flag?). */
		}
	}

	/*
	 * How many more buffers is this backend allowed?
	 *
	 * Forwarded buffers are already pinned and map to the leading blocks of
	 * the pending read (the remaining portion of an earlier short read that
	 * we're about to continue).  They are not counted in pinned_buffers, but
	 * they are counted as pins already held by this backend according to the
	 * buffer manager, so they must be added to the limit it grants us.
	 */
	if (stream->temporary)
		/* NOTE(review): a line was elided here (local pin limit). */
	else
		/* NOTE(review): lines elided here (shared pin limit). */

	/* NOTE(review): lines elided here (forwarded-buffer adjustment?). */

	if (buffer_limit == 0 && stream->pinned_buffers == 0)
		buffer_limit = 1;		/* guarantee progress */

	/* Does the per-backend limit affect this read? */
	nblocks = stream->pending_read_nblocks;
	if (buffer_limit < nblocks)
	{
		/* NOTE(review): a declaration was elided here (new_distance?). */

		/* Shrink distance: no more look-ahead until buffers are released. */
		/* NOTE(review): a line was elided here (computes new_distance?). */
		if (stream->readahead_distance > new_distance)
			/* NOTE(review): a line was elided here (shrink assignment). */

		/* Unless we have nothing to give the consumer, stop here. */
		if (stream->pinned_buffers > 0)
			return false;

		/* A short read is required to make progress. */
		nblocks = buffer_limit;
	}

	/*
	 * We say how many blocks we want to read, but it may be smaller on return
	 * if the buffer manager decides to shorten the read.  Initialize buffers
	 * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
	 * and keep the original nblocks number so we can check for forwarded
	 * buffers as output, below.
	 */
	buffer_index = stream->next_buffer_index;
	io_index = stream->next_io_index;
	while (stream->initialized_buffers < buffer_index + nblocks)
		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
	requested_nblocks = nblocks;
	/* NOTE(review): the StartReadBuffers() call's opening line was elided. */
								&stream->buffers[buffer_index],
								stream->pending_read_blocknum,
								&nblocks,
								flags);
	stream->pinned_buffers += nblocks;

	/* Remember whether we need to wait before returning this buffer. */
	if (!need_wait)
	{
		/*
		 * If there currently is no IO in progress, and we have not needed to
		 * issue IO recently, decay the look-ahead distance.  We detect if we
		 * had to issue IO recently by having a decay holdoff that's set to
		 * the max look-ahead distance whenever we need to do IO.  This is
		 * important to ensure we eventually reach a high enough distance to
		 * perform IO asynchronously when starting out with a small look-ahead
		 * distance.
		 */
		if (stream->ios_in_progress == 0)
		{
			if (stream->distance_decay_holdoff > 0)
				stream->distance_decay_holdoff--;
			else
			{
				if (stream->readahead_distance > 1)
					stream->readahead_distance--;

				/*
				 * For now we reduce the IO combine distance after
				 * sufficiently many buffer hits.  There is no clear
				 * performance argument for doing so, but at the moment we
				 * need to do so to make the entrance into fast_path work
				 * correctly: We require combine_distance == 1 to enter
				 * fast-path, as without that condition we would wrongly
				 * re-enter fast-path when readahead_distance == 1 and
				 * pinned_buffers == 1, as we would not yet have prepared
				 * another IO in that situation.
				 */
				if (stream->combine_distance > 1)
					stream->combine_distance--;
			}
		}
	}
	else
	{
		/*
		 * Remember to call WaitReadBuffers() before returning head buffer.
		 * Look-ahead distance will be adjusted after waiting.
		 */
		stream->ios[io_index].buffer_index = buffer_index;
		if (++stream->next_io_index == stream->max_ios)
			stream->next_io_index = 0;
		Assert(stream->ios_in_progress < stream->max_ios);
		stream->ios_in_progress++;
		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;

		/* update I/O stats */
		read_stream_count_io(stream, nblocks, stream->ios_in_progress);
	}

	/*
	 * How many pins were acquired but forwarded to the next call?  These need
	 * to be passed to the next StartReadBuffers() call by leaving them
	 * exactly where they are in the queue, or released if the stream ends
	 * early.  We need the number for accounting purposes, since they are not
	 * counted in stream->pinned_buffers but we already hold them.
	 */
	forwarded = 0;
	while (nblocks + forwarded < requested_nblocks &&
		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
		forwarded++;
	/* NOTE(review): a line was elided here (stores forwarded count?). */

	/*
	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
	 * the front of the array where they'll be consumed, but also leave a copy
	 * in the overflow zone which the I/O operation has a pointer to (it needs
	 * a contiguous array).  Both copies will be cleared when the buffers are
	 * handed to the consumer.
	 */
	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
	if (overflow > 0)
	{
		Assert(overflow < stream->queue_size);	/* can't overlap */
		memcpy(&stream->buffers[0],
			   &stream->buffers[stream->queue_size],
			   sizeof(stream->buffers[0]) * overflow);
	}

	/* Compute location of start of next read, without using % operator. */
	buffer_index += nblocks;
	if (buffer_index >= stream->queue_size)
		buffer_index -= stream->queue_size;
	Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
	stream->next_buffer_index = buffer_index;

	/* Adjust the pending read to cover the remaining portion, if any. */
	stream->pending_read_blocknum += nblocks;
	stream->pending_read_nblocks -= nblocks;

	return true;
}
548
549/*
550 * Should we continue to perform look ahead? Looking ahead may allow us to
551 * make the pending IO larger via IO combining or to issue more read-ahead.
552 */
553static inline bool
555{
556 /* If the callback has signaled end-of-stream, we're done */
557 if (stream->readahead_distance == 0)
558 return false;
559
560 /* never start more IOs than our cap */
561 if (stream->ios_in_progress >= stream->max_ios)
562 return false;
563
564 /*
565 * Allow looking further ahead if we are in the process of building a
566 * larger IO, the IO is not yet big enough, and we don't yet have IO in
567 * flight.
568 *
569 * We do so to allow building larger reads when readahead_distance is
570 * small (e.g. because the I/O subsystem is keeping up or
571 * effective_io_concurrency is small). That's a useful goal because larger
572 * reads are more CPU efficient than smaller reads, even if the system is
573 * not IO bound.
574 *
575 * The reason we do *not* do so when we already have a read prepared (i.e.
576 * why we check for pinned_buffers == 0) is once we are actually reading
577 * ahead, we don't need it:
578 *
579 * - We won't issue unnecessarily small reads as
580 * read_stream_should_issue_now() will return false until the IO is
581 * suitably sized. The issuance of the pending read will be delayed until
582 * enough buffers have been consumed.
583 *
584 * - If we are not reading ahead aggressively enough, future
585 * WaitReadBuffers() calls will return true, leading to readahead_distance
586 * being increased. After that more full-sized IOs can be issued.
587 *
588 * Furthermore, if we did not have the pinned_buffers == 0 condition, we
589 * might end up issuing I/O more aggressively than we need.
590 *
591 * Note that a return of true here can lead to exceeding the read-ahead
592 * limit, but we won't exceed the buffer pin limit (because pinned_buffers
593 * == 0 and combine_distance is capped by max_pinned_buffers).
594 */
595 if (stream->pending_read_nblocks > 0 &&
596 stream->pinned_buffers == 0 &&
597 stream->pending_read_nblocks < stream->combine_distance)
598 return true;
599
600 /*
601 * Don't start more read-ahead if that'd put us over the distance limit
602 * for doing read-ahead. As stream->readahead_distance is capped by
603 * max_pinned_buffers, this prevents us from looking ahead so far that it
604 * would put us over the pin limit.
605 */
606 if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
607 return false;
608
609 return true;
610}
611
612/*
613 * We don't start the pending read just because we've hit the distance limit,
614 * preferring to give it another chance to grow to full io_combine_limit size
615 * once more buffers have been consumed. But this is not desirable in all
616 * situations - see below.
617 */
618static inline bool
620{
621 int16 pending_read_nblocks = stream->pending_read_nblocks;
622
623 /* there is no pending IO that could be issued */
624 if (pending_read_nblocks == 0)
625 return false;
626
627 /* never start more IOs than our cap */
628 if (stream->ios_in_progress >= stream->max_ios)
629 return false;
630
631 /*
632 * If the callback has signaled end-of-stream, start the pending read
633 * immediately. There is no further potential for IO combining.
634 */
635 if (stream->readahead_distance == 0)
636 return true;
637
638 /*
639 * If we've already reached combine_distance, there's no chance of growing
640 * the read further.
641 */
642 if (pending_read_nblocks >= stream->combine_distance)
643 return true;
644
645 /*
646 * If we currently have no reads in flight or prepared, issue the IO once
647 * we are not looking ahead further. This ensures there's always at least
648 * one IO prepared.
649 */
650 if (stream->pinned_buffers == 0 &&
652 return true;
653
654 return false;
655}
656
static void
/* NOTE(review): the function name line was elided in this copy; judging by
 * the surrounding code this is the look-ahead driver, presumably
 * read_stream_look_ahead(ReadStream *stream) — verify. */
{
	/*
	 * Allow amortizing the cost of submitting IO over multiple IOs.  This
	 * requires that we don't do any operations that could lead to a deadlock
	 * with staged-but-unsubmitted IO.  The callback needs to opt-in to being
	 * careful.
	 */
	if (stream->batch_mode)
		/* NOTE(review): a line was elided here (enter batch mode?). */

	while (read_stream_should_look_ahead(stream))
	{
		BlockNumber blocknum;
		int16		buffer_index;
		void	   *per_buffer_data;

		/* NOTE(review): an if-condition line was elided here. */
		{
			/* NOTE(review): a line was elided here (start pending read?). */
			continue;
		}

		/*
		 * See which block the callback wants next in the stream.  We need to
		 * compute the index of the Nth block of the pending read including
		 * wrap-around, but we don't want to use the expensive % operator.
		 */
		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
		if (buffer_index >= stream->queue_size)
			buffer_index -= stream->queue_size;
		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
		per_buffer_data = get_per_buffer_data(stream, buffer_index);
		blocknum = read_stream_get_block(stream, per_buffer_data);
		if (blocknum == InvalidBlockNumber)
		{
			/* End of stream. */
			stream->readahead_distance = 0;
			stream->combine_distance = 0;
			break;
		}

		/* Can we merge it with the pending read? */
		if (stream->pending_read_nblocks > 0 &&
			stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
		{
			stream->pending_read_nblocks++;
			continue;
		}

		/* We have to start the pending read before we can build another. */
		while (stream->pending_read_nblocks > 0)
		{
			if (!read_stream_start_pending_read(stream) ||
				stream->ios_in_progress == stream->max_ios)
			{
				/* We've hit the buffer or I/O limit.  Rewind and stop here. */
				read_stream_unget_block(stream, blocknum);
				if (stream->batch_mode)
					/* NOTE(review): a line was elided here (exit batch mode?). */
				return;
			}
		}

		/* This is the start of a new pending read. */
		stream->pending_read_blocknum = blocknum;
		stream->pending_read_nblocks = 1;
	}

	/*
	 * Check if the pending read should be issued now, or if we should give it
	 * another chance to grow to the full size.
	 *
	 * Note that the pending read can exceed the distance goal, if the latter
	 * was reduced after hitting the per-backend buffer limit.
	 */
	/* NOTE(review): two lines elided here (issue-now check + start call?). */

	/*
	 * There should always be something pinned when we leave this function,
	 * whether started by this call or not, unless we've hit the end of the
	 * stream.  In the worst case we can always make progress one buffer at a
	 * time.
	 */
	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);

	if (stream->batch_mode)
		/* NOTE(review): a line was elided here (exit batch mode?). */
}
748
/*
 * Create a new read stream object that can be used to perform the equivalent
 * of a series of ReadBuffer() calls for one fork of one relation.
 * Internally, it generates larger vectored reads where possible by looking
 * ahead.  The callback should return block numbers or InvalidBlockNumber to
 * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
 * write extra data for each block into the space provided to it.  It will
 * also receive callback_private_data for its own purposes.
 */
static ReadStream *
/* NOTE(review): the function name line was elided in this copy; judging by
 * the wrappers below this is read_stream_begin_impl(int flags, ...). */
					   BufferAccessStrategy strategy,
					   Relation rel,
					   SMgrRelation smgr,
					   char persistence,
					   ForkNumber forknum,
					   /* NOTE(review): the callback parameter line was elided. */
					   void *callback_private_data,
					   size_t per_buffer_data_size)
{
	ReadStream *stream;
	size_t		size;
	int16		queue_size;
	/* NOTE(review): a declaration was elided here (queue_overflow?). */
	int			max_ios;
	/* NOTE(review): a declaration was elided here (strategy_pin_limit?). */
	uint32		max_pinned_buffers;
	/* NOTE(review): a declaration was elided here (max_possible_buffer_limit?). */
	Oid			tablespace_id;

	/*
	 * Decide how many I/Os we will allow to run at the same time.  That
	 * currently means advice to the kernel to tell it that we will soon read.
	 * This number also affects how far we look ahead for opportunities to
	 * start more I/Os.
	 */
	tablespace_id = smgr->smgr_rlocator.locator.spcOid;
	if (!OidIsValid(MyDatabaseId) ||
		(rel && IsCatalogRelation(rel)) ||
		/* NOTE(review): the third condition line was elided here. */
	{
		/*
		 * Avoid circularity while trying to look up tablespace settings or
		 * before spccache.c is ready.
		 */
		max_ios = effective_io_concurrency;
	}
	else if (flags & READ_STREAM_MAINTENANCE)
		max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
	else
		max_ios = get_tablespace_io_concurrency(tablespace_id);

	/* Cap to INT16_MAX to avoid overflowing below */
	max_ios = Min(max_ios, PG_INT16_MAX);

	/*
	 * If starting a multi-block I/O near the end of the queue, we might
	 * temporarily need extra space for overflowing buffers before they are
	 * moved to regular circular position.  This is the maximum extra space we
	 * could need.
	 */
	/* NOTE(review): a line was elided here (computes queue_overflow?). */

	/*
	 * Choose the maximum number of buffers we're prepared to pin.  We try to
	 * pin fewer if we can, though.  We add one so that we can make progress
	 * even if max_ios is set to 0 (see also further down).  For max_ios > 0,
	 * this also allows an extra full I/O's worth of buffers: after an I/O
	 * finishes we don't want to have to wait for its buffers to be consumed
	 * before starting a new one.
	 *
	 * Be careful not to allow int16 to overflow.  That is possible with the
	 * current GUC range limits, so this is an artificial limit of ~32k
	 * buffers and we'd need to adjust the types to exceed that.  We also have
	 * to allow for the spare entry and the overflow space.
	 */
	max_pinned_buffers = (max_ios + 1) * io_combine_limit;
	max_pinned_buffers = Min(max_pinned_buffers,
	/* NOTE(review): the continuation line of this Min() was elided. */

	/* Give the strategy a chance to limit the number of buffers we pin. */
	/* NOTE(review): a line was elided here (queries the strategy limit?). */
	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);

	/*
	 * Also limit our queue to the maximum number of pins we could ever be
	 * allowed to acquire according to the buffer manager.  We may not really
	 * be able to use them all due to other pins held by this backend, but
	 * we'll check that later in read_stream_start_pending_read().
	 */
	if (SmgrIsTemp(smgr))
		/* NOTE(review): a line was elided here (local pin limit). */
	else
		/* NOTE(review): a line was elided here (shared pin limit). */
	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);

	/*
	 * The limit might be zero on a system configured with too few buffers for
	 * the number of connections.  We need at least one to make progress.
	 */
	max_pinned_buffers = Max(1, max_pinned_buffers);

	/*
	 * We need one extra entry for buffers and per-buffer data, because users
	 * of per-buffer data have access to the object until the next call to
	 * read_stream_next_buffer(), so we need a gap between the head and tail
	 * of the queue so that we don't clobber it.
	 */
	queue_size = max_pinned_buffers + 1;

	/*
	 * Allocate the object, the buffers, the ios and per_buffer_data space in
	 * one big chunk.  Though we have queue_size buffers, we want to be able
	 * to assume that all the buffers for a single read are contiguous (i.e.
	 * don't wrap around halfway through), so we allow temporary overflows of
	 * up to the maximum possible overflow size.
	 */
	size = offsetof(ReadStream, buffers);
	size += sizeof(Buffer) * (queue_size + queue_overflow);
	size += sizeof(InProgressIO) * Max(1, max_ios);
	size += per_buffer_data_size * queue_size;
	size += MAXIMUM_ALIGNOF * 2;
	stream = (ReadStream *) palloc(size);
	memset(stream, 0, offsetof(ReadStream, buffers));
	stream->ios = (InProgressIO *)
		MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
	if (per_buffer_data_size > 0)
		stream->per_buffer_data = (void *)
			MAXALIGN(&stream->ios[Max(1, max_ios)]);

	stream->sync_mode = io_method == IOMETHOD_SYNC;
	stream->batch_mode = flags & READ_STREAM_USE_BATCHING;

#ifdef USE_PREFETCH

	/*
	 * Read-ahead advice simulating asynchronous I/O with synchronous calls.
	 * Issue advice only if AIO is not used, direct I/O isn't enabled, the
	 * caller hasn't promised sequential access (overriding our detection
	 * heuristics), and max_ios hasn't been set to zero.
	 */
	if (stream->sync_mode &&
		/* NOTE(review): a condition line was elided here (direct I/O check?). */
		(flags & READ_STREAM_SEQUENTIAL) == 0 &&
		max_ios > 0)
		stream->advice_enabled = true;
#endif

	/*
	 * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
	 * we still need to allocate space to combine and run one I/O.  Bump it up
	 * to one, and remember to ask for synchronous I/O only.
	 */
	if (max_ios == 0)
	{
		max_ios = 1;
		/* NOTE(review): a line was elided here (sets synchronous flag?). */
	}

	/*
	 * Capture stable values for these two GUC-derived numbers for the
	 * lifetime of this stream, so we don't have to worry about the GUCs
	 * changing underneath us beyond this point.
	 */
	stream->max_ios = max_ios;
	/* NOTE(review): a line was elided here (captures io_combine_limit?). */

	stream->per_buffer_data_size = per_buffer_data_size;
	stream->max_pinned_buffers = max_pinned_buffers;
	stream->queue_size = queue_size;
	stream->callback = callback;
	stream->callback_private_data = callback_private_data;
	/* NOTE(review): lines elided here (buffered/seq blocknum init?). */
	stream->temporary = SmgrIsTemp(smgr);
	stream->distance_decay_holdoff = 0;

	/*
	 * Skip the initial ramp-up phase if the caller says we're going to be
	 * reading the whole relation.  This way we start out assuming we'll be
	 * doing full io_combine_limit sized reads.
	 */
	if (flags & READ_STREAM_FULL)
	{
		stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
		stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
	}
	else
	{
		stream->readahead_distance = 1;
		stream->combine_distance = 1;
	}
	/* NOTE(review): lines elided here. */

	/*
	 * Since we always access the same relation, we can initialize parts of
	 * the ReadBuffersOperation objects and leave them that way, to avoid
	 * wasting CPU cycles writing to them for each read.
	 */
	for (int i = 0; i < max_ios; ++i)
	{
		stream->ios[i].op.rel = rel;
		stream->ios[i].op.smgr = smgr;
		stream->ios[i].op.persistence = persistence;
		stream->ios[i].op.forknum = forknum;
		stream->ios[i].op.strategy = strategy;
	}

	return stream;
}
961
962/*
963 * Create a new read stream for reading a relation.
964 * See read_stream_begin_impl() for the detailed explanation.
965 */
968 BufferAccessStrategy strategy,
969 Relation rel,
970 ForkNumber forknum,
972 void *callback_private_data,
973 size_t per_buffer_data_size)
974{
975 return read_stream_begin_impl(flags,
976 strategy,
977 rel,
978 RelationGetSmgr(rel),
979 rel->rd_rel->relpersistence,
980 forknum,
981 callback,
982 callback_private_data,
983 per_buffer_data_size);
984}
985
986/*
987 * Create a new read stream for reading a SMgr relation.
988 * See read_stream_begin_impl() for the detailed explanation.
989 */
992 BufferAccessStrategy strategy,
993 SMgrRelation smgr,
994 char smgr_persistence,
995 ForkNumber forknum,
997 void *callback_private_data,
998 size_t per_buffer_data_size)
999{
1000 return read_stream_begin_impl(flags,
1001 strategy,
1002 NULL,
1003 smgr,
1005 forknum,
1006 callback,
1007 callback_private_data,
1008 per_buffer_data_size);
1009}
1010
/*
 * Pull one pinned buffer out of a stream.  Each call returns successive
 * blocks in the order specified by the callback.  If per_buffer_data_size was
 * set to a non-zero size, *per_buffer_data receives a pointer to the extra
 * per-buffer data that the callback had a chance to populate, which remains
 * valid until the next call to read_stream_next_buffer().  When the stream
 * runs out of data, InvalidBuffer is returned.  The caller may decide to end
 * the stream early at any time by calling read_stream_end().
 */
Buffer
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
{
	Buffer		buffer;
	int16		oldest_buffer_index;

#ifndef READ_STREAM_DISABLE_FAST_PATH

	/*
	 * A fast path for all-cached scans.  This is the same as the usual
	 * algorithm, but it is specialized for no I/O and no per-buffer data, so
	 * we can skip the queue management code, stay in the same buffer slot and
	 * use singular StartReadBuffer().
	 */
	if (likely(stream->fast_path))
	{
		/*
		 * NOTE(review): a local declaration (presumably "BlockNumber
		 * next_blocknum;", used below) was elided from this copy of the
		 * file -- restore from upstream before building.
		 */

		/* Fast path assumptions. */
		Assert(stream->ios_in_progress == 0);
		Assert(stream->forwarded_buffers == 0);
		Assert(stream->pinned_buffers == 1);
		Assert(stream->readahead_distance == 1);
		Assert(stream->combine_distance == 1);
		Assert(stream->pending_read_nblocks == 0);
		Assert(stream->per_buffer_data_size == 0);
		/* NOTE(review): one further assertion was elided here in this copy. */

		/* We're going to return the buffer we pinned last time. */
		oldest_buffer_index = stream->oldest_buffer_index;
		Assert((oldest_buffer_index + 1) % stream->queue_size ==
			   stream->next_buffer_index);
		buffer = stream->buffers[oldest_buffer_index];
		Assert(buffer != InvalidBuffer);

		/* Choose the next block to pin. */

		/*
		 * NOTE(review): the statement obtaining next_blocknum (presumably a
		 * read_stream_get_block() call) and the end-of-stream test on it
		 * were elided from this copy -- restore from upstream.
		 */
		{
			int			flags = stream->read_buffers_flags;

			if (stream->advice_enabled)
				/* NOTE(review): advice-flag statement elided in this copy */

			/*
			 * While in fast-path, execute any IO that we might encounter
			 * synchronously. Because we are, right now, only looking one
			 * block ahead, dispatching any occasional IO to workers would
			 * have the overhead of dispatching to workers, without any
			 * realistic chance of the IO completing before we need it. We
			 * will switch to non-synchronous IO after this.
			 *
			 * Arguably we should do so only for worker, as there's far less
			 * dispatch overhead with io_uring. However, tests so far have not
			 * shown a clear downside and additional io_method awareness here
			 * seems not great from an abstraction POV.
			 */
			/*
			 * NOTE(review): the statement adding the synchronous-IO flag was
			 * elided from this copy.
			 */

			/*
			 * Pin a buffer for the next call.  Same buffer entry, and
			 * arbitrary I/O entry (they're all free).  We don't have to
			 * adjust pinned_buffers because we're transferring one to caller
			 * but pinning one more.
			 *
			 * In the fast path we don't need to check the pin limit.  We're
			 * always allowed at least one pin so that progress can be made,
			 * and that's all we need here.  Although two pins are momentarily
			 * held at the same time, the model used here is that the stream
			 * holds only one, and the other now belongs to the caller.
			 */
			if (likely(!StartReadBuffer(&stream->ios[0].op,
										&stream->buffers[oldest_buffer_index],
			/* NOTE(review): block number argument elided in this copy */
										flags)))
			{
				/* Fast return. */
				/* NOTE(review): one statement was elided here in this copy */
				return buffer;
			}

			/* Next call must wait for I/O for the newly pinned buffer. */
			stream->oldest_io_index = 0;
			stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
			stream->ios_in_progress = 1;
			stream->ios[0].buffer_index = oldest_buffer_index;
			stream->seq_blocknum = next_blocknum + 1;

			/*
			 * XXX: It might be worth triggering additional read-ahead here,
			 * to avoid having to effectively do another synchronous IO for
			 * the next block (if it were also a miss).
			 */

			/* update I/O stats */
			read_stream_count_io(stream, 1, stream->ios_in_progress);

			/* update prefetch distance */
			/* NOTE(review): the distance-update statement was elided here. */
		}
		else
		{
			/* No more blocks, end of stream. */
			stream->readahead_distance = 0;
			stream->combine_distance = 0;
			stream->oldest_buffer_index = stream->next_buffer_index;
			stream->pinned_buffers = 0;
			stream->buffers[oldest_buffer_index] = InvalidBuffer;
		}

		/* Leave the fast path; the slow path below re-enables it if safe. */
		stream->fast_path = false;
		return buffer;
	}
#endif

	if (unlikely(stream->pinned_buffers == 0))
	{
		Assert(stream->oldest_buffer_index == stream->next_buffer_index);

		/* End of stream reached? */
		if (stream->readahead_distance == 0)
			return InvalidBuffer;

		/*
		 * The usual order of operations is that we look ahead at the bottom
		 * of this function after potentially finishing an I/O and making
		 * space for more, but if we're just starting up we'll need to crank
		 * the handle to get started.
		 */
		read_stream_look_ahead(stream);

		/* End of stream reached? */
		if (stream->pinned_buffers == 0)
		{
			Assert(stream->readahead_distance == 0);
			return InvalidBuffer;
		}
	}

	/* Grab the oldest pinned buffer and associated per-buffer data. */
	Assert(stream->pinned_buffers > 0);
	oldest_buffer_index = stream->oldest_buffer_index;
	Assert(oldest_buffer_index >= 0 &&
	/* NOTE(review): second half of this assertion elided in this copy */
	buffer = stream->buffers[oldest_buffer_index];
	if (per_buffer_data)
		*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);

	Assert(BufferIsValid(buffer));

	/* Do we have to wait for an associated I/O first? */
	if (stream->ios_in_progress > 0 &&
		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
	{
		int16		io_index = stream->oldest_io_index;
		bool		needed_wait;

		/* Sanity check that we still agree on the buffers. */
		Assert(stream->ios[io_index].op.buffers ==
			   &stream->buffers[oldest_buffer_index]);

		/*
		 * NOTE(review): the WaitReadBuffers() call that initializes
		 * needed_wait was elided from this copy -- restore from upstream.
		 */

		Assert(stream->ios_in_progress > 0);
		stream->ios_in_progress--;
		if (++stream->oldest_io_index == stream->max_ios)
			stream->oldest_io_index = 0;

		/*
		 * If the IO was executed synchronously, we will never see
		 * WaitReadBuffers() block. Treat it as if it did block. This is
		 * particularly crucial when effective_io_concurrency=0 is used, as
		 * all IO will be synchronous. Without treating synchronous IO as
		 * having waited, we'd never allow the distance to get large enough to
		 * allow for IO combining, resulting in bad performance.
		 */
		/* NOTE(review): the guarding condition was elided in this copy */
			needed_wait = true;

		/* Count it as a wait if we need to wait for IO */
		if (needed_wait)
			read_stream_count_wait(stream);

		/*
		 * Have the read-ahead distance ramp up rapidly after we needed to
		 * wait for IO. We only increase the read-ahead-distance when we
		 * needed to wait, to avoid increasing the distance further than
		 * necessary, as looking ahead too far can be costly, both due to the
		 * cost of unnecessarily pinning many buffers and due to doing IOs
		 * that may never be consumed if the stream is ended/reset before
		 * completion.
		 *
		 * If we did not need to wait, the current distance was evidently
		 * sufficient.
		 *
		 * NB: Must not increase the distance if we already reached the end of
		 * the stream, as stream->readahead_distance == 0 is used to keep
		 * track of having reached the end.
		 */
		if (stream->readahead_distance > 0 && needed_wait)
		{
			/* wider temporary value, due to overflow risk */
			int32		readahead_distance;

			readahead_distance = stream->readahead_distance * 2;
			readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
			stream->readahead_distance = readahead_distance;
		}

		/*
		 * As we needed IO, prevent distances from being reduced within our
		 * maximum look-ahead window. This avoids collapsing distances too
		 * quickly in workloads where most of the required blocks are cached,
		 * but where the remaining IOs are a sufficient enough factor to cause
		 * a substantial slowdown if executed synchronously.
		 *
		 * There are valid arguments for preventing decay for max_ios or for
		 * max_pinned_buffers. But the argument for max_pinned_buffers seems
		 * clearer - if we can't see any misses within the maximum look-ahead
		 * distance, we can't do any useful read-ahead.
		 */
		/*
		 * NOTE(review): the statement implementing the holdoff (presumably
		 * setting stream->distance_decay_holdoff) was elided in this copy.
		 */

		/*
		 * Whether we needed to wait or not, allow for more IO combining if we
		 * needed to do IO. The reason to do so independent of needing to wait
		 * is that when the data is resident in the kernel page cache, IO
		 * combining reduces the syscall / dispatch overhead, making it
		 * worthwhile regardless of needing to wait.
		 *
		 * It is also important with io_uring as it will never signal the need
		 * to wait for reads if all the data is in the page cache. There are
		 * heuristics to deal with that in method_io_uring.c, but they only
		 * work when the IO gets large enough.
		 */
		if (stream->combine_distance > 0 &&
			stream->combine_distance < stream->io_combine_limit)
		{
			/* wider temporary value, due to overflow risk */
			int32		combine_distance;

			combine_distance = stream->combine_distance * 2;
			combine_distance = Min(combine_distance, stream->io_combine_limit);
			combine_distance = Min(combine_distance, stream->max_pinned_buffers);
			stream->combine_distance = combine_distance;
		}

		/*
		 * If we've reached the first block of a sequential region we're
		 * issuing advice for, cancel that until the next jump. The kernel
		 * will see the sequential preadv() pattern starting here.
		 */
		if (stream->advice_enabled &&
			stream->ios[io_index].op.blocknum == stream->seq_until_processed)
			/* NOTE(review): the body of this 'if' was elided in this copy */
	}

	/*
	 * We must zap this queue entry, or else it would appear as a forwarded
	 * buffer. If it's potentially in the overflow zone (ie from a
	 * multi-block I/O that wrapped around the queue), also zap the copy.
	 */
	stream->buffers[oldest_buffer_index] = InvalidBuffer;
	/*
	 * NOTE(review): the guard line and the right-hand side of the following
	 * overflow-zone assignment were elided in this copy.
	 */
	stream->buffers[stream->queue_size + oldest_buffer_index] =

#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)

	/*
	 * The caller will get access to the per-buffer data, until the next call.
	 * We wipe the one before, which is never occupied because queue_size
	 * allowed one extra element. This will hopefully trip up client code
	 * that is holding a dangling pointer to it.
	 */
	if (stream->per_buffer_data)
	{
		void	   *per_buffer_data;

		per_buffer_data = get_per_buffer_data(stream,
											  oldest_buffer_index == 0 ?
											  stream->queue_size - 1 :
											  oldest_buffer_index - 1);

#if defined(CLOBBER_FREED_MEMORY)
		/* This also tells Valgrind the memory is "noaccess". */
		wipe_mem(per_buffer_data, stream->per_buffer_data_size);
#elif defined(USE_VALGRIND)
		/* Tell it ourselves. */
		VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
								   stream->per_buffer_data_size);
#endif
	}
#endif

	/* NOTE(review): one statement was elided here in this copy. */

	/* Pin transferred to caller. */
	Assert(stream->pinned_buffers > 0);
	stream->pinned_buffers--;

	/* Advance oldest buffer, with wrap-around. */
	stream->oldest_buffer_index++;
	if (stream->oldest_buffer_index == stream->queue_size)
		stream->oldest_buffer_index = 0;

	/* Prepare for the next call. */
	read_stream_look_ahead(stream);

#ifndef READ_STREAM_DISABLE_FAST_PATH
	/* See if we can take the fast path for all-cached scans next time. */
	if (stream->ios_in_progress == 0 &&
		stream->forwarded_buffers == 0 &&
		stream->pinned_buffers == 1 &&
		stream->readahead_distance == 1 &&
		stream->combine_distance == 1 &&
		stream->pending_read_nblocks == 0 &&
		stream->per_buffer_data_size == 0)
	{
		/*
		 * The fast path spins on one buffer entry repeatedly instead of
		 * rotating through the whole queue and clearing the entries behind
		 * it. If the buffer it starts with happened to be forwarded between
		 * StartReadBuffers() calls and also wrapped around the circular queue
		 * partway through, then a copy also exists in the overflow zone, and
		 * it won't clear it out as the regular path would. Do that now, so
		 * it doesn't need code for that.
		 */
		if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
			stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
		/* NOTE(review): right-hand side of this assignment elided here */

		stream->fast_path = true;
	}
#endif

	return buffer;
}
1359
1360/*
1361 * Transitional support for code that would like to perform or skip reads
1362 * itself, without using the stream. Returns, and consumes, the next block
1363 * number that would be read by the stream's look-ahead algorithm, or
1364 * InvalidBlockNumber if the end of the stream is reached. Also reports the
1365 * strategy that would be used to read it.
1366 */
1369{
1370 *strategy = stream->ios[0].op.strategy;
1371 return read_stream_get_block(stream, NULL);
1372}
1373
1374/*
1375 * Temporarily stop consuming block numbers from the block number callback.
1376 * If called inside the block number callback, its return value should be
1377 * returned by the callback.
1378 */
1381{
1383 stream->resume_combine_distance = stream->combine_distance;
1384 stream->readahead_distance = 0;
1385 stream->combine_distance = 0;
1386 return InvalidBlockNumber;
1387}
1388
1389/*
1390 * Resume looking ahead after the block number callback reported
1391 * end-of-stream. This is useful for streams of self-referential blocks, after
1392 * a buffer needed to be consumed and examined to find more block numbers.
1393 */
1394void
1400
1401/*
1402 * Reset a read stream by releasing any queued up buffers, allowing the stream
1403 * to be used again for different blocks. This can be used to clear an
1404 * end-of-stream condition and start again, or to throw away blocks that were
1405 * speculatively read and read some different blocks instead.
1406 */
1407void
1409{
1410 int16 index;
1411 Buffer buffer;
1412
1413 /* Stop looking ahead. */
1414 stream->readahead_distance = 0;
1415 stream->combine_distance = 0;
1416
1417 /* Forget buffered block number and fast path state. */
1419 stream->fast_path = false;
1420
1421 /* Unpin anything that wasn't consumed. */
1422 while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1423 ReleaseBuffer(buffer);
1424
1425 /* Unpin any unused forwarded buffers. */
1426 index = stream->next_buffer_index;
1427 while (index < stream->initialized_buffers &&
1428 (buffer = stream->buffers[index]) != InvalidBuffer)
1429 {
1430 Assert(stream->forwarded_buffers > 0);
1431 stream->forwarded_buffers--;
1432 ReleaseBuffer(buffer);
1433
1434 stream->buffers[index] = InvalidBuffer;
1436 stream->buffers[stream->queue_size + index] = InvalidBuffer;
1437
1438 if (++index == stream->queue_size)
1439 index = 0;
1440 }
1441
1442 Assert(stream->forwarded_buffers == 0);
1443 Assert(stream->pinned_buffers == 0);
1444 Assert(stream->ios_in_progress == 0);
1445
1446 /* Start off assuming data is cached. */
1447 stream->readahead_distance = 1;
1448 stream->combine_distance = 1;
1450 stream->resume_combine_distance = stream->combine_distance;
1451 stream->distance_decay_holdoff = 0;
1452}
1453
1454/*
1455 * Release and free a read stream.
1456 */
1457void
1459{
1460 read_stream_reset(stream);
1461 pfree(stream);
1462}
int io_method
Definition aio.c:74
void pgaio_enter_batchmode(void)
Definition aio.c:1091
void pgaio_exit_batchmode(void)
Definition aio.c:1102
@ IOMETHOD_SYNC
Definition aio.h:34
uint32 BlockNumber
Definition block.h:31
#define InvalidBlockNumber
Definition block.h:33
int Buffer
Definition buf.h:23
#define InvalidBuffer
Definition buf.h:25
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition bufmgr.c:4446
bool StartReadBuffers(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, int flags)
Definition bufmgr.c:1609
void ReleaseBuffer(Buffer buffer)
Definition bufmgr.c:5586
bool WaitReadBuffers(ReadBuffersOperation *operation)
Definition bufmgr.c:1750
int effective_io_concurrency
Definition bufmgr.c:200
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition bufmgr.c:1628
int io_combine_limit
Definition bufmgr.c:215
uint32 GetAdditionalPinLimit(void)
Definition bufmgr.c:2698
uint32 GetPinLimit(void)
Definition bufmgr.c:2686
#define READ_BUFFERS_ISSUE_ADVICE
Definition bufmgr.h:124
#define READ_BUFFERS_SYNCHRONOUSLY
Definition bufmgr.h:128
static bool BufferIsValid(Buffer bufnum)
Definition bufmgr.h:419
#define Min(x, y)
Definition c.h:1091
#define likely(x)
Definition c.h:437
#define MAXALIGN(LEN)
Definition c.h:896
#define Max(x, y)
Definition c.h:1085
#define Assert(condition)
Definition c.h:943
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:558
int16_t int16
Definition c.h:619
int32_t int32
Definition c.h:620
uint16_t uint16
Definition c.h:623
#define unlikely(x)
Definition c.h:438
uint32_t uint32
Definition c.h:624
#define PG_INT16_MAX
Definition c.h:670
#define OidIsValid(objectId)
Definition c.h:858
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
bool IsCatalogRelationOid(Oid relid)
Definition catalog.c:121
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
int io_direct_flags
Definition fd.c:172
#define IO_DIRECT_DATA
Definition fd.h:54
int GetAccessStrategyPinLimit(BufferAccessStrategy strategy)
Definition freelist.c:574
Oid MyDatabaseId
Definition globals.c:96
int i
Definition isn.c:77
uint32 GetAdditionalLocalPinLimit(void)
Definition localbuf.c:316
uint32 GetLocalPinLimit(void)
Definition localbuf.c:308
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
#define VALGRIND_MAKE_MEM_NOACCESS(addr, size)
Definition memdebug.h:27
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition memdebug.h:28
unsigned int Oid
static int fb(int x)
BlockNumber read_stream_pause(ReadStream *stream)
ReadStream * read_stream_begin_smgr_relation(int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
BlockNumber read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
void read_stream_reset(ReadStream *stream)
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
static bool read_stream_start_pending_read(ReadStream *stream)
ReadStream * read_stream_begin_relation(int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
static bool read_stream_should_issue_now(ReadStream *stream)
static bool read_stream_should_look_ahead(ReadStream *stream)
static void read_stream_look_ahead(ReadStream *stream)
void read_stream_enable_stats(ReadStream *stream, IOStats *stats)
static void read_stream_count_wait(ReadStream *stream)
void read_stream_end(ReadStream *stream)
BlockNumber block_range_read_stream_cb(ReadStream *stream, void *callback_private_data, void *per_buffer_data)
static void read_stream_count_io(ReadStream *stream, int nblocks, int in_progress)
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
static void read_stream_count_prefetch(ReadStream *stream)
static void read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
void read_stream_resume(ReadStream *stream)
#define READ_STREAM_MAINTENANCE
Definition read_stream.h:28
#define READ_STREAM_USE_BATCHING
Definition read_stream.h:64
BlockNumber(* ReadStreamBlockNumberCB)(ReadStream *stream, void *callback_private_data, void *per_buffer_data)
Definition read_stream.h:78
#define READ_STREAM_FULL
Definition read_stream.h:43
#define READ_STREAM_SEQUENTIAL
Definition read_stream.h:36
static SMgrRelation RelationGetSmgr(Relation rel)
Definition rel.h:578
ForkNumber
Definition relpath.h:56
#define SmgrIsTemp(smgr)
Definition smgr.h:74
int get_tablespace_io_concurrency(Oid spcid)
Definition spccache.c:216
int get_tablespace_maintenance_io_concurrency(Oid spcid)
Definition spccache.c:230
int16 distance_capacity
uint64 io_count
uint64 wait_count
int16 distance_max
uint64 io_in_progress
uint64 distance_sum
uint64 prefetch_count
uint64 io_nblocks
int16 buffer_index
Definition read_stream.c:88
ReadBuffersOperation op
Definition read_stream.c:89
ForkNumber forknum
Definition bufmgr.h:137
SMgrRelation smgr
Definition bufmgr.h:135
BufferAccessStrategy strategy
Definition bufmgr.h:138
BlockNumber blocknum
Definition bufmgr.h:146
int16 io_combine_limit
Definition read_stream.c:98
uint16 distance_decay_holdoff
int16 ios_in_progress
Definition read_stream.c:99
void * per_buffer_data
BlockNumber seq_until_processed
int16 pinned_buffers
int16 max_ios
Definition read_stream.c:97
int16 oldest_buffer_index
BlockNumber seq_blocknum
bool batch_mode
bool advice_enabled
BlockNumber pending_read_blocknum
int16 max_pinned_buffers
InProgressIO * ios
int16 oldest_io_index
int16 combine_distance
int16 readahead_distance
int read_buffers_flags
int16 resume_readahead_distance
BlockNumber buffered_blocknum
int16 queue_size
int16 next_buffer_index
int16 initialized_buffers
size_t per_buffer_data_size
int16 resume_combine_distance
int16 forwarded_buffers
ReadStreamBlockNumberCB callback
int16 next_io_index
int16 pending_read_nblocks
void * callback_private_data
IOStats * stats
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
RelFileLocator locator
RelFileNumber relNumber
Form_pg_class rd_rel
Definition rel.h:111
RelFileLocatorBackend smgr_rlocator
Definition smgr.h:38
Definition type.h:96
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)