PostgreSQL Source Code git master
Loading...
Searching...
No Matches
read_stream.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "storage/aio.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "storage/read_stream.h"
#include "utils/memdebug.h"
#include "utils/rel.h"
#include "utils/spccache.h"
Include dependency graph for read_stream.c:

Go to the source code of this file.

Data Structures

struct  InProgressIO
 
struct  ReadStream
 

Typedefs

typedef struct InProgressIO InProgressIO
 

Functions

static void * get_per_buffer_data (ReadStream *stream, int16 buffer_index)
 
BlockNumber block_range_read_stream_cb (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 
static BlockNumber read_stream_get_block (ReadStream *stream, void *per_buffer_data)
 
static void read_stream_unget_block (ReadStream *stream, BlockNumber blocknum)
 
static bool read_stream_start_pending_read (ReadStream *stream)
 
static void read_stream_look_ahead (ReadStream *stream)
 
static ReadStream * read_stream_begin_impl (int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
ReadStream * read_stream_begin_relation (int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
ReadStream * read_stream_begin_smgr_relation (int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
Buffer read_stream_next_buffer (ReadStream *stream, void **per_buffer_data)
 
BlockNumber read_stream_next_block (ReadStream *stream, BufferAccessStrategy *strategy)
 
BlockNumber read_stream_pause (ReadStream *stream)
 
void read_stream_resume (ReadStream *stream)
 
void read_stream_reset (ReadStream *stream)
 
void read_stream_end (ReadStream *stream)
 

Typedef Documentation

◆ InProgressIO

Function Documentation

◆ block_range_read_stream_cb()

◆ get_per_buffer_data()

static void * get_per_buffer_data ( ReadStream *stream,
int16  buffer_index 
)
inlinestatic

Definition at line 152 of file read_stream.c.

153{
 /*
  * Compute the address of the per-buffer data slot belonging to the queue
  * entry at buffer_index: base pointer plus a fixed-size stride per entry.
  */
154 return (char *) stream->per_buffer_data +
155 stream->per_buffer_data_size * buffer_index;
156}
void * per_buffer_data
size_t per_buffer_data_size

References ReadStream::per_buffer_data, and ReadStream::per_buffer_data_size.

Referenced by read_stream_look_ahead(), and read_stream_next_buffer().

◆ read_stream_begin_impl()

static ReadStream * read_stream_begin_impl ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
SMgrRelation  smgr,
char  persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *callback_private_data,
size_t  per_buffer_data_size 
)
static

Definition at line 539 of file read_stream.c.

548{
549 ReadStream *stream;
550 size_t size;
551 int16 queue_size;
553 int max_ios;
555 uint32 max_pinned_buffers;
557 Oid tablespace_id;
558
559 /*
560 * Decide how many I/Os we will allow to run at the same time. That
561 * currently means advice to the kernel to tell it that we will soon read.
562 * This number also affects how far we look ahead for opportunities to
563 * start more I/Os.
564 */
565 tablespace_id = smgr->smgr_rlocator.locator.spcOid;
566 if (!OidIsValid(MyDatabaseId) ||
567 (rel && IsCatalogRelation(rel)) ||
569 {
570 /*
571 * Avoid circularity while trying to look up tablespace settings or
572 * before spccache.c is ready.
573 */
574 max_ios = effective_io_concurrency;
575 }
576 else if (flags & READ_STREAM_MAINTENANCE)
577 max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
578 else
579 max_ios = get_tablespace_io_concurrency(tablespace_id);
580
581 /* Cap to INT16_MAX to avoid overflowing below */
582 max_ios = Min(max_ios, PG_INT16_MAX);
583
584 /*
585 * If starting a multi-block I/O near the end of the queue, we might
586 * temporarily need extra space for overflowing buffers before they are
587 * moved to regular circular position. This is the maximum extra space we
588 * could need.
589 */
591
592 /*
593 * Choose the maximum number of buffers we're prepared to pin. We try to
594 * pin fewer if we can, though. We add one so that we can make progress
595 * even if max_ios is set to 0 (see also further down). For max_ios > 0,
596 * this also allows an extra full I/O's worth of buffers: after an I/O
597 * finishes we don't want to have to wait for its buffers to be consumed
598 * before starting a new one.
599 *
600 * Be careful not to allow int16 to overflow. That is possible with the
601 * current GUC range limits, so this is an artificial limit of ~32k
602 * buffers and we'd need to adjust the types to exceed that. We also have
603 * to allow for the spare entry and the overflow space.
604 */
605 max_pinned_buffers = (max_ios + 1) * io_combine_limit;
606 max_pinned_buffers = Min(max_pinned_buffers,
608
609 /* Give the strategy a chance to limit the number of buffers we pin. */
611 max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
612
613 /*
614 * Also limit our queue to the maximum number of pins we could ever be
615 * allowed to acquire according to the buffer manager. We may not really
616 * be able to use them all due to other pins held by this backend, but
617 * we'll check that later in read_stream_start_pending_read().
618 */
619 if (SmgrIsTemp(smgr))
621 else
623 max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
624
625 /*
626 * The limit might be zero on a system configured with too few buffers for
627 * the number of connections. We need at least one to make progress.
628 */
629 max_pinned_buffers = Max(1, max_pinned_buffers);
630
631 /*
632 * We need one extra entry for buffers and per-buffer data, because users
633 * of per-buffer data have access to the object until the next call to
634 * read_stream_next_buffer(), so we need a gap between the head and tail
635 * of the queue so that we don't clobber it.
636 */
637 queue_size = max_pinned_buffers + 1;
638
639 /*
640 * Allocate the object, the buffers, the ios and per_buffer_data space in
641 * one big chunk. Though we have queue_size buffers, we want to be able
642 * to assume that all the buffers for a single read are contiguous (i.e.
643 * don't wrap around halfway through), so we allow temporary overflows of
644 * up to the maximum possible overflow size.
645 */
646 size = offsetof(ReadStream, buffers);
647 size += sizeof(Buffer) * (queue_size + queue_overflow);
648 size += sizeof(InProgressIO) * Max(1, max_ios);
649 size += per_buffer_data_size * queue_size;
650 size += MAXIMUM_ALIGNOF * 2;
651 stream = (ReadStream *) palloc(size);
652 memset(stream, 0, offsetof(ReadStream, buffers));
653 stream->ios = (InProgressIO *)
654 MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
655 if (per_buffer_data_size > 0)
656 stream->per_buffer_data = (void *)
657 MAXALIGN(&stream->ios[Max(1, max_ios)]);
658
659 stream->sync_mode = io_method == IOMETHOD_SYNC;
660 stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
661
662#ifdef USE_PREFETCH
663
664 /*
665 * Read-ahead advice simulating asynchronous I/O with synchronous calls.
666 * Issue advice only if AIO is not used, direct I/O isn't enabled, the
667 * caller hasn't promised sequential access (overriding our detection
668 * heuristics), and max_ios hasn't been set to zero.
669 */
670 if (stream->sync_mode &&
672 (flags & READ_STREAM_SEQUENTIAL) == 0 &&
673 max_ios > 0)
674 stream->advice_enabled = true;
675#endif
676
677 /*
678 * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
679 * we still need to allocate space to combine and run one I/O. Bump it up
680 * to one, and remember to ask for synchronous I/O only.
681 */
682 if (max_ios == 0)
683 {
684 max_ios = 1;
686 }
687
688 /*
689 * Capture stable values for these two GUC-derived numbers for the
690 * lifetime of this stream, so we don't have to worry about the GUCs
691 * changing underneath us beyond this point.
692 */
693 stream->max_ios = max_ios;
695
696 stream->per_buffer_data_size = per_buffer_data_size;
697 stream->max_pinned_buffers = max_pinned_buffers;
698 stream->queue_size = queue_size;
699 stream->callback = callback;
700 stream->callback_private_data = callback_private_data;
704 stream->temporary = SmgrIsTemp(smgr);
705
706 /*
707 * Skip the initial ramp-up phase if the caller says we're going to be
708 * reading the whole relation. This way we start out assuming we'll be
709 * doing full io_combine_limit sized reads.
710 */
711 if (flags & READ_STREAM_FULL)
712 stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
713 else
714 stream->distance = 1;
715 stream->resume_distance = stream->distance;
716
717 /*
718 * Since we always access the same relation, we can initialize parts of
719 * the ReadBuffersOperation objects and leave them that way, to avoid
720 * wasting CPU cycles writing to them for each read.
721 */
722 for (int i = 0; i < max_ios; ++i)
723 {
724 stream->ios[i].op.rel = rel;
725 stream->ios[i].op.smgr = smgr;
726 stream->ios[i].op.persistence = persistence;
727 stream->ios[i].op.forknum = forknum;
728 stream->ios[i].op.strategy = strategy;
729 }
730
731 return stream;
732}
int io_method
Definition aio.c:74
@ IOMETHOD_SYNC
Definition aio.h:34
int Buffer
Definition buf.h:23
int effective_io_concurrency
Definition bufmgr.c:200
int io_combine_limit
Definition bufmgr.c:215
uint32 GetPinLimit(void)
Definition bufmgr.c:2609
#define READ_BUFFERS_SYNCHRONOUSLY
Definition bufmgr.h:128
#define Min(x, y)
Definition c.h:1093
#define MAXALIGN(LEN)
Definition c.h:898
#define Max(x, y)
Definition c.h:1087
int16_t int16
Definition c.h:613
uint32_t uint32
Definition c.h:618
#define PG_INT16_MAX
Definition c.h:672
#define OidIsValid(objectId)
Definition c.h:860
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
bool IsCatalogRelationOid(Oid relid)
Definition catalog.c:121
int io_direct_flags
Definition fd.c:172
#define IO_DIRECT_DATA
Definition fd.h:54
int GetAccessStrategyPinLimit(BufferAccessStrategy strategy)
Definition freelist.c:609
Oid MyDatabaseId
Definition globals.c:94
int i
Definition isn.c:77
uint32 GetLocalPinLimit(void)
Definition localbuf.c:307
void * palloc(Size size)
Definition mcxt.c:1387
unsigned int Oid
static int fb(int x)
#define READ_STREAM_MAINTENANCE
Definition read_stream.h:28
#define READ_STREAM_USE_BATCHING
Definition read_stream.h:64
#define READ_STREAM_FULL
Definition read_stream.h:43
#define READ_STREAM_SEQUENTIAL
Definition read_stream.h:36
#define SmgrIsTemp(smgr)
Definition smgr.h:74
int get_tablespace_io_concurrency(Oid spcid)
Definition spccache.c:216
int get_tablespace_maintenance_io_concurrency(Oid spcid)
Definition spccache.c:230
ReadBuffersOperation op
Definition read_stream.c:86
ForkNumber forknum
Definition bufmgr.h:137
SMgrRelation smgr
Definition bufmgr.h:135
BufferAccessStrategy strategy
Definition bufmgr.h:138
int16 io_combine_limit
Definition read_stream.c:95
int16 distance
BlockNumber seq_until_processed
int16 max_ios
Definition read_stream.c:94
BlockNumber seq_blocknum
bool batch_mode
bool advice_enabled
int16 max_pinned_buffers
Definition read_stream.c:98
InProgressIO * ios
int read_buffers_flags
BlockNumber buffered_blocknum
int16 queue_size
Definition read_stream.c:97
int16 resume_distance
ReadStreamBlockNumberCB callback
void * callback_private_data
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
RelFileLocator locator
RelFileNumber relNumber
RelFileLocatorBackend smgr_rlocator
Definition smgr.h:38
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)

References ReadStream::advice_enabled, ReadStream::batch_mode, ReadStream::buffered_blocknum, ReadStream::buffers, ReadStream::callback, callback(), ReadStream::callback_private_data, ReadStream::distance, effective_io_concurrency, fb(), ReadBuffersOperation::forknum, get_tablespace_io_concurrency(), get_tablespace_maintenance_io_concurrency(), GetAccessStrategyPinLimit(), GetLocalPinLimit(), GetPinLimit(), i, InvalidBlockNumber, ReadStream::io_combine_limit, io_combine_limit, IO_DIRECT_DATA, io_direct_flags, io_method, IOMETHOD_SYNC, ReadStream::ios, IsCatalogRelation(), IsCatalogRelationOid(), RelFileLocatorBackend::locator, Max, ReadStream::max_ios, ReadStream::max_pinned_buffers, MAXALIGN, Min, MyDatabaseId, OidIsValid, InProgressIO::op, palloc(), ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadBuffersOperation::persistence, PG_INT16_MAX, ReadStream::queue_size, ReadStream::read_buffers_flags, READ_BUFFERS_SYNCHRONOUSLY, READ_STREAM_FULL, READ_STREAM_MAINTENANCE, READ_STREAM_SEQUENTIAL, READ_STREAM_USE_BATCHING, ReadBuffersOperation::rel, RelFileLocator::relNumber, ReadStream::resume_distance, ReadStream::seq_blocknum, ReadStream::seq_until_processed, ReadBuffersOperation::smgr, SMgrRelationData::smgr_rlocator, SmgrIsTemp, RelFileLocator::spcOid, ReadBuffersOperation::strategy, ReadStream::sync_mode, and ReadStream::temporary.

Referenced by read_stream_begin_relation(), and read_stream_begin_smgr_relation().

◆ read_stream_begin_relation()

ReadStream * read_stream_begin_relation ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 739 of file read_stream.c.

746{
 /*
  * Convenience wrapper for Relation-based streams: derive the SMgrRelation
  * and persistence from the Relation, then delegate everything to the
  * common implementation.
  */
747 return read_stream_begin_impl(flags,
748 strategy,
749 rel,
750 RelationGetSmgr(rel),
751 rel->rd_rel->relpersistence,
752 forknum,
753 callback,
754 callback_private_data,
755 per_buffer_data_size);
756}
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
static SMgrRelation RelationGetSmgr(Relation rel)
Definition rel.h:576
Form_pg_class rd_rel
Definition rel.h:111

References callback(), RelationData::rd_rel, read_stream_begin_impl(), and RelationGetSmgr().

Referenced by acquire_sample_rows(), autoprewarm_database_main(), blbulkdelete(), blgetbitmap(), blvacuumcleanup(), brin_vacuum_scan(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), ginvacuumcleanup(), gistvacuumscan(), hashbulkdelete(), heap_beginscan(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), pgstathashindex(), pgstatindex_impl(), spgvacuumscan(), statapprox_heap(), and verify_heapam().

◆ read_stream_begin_smgr_relation()

ReadStream * read_stream_begin_smgr_relation ( int  flags,
BufferAccessStrategy  strategy,
SMgrRelation  smgr,
char  smgr_persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 763 of file read_stream.c.

771{
772 return read_stream_begin_impl(flags,
773 strategy,
774 NULL,
775 smgr,
777 forknum,
778 callback,
779 callback_private_data,
780 per_buffer_data_size);
781}

References callback(), fb(), and read_stream_begin_impl().

Referenced by RelationCopyStorageUsingBuffer().

◆ read_stream_end()

◆ read_stream_get_block()

static BlockNumber read_stream_get_block ( ReadStream *stream,
void *per_buffer_data 
)
inlinestatic

Definition at line 180 of file read_stream.c.

181{
 /*
  * Obtain the next block number for the stream: return a previously
  * "ungotten" buffered block if there is one, otherwise ask the stream's
  * callback for the next block.
  */
182 BlockNumber blocknum;
183
184 blocknum = stream->buffered_blocknum;
185 if (blocknum != InvalidBlockNumber)
 /*
  * NOTE(review): the then-branch (original line 186) is missing from this
  * extract; it presumably consumes the buffered block by resetting
  * buffered_blocknum — verify against the upstream source.
  */
187 else
188 {
189 /*
190 * Tell Valgrind that the per-buffer data is undefined. That replaces
191 * the "noaccess" state that was set when the consumer moved past this
192 * entry last time around the queue, and should also catch callbacks
193 * that fail to initialize data that the buffer consumer later
194 * accesses. On the first go around, it is undefined already.
195 */
196 VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
197 stream->per_buffer_data_size);
198 blocknum = stream->callback(stream,
199 stream->callback_private_data,
200 per_buffer_data);
201 }
202
203 return blocknum;
204}
uint32 BlockNumber
Definition block.h:31
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition memdebug.h:28

References ReadStream::buffered_blocknum, ReadStream::callback, ReadStream::callback_private_data, InvalidBlockNumber, ReadStream::per_buffer_data_size, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by read_stream_look_ahead(), read_stream_next_block(), and read_stream_next_buffer().

◆ read_stream_look_ahead()

static void read_stream_look_ahead ( ReadStream *stream)
static

Definition at line 430 of file read_stream.c.

431{
432 /*
433 * Allow amortizing the cost of submitting IO over multiple IOs. This
434 * requires that we don't do any operations that could lead to a deadlock
435 * with staged-but-unsubmitted IO. The callback needs to opt-in to being
436 * careful.
437 */
438 if (stream->batch_mode)
440
441 while (stream->ios_in_progress < stream->max_ios &&
442 stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
443 {
444 BlockNumber blocknum;
445 int16 buffer_index;
446 void *per_buffer_data;
447
448 if (stream->pending_read_nblocks == stream->io_combine_limit)
449 {
451 continue;
452 }
453
454 /*
455 * See which block the callback wants next in the stream. We need to
456 * compute the index of the Nth block of the pending read including
457 * wrap-around, but we don't want to use the expensive % operator.
458 */
459 buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
460 if (buffer_index >= stream->queue_size)
461 buffer_index -= stream->queue_size;
462 Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
463 per_buffer_data = get_per_buffer_data(stream, buffer_index);
464 blocknum = read_stream_get_block(stream, per_buffer_data);
465 if (blocknum == InvalidBlockNumber)
466 {
467 /* End of stream. */
468 stream->distance = 0;
469 break;
470 }
471
472 /* Can we merge it with the pending read? */
473 if (stream->pending_read_nblocks > 0 &&
474 stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
475 {
476 stream->pending_read_nblocks++;
477 continue;
478 }
479
480 /* We have to start the pending read before we can build another. */
481 while (stream->pending_read_nblocks > 0)
482 {
483 if (!read_stream_start_pending_read(stream) ||
484 stream->ios_in_progress == stream->max_ios)
485 {
486 /* We've hit the buffer or I/O limit. Rewind and stop here. */
487 read_stream_unget_block(stream, blocknum);
488 if (stream->batch_mode)
490 return;
491 }
492 }
493
494 /* This is the start of a new pending read. */
495 stream->pending_read_blocknum = blocknum;
496 stream->pending_read_nblocks = 1;
497 }
498
499 /*
500 * We don't start the pending read just because we've hit the distance
501 * limit, preferring to give it another chance to grow to full
502 * io_combine_limit size once more buffers have been consumed. However,
503 * if we've already reached io_combine_limit, or we've reached the
504 * distance limit and there isn't anything pinned yet, or the callback has
505 * signaled end-of-stream, we start the read immediately. Note that the
506 * pending read can exceed the distance goal, if the latter was reduced
507 * after hitting the per-backend buffer limit.
508 */
509 if (stream->pending_read_nblocks > 0 &&
510 (stream->pending_read_nblocks == stream->io_combine_limit ||
511 (stream->pending_read_nblocks >= stream->distance &&
512 stream->pinned_buffers == 0) ||
513 stream->distance == 0) &&
514 stream->ios_in_progress < stream->max_ios)
516
517 /*
518 * There should always be something pinned when we leave this function,
519 * whether started by this call or not, unless we've hit the end of the
520 * stream. In the worst case we can always make progress one buffer at a
521 * time.
522 */
523 Assert(stream->pinned_buffers > 0 || stream->distance == 0);
524
525 if (stream->batch_mode)
527}
void pgaio_enter_batchmode(void)
Definition aio.c:1081
void pgaio_exit_batchmode(void)
Definition aio.c:1092
#define Assert(condition)
Definition c.h:945
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
static bool read_stream_start_pending_read(ReadStream *stream)
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
static void read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
int16 ios_in_progress
Definition read_stream.c:96
int16 pinned_buffers
BlockNumber pending_read_blocknum
int16 next_buffer_index
int16 pending_read_nblocks

References Assert, ReadStream::batch_mode, ReadStream::distance, fb(), get_per_buffer_data(), InvalidBlockNumber, ReadStream::io_combine_limit, ReadStream::ios_in_progress, ReadStream::max_ios, ReadStream::next_buffer_index, ReadStream::pending_read_blocknum, ReadStream::pending_read_nblocks, pgaio_enter_batchmode(), pgaio_exit_batchmode(), ReadStream::pinned_buffers, ReadStream::queue_size, read_stream_get_block(), read_stream_start_pending_read(), and read_stream_unget_block().

Referenced by read_stream_next_buffer().

◆ read_stream_next_block()

BlockNumber read_stream_next_block ( ReadStream *stream,
BufferAccessStrategy *strategy 
)

Definition at line 1033 of file read_stream.c.

1034{
 /*
  * Report the stream's strategy and the next block number without pinning
  * a buffer. All ios entries were initialized with the same strategy in
  * read_stream_begin_impl(), so ios[0] is representative. NULL per-buffer
  * data is passed to the callback via read_stream_get_block().
  */
1035 *strategy = stream->ios[0].op.strategy;
1036 return read_stream_get_block(stream, NULL);
1037}

References fb(), ReadStream::ios, InProgressIO::op, read_stream_get_block(), and ReadBuffersOperation::strategy.

◆ read_stream_next_buffer()

Buffer read_stream_next_buffer ( ReadStream *stream,
void **  per_buffer_data 
)

Definition at line 793 of file read_stream.c.

794{
795 Buffer buffer;
796 int16 oldest_buffer_index;
797
798#ifndef READ_STREAM_DISABLE_FAST_PATH
799
800 /*
801 * A fast path for all-cached scans. This is the same as the usual
802 * algorithm, but it is specialized for no I/O and no per-buffer data, so
803 * we can skip the queue management code, stay in the same buffer slot and
804 * use singular StartReadBuffer().
805 */
806 if (likely(stream->fast_path))
807 {
809
810 /* Fast path assumptions. */
811 Assert(stream->ios_in_progress == 0);
812 Assert(stream->forwarded_buffers == 0);
813 Assert(stream->pinned_buffers == 1);
814 Assert(stream->distance == 1);
815 Assert(stream->pending_read_nblocks == 0);
816 Assert(stream->per_buffer_data_size == 0);
818
819 /* We're going to return the buffer we pinned last time. */
820 oldest_buffer_index = stream->oldest_buffer_index;
821 Assert((oldest_buffer_index + 1) % stream->queue_size ==
822 stream->next_buffer_index);
823 buffer = stream->buffers[oldest_buffer_index];
824 Assert(buffer != InvalidBuffer);
825
826 /* Choose the next block to pin. */
828
830 {
831 int flags = stream->read_buffers_flags;
832
833 if (stream->advice_enabled)
835
836 /*
837 * Pin a buffer for the next call. Same buffer entry, and
838 * arbitrary I/O entry (they're all free). We don't have to
839 * adjust pinned_buffers because we're transferring one to caller
840 * but pinning one more.
841 *
842 * In the fast path we don't need to check the pin limit. We're
843 * always allowed at least one pin so that progress can be made,
844 * and that's all we need here. Although two pins are momentarily
845 * held at the same time, the model used here is that the stream
846 * holds only one, and the other now belongs to the caller.
847 */
848 if (likely(!StartReadBuffer(&stream->ios[0].op,
849 &stream->buffers[oldest_buffer_index],
851 flags)))
852 {
853 /* Fast return. */
854 return buffer;
855 }
856
857 /* Next call must wait for I/O for the newly pinned buffer. */
858 stream->oldest_io_index = 0;
859 stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
860 stream->ios_in_progress = 1;
861 stream->ios[0].buffer_index = oldest_buffer_index;
862 stream->seq_blocknum = next_blocknum + 1;
863 }
864 else
865 {
866 /* No more blocks, end of stream. */
867 stream->distance = 0;
868 stream->oldest_buffer_index = stream->next_buffer_index;
869 stream->pinned_buffers = 0;
870 stream->buffers[oldest_buffer_index] = InvalidBuffer;
871 }
872
873 stream->fast_path = false;
874 return buffer;
875 }
876#endif
877
878 if (unlikely(stream->pinned_buffers == 0))
879 {
880 Assert(stream->oldest_buffer_index == stream->next_buffer_index);
881
882 /* End of stream reached? */
883 if (stream->distance == 0)
884 return InvalidBuffer;
885
886 /*
887 * The usual order of operations is that we look ahead at the bottom
888 * of this function after potentially finishing an I/O and making
889 * space for more, but if we're just starting up we'll need to crank
890 * the handle to get started.
891 */
893
894 /* End of stream reached? */
895 if (stream->pinned_buffers == 0)
896 {
897 Assert(stream->distance == 0);
898 return InvalidBuffer;
899 }
900 }
901
902 /* Grab the oldest pinned buffer and associated per-buffer data. */
903 Assert(stream->pinned_buffers > 0);
904 oldest_buffer_index = stream->oldest_buffer_index;
905 Assert(oldest_buffer_index >= 0 &&
907 buffer = stream->buffers[oldest_buffer_index];
908 if (per_buffer_data)
909 *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
910
911 Assert(BufferIsValid(buffer));
912
913 /* Do we have to wait for an associated I/O first? */
914 if (stream->ios_in_progress > 0 &&
915 stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
916 {
918 int32 distance; /* wider temporary value, clamped below */
919
920 /* Sanity check that we still agree on the buffers. */
921 Assert(stream->ios[io_index].op.buffers ==
922 &stream->buffers[oldest_buffer_index]);
923
924 WaitReadBuffers(&stream->ios[io_index].op);
925
926 Assert(stream->ios_in_progress > 0);
927 stream->ios_in_progress--;
928 if (++stream->oldest_io_index == stream->max_ios)
929 stream->oldest_io_index = 0;
930
931 /* Look-ahead distance ramps up rapidly after we do I/O. */
932 distance = stream->distance * 2;
933 distance = Min(distance, stream->max_pinned_buffers);
934 stream->distance = distance;
935
936 /*
937 * If we've reached the first block of a sequential region we're
938 * issuing advice for, cancel that until the next jump. The kernel
939 * will see the sequential preadv() pattern starting here.
940 */
941 if (stream->advice_enabled &&
942 stream->ios[io_index].op.blocknum == stream->seq_until_processed)
944 }
945
946 /*
947 * We must zap this queue entry, or else it would appear as a forwarded
948 * buffer. If it's potentially in the overflow zone (ie from a
949 * multi-block I/O that wrapped around the queue), also zap the copy.
950 */
951 stream->buffers[oldest_buffer_index] = InvalidBuffer;
953 stream->buffers[stream->queue_size + oldest_buffer_index] =
955
956#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
957
958 /*
959 * The caller will get access to the per-buffer data, until the next call.
960 * We wipe the one before, which is never occupied because queue_size
961 * allowed one extra element. This will hopefully trip up client code
962 * that is holding a dangling pointer to it.
963 */
964 if (stream->per_buffer_data)
965 {
966 void *per_buffer_data;
967
968 per_buffer_data = get_per_buffer_data(stream,
969 oldest_buffer_index == 0 ?
970 stream->queue_size - 1 :
971 oldest_buffer_index - 1);
972
973#if defined(CLOBBER_FREED_MEMORY)
974 /* This also tells Valgrind the memory is "noaccess". */
975 wipe_mem(per_buffer_data, stream->per_buffer_data_size);
976#elif defined(USE_VALGRIND)
977 /* Tell it ourselves. */
978 VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
979 stream->per_buffer_data_size);
980#endif
981 }
982#endif
983
984 /* Pin transferred to caller. */
985 Assert(stream->pinned_buffers > 0);
986 stream->pinned_buffers--;
987
988 /* Advance oldest buffer, with wrap-around. */
989 stream->oldest_buffer_index++;
990 if (stream->oldest_buffer_index == stream->queue_size)
991 stream->oldest_buffer_index = 0;
992
993 /* Prepare for the next call. */
995
996#ifndef READ_STREAM_DISABLE_FAST_PATH
997 /* See if we can take the fast path for all-cached scans next time. */
998 if (stream->ios_in_progress == 0 &&
999 stream->forwarded_buffers == 0 &&
1000 stream->pinned_buffers == 1 &&
1001 stream->distance == 1 &&
1002 stream->pending_read_nblocks == 0 &&
1003 stream->per_buffer_data_size == 0)
1004 {
1005 /*
1006 * The fast path spins on one buffer entry repeatedly instead of
1007 * rotating through the whole queue and clearing the entries behind
1008 * it. If the buffer it starts with happened to be forwarded between
1009 * StartReadBuffers() calls and also wrapped around the circular queue
1010 * partway through, then a copy also exists in the overflow zone, and
1011 * it won't clear it out as the regular path would. Do that now, so
1012 * it doesn't need code for that.
1013 */
1014 if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
1015 stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
1017
1018 stream->fast_path = true;
1019 }
1020#endif
1021
1022 return buffer;
1023}
#define InvalidBuffer
Definition buf.h:25
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition bufmgr.c:1742
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition bufmgr.c:1618
#define READ_BUFFERS_ISSUE_ADVICE
Definition bufmgr.h:124
static bool BufferIsValid(Buffer bufnum)
Definition bufmgr.h:421
#define likely(x)
Definition c.h:431
int32_t int32
Definition c.h:614
#define unlikely(x)
Definition c.h:432
#define VALGRIND_MAKE_MEM_NOACCESS(addr, size)
Definition memdebug.h:27
static void read_stream_look_ahead(ReadStream *stream)
int16 buffer_index
Definition read_stream.c:85
BlockNumber blocknum
Definition bufmgr.h:146
int16 oldest_buffer_index
int16 oldest_io_index
int16 initialized_buffers
int16 forwarded_buffers
Definition read_stream.c:99
int16 next_io_index

References ReadStream::advice_enabled, Assert, ReadBuffersOperation::blocknum, InProgressIO::buffer_index, BufferIsValid(), ReadStream::buffers, ReadBuffersOperation::buffers, ReadStream::distance, ReadStream::fast_path, fb(), ReadStream::forwarded_buffers, get_per_buffer_data(), ReadStream::initialized_buffers, InvalidBlockNumber, InvalidBuffer, ReadStream::io_combine_limit, io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, likely, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, ReadStream::oldest_io_index, InProgressIO::op, ReadStream::pending_read_nblocks, ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadStream::pinned_buffers, ReadStream::queue_size, ReadStream::read_buffers_flags, READ_BUFFERS_ISSUE_ADVICE, read_stream_get_block(), read_stream_look_ahead(), ReadStream::seq_blocknum, ReadStream::seq_until_processed, StartReadBuffer(), unlikely, VALGRIND_MAKE_MEM_NOACCESS, and WaitReadBuffers().

Referenced by autoprewarm_database_main(), BitmapHeapScanNextBlock(), blbulkdelete(), blgetbitmap(), blvacuumcleanup(), brin_vacuum_scan(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), ginvacuumcleanup(), gistvacuumscan(), hashbulkdelete(), heap_fetch_next_buffer(), heapam_scan_analyze_next_block(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), pgstathashindex(), pgstatindex_impl(), read_stream_reset(), RelationCopyStorageUsingBuffer(), spgvacuumscan(), statapprox_heap(), and verify_heapam().

◆ read_stream_pause()

BlockNumber read_stream_pause ( ReadStream *stream)

Definition at line 1045 of file read_stream.c.

1046{
 /*
  * Stop look-ahead: save the current distance so read_stream_resume() can
  * restore it, then set distance to zero. Returns InvalidBlockNumber for
  * the caller's convenience.
  */
1047 stream->resume_distance = stream->distance;
1048 stream->distance = 0;
1049 return InvalidBlockNumber;
1050}

References ReadStream::distance, InvalidBlockNumber, and ReadStream::resume_distance.

◆ read_stream_reset()

void read_stream_reset ( ReadStream *stream)

Definition at line 1070 of file read_stream.c.

1071{
1072 int16 index;
1073 Buffer buffer;
1074
1075 /* Stop looking ahead. */
1076 stream->distance = 0;
1077
1078 /* Forget buffered block number and fast path state. */
1079 stream->buffered_blocknum = InvalidBlockNumber;
1080 stream->fast_path = false;
1081
1082 /* Unpin anything that wasn't consumed. */
1083 while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1084 ReleaseBuffer(buffer);
1085
1086 /* Unpin any unused forwarded buffers. */
1087 index = stream->next_buffer_index;
1088 while (index < stream->initialized_buffers &&
1089 (buffer = stream->buffers[index]) != InvalidBuffer)
1090 {
1091 Assert(stream->forwarded_buffers > 0);
1092 stream->forwarded_buffers--;
1093 ReleaseBuffer(buffer);
1094
1095 stream->buffers[index] = InvalidBuffer;
1096 if (index < io_combine_limit - 1)
1097 stream->buffers[stream->queue_size + index] = InvalidBuffer;
1098
1099 if (++index == stream->queue_size)
1100 index = 0;
1101 }
1102
1103 Assert(stream->forwarded_buffers == 0);
1104 Assert(stream->pinned_buffers == 0);
1105 Assert(stream->ios_in_progress == 0);
1106
1107 /* Start off assuming data is cached. */
1108 stream->distance = 1;
1109 stream->resume_distance = stream->distance;
1110}
void ReleaseBuffer(Buffer buffer)
Definition bufmgr.c:5505
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Definition type.h:96

References Assert, ReadStream::buffered_blocknum, ReadStream::buffers, ReadStream::distance, ReadStream::fast_path, fb(), ReadStream::forwarded_buffers, InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios_in_progress, ReadStream::next_buffer_index, ReadStream::pinned_buffers, ReadStream::queue_size, read_stream_next_buffer(), ReleaseBuffer(), and ReadStream::resume_distance.

Referenced by btvacuumscan(), gistvacuumscan(), hashbulkdelete(), heap_fetch_next_buffer(), heap_rescan(), read_stream_end(), and spgvacuumscan().

◆ read_stream_resume()

void read_stream_resume ( ReadStream *stream)

Definition at line 1058 of file read_stream.c.

1059{
1060 stream->distance = stream->resume_distance;
1061}

References ReadStream::distance, and ReadStream::resume_distance.

◆ read_stream_start_pending_read()

static bool read_stream_start_pending_read ( ReadStream *stream)
static

Definition at line 231 of file read_stream.c.

232{
 233 bool need_wait;
 234 int requested_nblocks;
 235 int nblocks;
236 int flags;
237 int forwarded;
 238 int16 io_index;
 239 int16 overflow;
240 int16 buffer_index;
241 int buffer_limit;
242
243 /* This should only be called with a pending read. */
244 Assert(stream->pending_read_nblocks > 0);
245 Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
246
247 /* We had better not exceed the per-stream buffer limit with this read. */
248 Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
249 stream->max_pinned_buffers);
250
251#ifdef USE_ASSERT_CHECKING
252 /* We had better not be overwriting an existing pinned buffer. */
253 if (stream->pinned_buffers > 0)
254 Assert(stream->next_buffer_index != stream->oldest_buffer_index);
255 else
256 Assert(stream->next_buffer_index == stream->oldest_buffer_index);
257
258 /*
259 * Pinned buffers forwarded by a preceding StartReadBuffers() call that
260 * had to split the operation should match the leading blocks of this
261 * following StartReadBuffers() call.
262 */
 263 Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
 264 for (int i = 0; i < stream->forwarded_buffers; ++i)
 265 Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
 266 stream->pending_read_blocknum + i);
267
268 /*
269 * Check that we've cleared the queue/overflow entries corresponding to
270 * the rest of the blocks covered by this read, unless it's the first go
271 * around and we haven't even initialized them yet.
272 */
273 for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
274 Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
275 stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
276#endif
277
278 /* Do we need to issue read-ahead advice? */
279 flags = stream->read_buffers_flags;
280 if (stream->advice_enabled)
281 {
282 if (stream->pending_read_blocknum == stream->seq_blocknum)
283 {
284 /*
285 * Sequential: Issue advice until the preadv() calls have caught
286 * up with the first advice issued for this sequential region, and
287 * then stay out of the way of the kernel's own read-ahead.
288 */
 289 if (stream->seq_until_processed != InvalidBlockNumber)
 290 flags |= READ_BUFFERS_ISSUE_ADVICE;
 291 }
292 else
293 {
294 /*
295 * Random jump: Note the starting location of a new potential
296 * sequential region and start issuing advice. Skip it this time
297 * if the preadv() follows immediately, eg first block in stream.
298 */
 299 stream->seq_until_processed = stream->pending_read_blocknum;
 300 if (stream->pinned_buffers > 0)
 301 flags |= READ_BUFFERS_ISSUE_ADVICE;
 302 }
303 }
304
305 /*
306 * How many more buffers is this backend allowed?
307 *
308 * Forwarded buffers are already pinned and map to the leading blocks of
309 * the pending read (the remaining portion of an earlier short read that
310 * we're about to continue). They are not counted in pinned_buffers, but
311 * they are counted as pins already held by this backend according to the
312 * buffer manager, so they must be added to the limit it grants us.
313 */
 314 if (stream->temporary)
 315 buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
 316 else
 317 buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
 318
 319 Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
 320 buffer_limit += stream->forwarded_buffers;
 321 buffer_limit = Min(buffer_limit, PG_INT16_MAX);
 322
323 if (buffer_limit == 0 && stream->pinned_buffers == 0)
324 buffer_limit = 1; /* guarantee progress */
325
326 /* Does the per-backend limit affect this read? */
327 nblocks = stream->pending_read_nblocks;
328 if (buffer_limit < nblocks)
329 {
 330 int16 new_distance;
 331
 332 /* Shrink distance: no more look-ahead until buffers are released. */
 333 new_distance = stream->pinned_buffers + buffer_limit;
 334 if (stream->distance > new_distance)
335 stream->distance = new_distance;
336
337 /* Unless we have nothing to give the consumer, stop here. */
338 if (stream->pinned_buffers > 0)
339 return false;
340
341 /* A short read is required to make progress. */
342 nblocks = buffer_limit;
343 }
344
345 /*
346 * We say how many blocks we want to read, but it may be smaller on return
347 * if the buffer manager decides to shorten the read. Initialize buffers
348 * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
349 * and keep the original nblocks number so we can check for forwarded
350 * buffers as output, below.
351 */
352 buffer_index = stream->next_buffer_index;
353 io_index = stream->next_io_index;
354 while (stream->initialized_buffers < buffer_index + nblocks)
355 stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
 356 requested_nblocks = nblocks;
 357 need_wait = StartReadBuffers(&stream->ios[io_index].op,
 358 &stream->buffers[buffer_index],
359 stream->pending_read_blocknum,
360 &nblocks,
361 flags);
362 stream->pinned_buffers += nblocks;
363
364 /* Remember whether we need to wait before returning this buffer. */
365 if (!need_wait)
366 {
367 /* Look-ahead distance decays, no I/O necessary. */
368 if (stream->distance > 1)
369 stream->distance--;
370 }
371 else
372 {
373 /*
374 * Remember to call WaitReadBuffers() before returning head buffer.
375 * Look-ahead distance will be adjusted after waiting.
376 */
377 stream->ios[io_index].buffer_index = buffer_index;
378 if (++stream->next_io_index == stream->max_ios)
379 stream->next_io_index = 0;
380 Assert(stream->ios_in_progress < stream->max_ios);
381 stream->ios_in_progress++;
382 stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
383 }
384
385 /*
386 * How many pins were acquired but forwarded to the next call? These need
387 * to be passed to the next StartReadBuffers() call by leaving them
388 * exactly where they are in the queue, or released if the stream ends
389 * early. We need the number for accounting purposes, since they are not
390 * counted in stream->pinned_buffers but we already hold them.
391 */
392 forwarded = 0;
393 while (nblocks + forwarded < requested_nblocks &&
394 stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
395 forwarded++;
 396 stream->forwarded_buffers = forwarded;
 397
398 /*
399 * We gave a contiguous range of buffer space to StartReadBuffers(), but
400 * we want it to wrap around at queue_size. Copy overflowing buffers to
401 * the front of the array where they'll be consumed, but also leave a copy
402 * in the overflow zone which the I/O operation has a pointer to (it needs
403 * a contiguous array). Both copies will be cleared when the buffers are
404 * handed to the consumer.
405 */
406 overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
407 if (overflow > 0)
408 {
409 Assert(overflow < stream->queue_size); /* can't overlap */
410 memcpy(&stream->buffers[0],
411 &stream->buffers[stream->queue_size],
412 sizeof(stream->buffers[0]) * overflow);
413 }
414
415 /* Compute location of start of next read, without using % operator. */
416 buffer_index += nblocks;
417 if (buffer_index >= stream->queue_size)
418 buffer_index -= stream->queue_size;
419 Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
420 stream->next_buffer_index = buffer_index;
421
422 /* Adjust the pending read to cover the remaining portion, if any. */
423 stream->pending_read_blocknum += nblocks;
424 stream->pending_read_nblocks -= nblocks;
425
426 return true;
427}
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition bufmgr.c:4357
bool StartReadBuffers(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, int flags)
Definition bufmgr.c:1599
uint32 GetAdditionalPinLimit(void)
Definition bufmgr.c:2621
uint32 GetAdditionalLocalPinLimit(void)
Definition localbuf.c:315

References ReadStream::advice_enabled, Assert, InProgressIO::buffer_index, BufferGetBlockNumber(), ReadStream::buffers, ReadStream::distance, fb(), ReadStream::forwarded_buffers, GetAdditionalLocalPinLimit(), GetAdditionalPinLimit(), i, ReadStream::initialized_buffers, InvalidBlockNumber, InvalidBuffer, ReadStream::io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, InProgressIO::op, ReadStream::pending_read_blocknum, ReadStream::pending_read_nblocks, PG_INT16_MAX, ReadStream::pinned_buffers, ReadStream::queue_size, ReadStream::read_buffers_flags, READ_BUFFERS_ISSUE_ADVICE, ReadStream::seq_blocknum, ReadStream::seq_until_processed, StartReadBuffers(), and ReadStream::temporary.

Referenced by read_stream_look_ahead().

◆ read_stream_unget_block()

static void read_stream_unget_block ( ReadStream *stream,
BlockNumber  blocknum 
)
inlinestatic

Definition at line 212 of file read_stream.c.

213{
214 /* We shouldn't ever unget more than one block. */
 215 Assert(stream->buffered_blocknum == InvalidBlockNumber);
 216 Assert(blocknum != InvalidBlockNumber);
217 stream->buffered_blocknum = blocknum;
218}

References Assert, ReadStream::buffered_blocknum, and InvalidBlockNumber.

Referenced by read_stream_look_ahead().