PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
read_stream.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "storage/aio.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "storage/read_stream.h"
#include "utils/memdebug.h"
#include "utils/rel.h"
#include "utils/spccache.h"
Include dependency graph for read_stream.c:

Go to the source code of this file.

Data Structures

struct  InProgressIO
 
struct  ReadStream
 

Typedefs

typedef struct InProgressIO InProgressIO
 

Functions

static void * get_per_buffer_data (ReadStream *stream, int16 buffer_index)
 
BlockNumber block_range_read_stream_cb (ReadStream *stream, void *callback_private_data, void *per_buffer_data)
 
static BlockNumber read_stream_get_block (ReadStream *stream, void *per_buffer_data)
 
static void read_stream_unget_block (ReadStream *stream, BlockNumber blocknum)
 
static bool read_stream_start_pending_read (ReadStream *stream)
 
static void read_stream_look_ahead (ReadStream *stream)
 
static ReadStream * read_stream_begin_impl (int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
ReadStream * read_stream_begin_relation (int flags, BufferAccessStrategy strategy, Relation rel, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
ReadStream * read_stream_begin_smgr_relation (int flags, BufferAccessStrategy strategy, SMgrRelation smgr, char smgr_persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
 
Buffer read_stream_next_buffer (ReadStream *stream, void **per_buffer_data)
 
BlockNumber read_stream_next_block (ReadStream *stream, BufferAccessStrategy *strategy)
 
void read_stream_reset (ReadStream *stream)
 
void read_stream_end (ReadStream *stream)
 

Typedef Documentation

◆ InProgressIO

typedef struct InProgressIO InProgressIO

Function Documentation

◆ block_range_read_stream_cb()

BlockNumber block_range_read_stream_cb ( ReadStream *  stream,
void *  callback_private_data,
void *  per_buffer_data 
)

◆ get_per_buffer_data()

static void * get_per_buffer_data ( ReadStream *  stream,
int16  buffer_index 
)
inline static

Definition at line 151 of file read_stream.c.

152{
153 return (char *) stream->per_buffer_data +
154 stream->per_buffer_data_size * buffer_index;
155}
void * per_buffer_data
Definition: read_stream.c:132
size_t per_buffer_data_size
Definition: read_stream.c:131

References ReadStream::per_buffer_data, and ReadStream::per_buffer_data_size.

Referenced by read_stream_look_ahead(), and read_stream_next_buffer().

◆ read_stream_begin_impl()

static ReadStream * read_stream_begin_impl ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
SMgrRelation  smgr,
char  persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)
static

Definition at line 517 of file read_stream.c.

526{
527 ReadStream *stream;
528 size_t size;
529 int16 queue_size;
530 int16 queue_overflow;
531 int max_ios;
532 int strategy_pin_limit;
533 uint32 max_pinned_buffers;
534 uint32 max_possible_buffer_limit;
535 Oid tablespace_id;
536
537 /*
538 * Decide how many I/Os we will allow to run at the same time. That
539 * currently means advice to the kernel to tell it that we will soon read.
540 * This number also affects how far we look ahead for opportunities to
541 * start more I/Os.
542 */
543 tablespace_id = smgr->smgr_rlocator.locator.spcOid;
544 if (!OidIsValid(MyDatabaseId) ||
545 (rel && IsCatalogRelation(rel)) ||
546 IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
547 {
548 /*
549 * Avoid circularity while trying to look up tablespace settings or
550 * before spccache.c is ready.
551 */
552 max_ios = effective_io_concurrency;
553 }
554 else if (flags & READ_STREAM_MAINTENANCE)
555 max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
556 else
557 max_ios = get_tablespace_io_concurrency(tablespace_id);
558
559 /* Cap to INT16_MAX to avoid overflowing below */
560 max_ios = Min(max_ios, PG_INT16_MAX);
561
562 /*
563 * If starting a multi-block I/O near the end of the queue, we might
564 * temporarily need extra space for overflowing buffers before they are
565 * moved to regular circular position. This is the maximum extra space we
566 * could need.
567 */
568 queue_overflow = io_combine_limit - 1;
569
570 /*
571 * Choose the maximum number of buffers we're prepared to pin. We try to
572 * pin fewer if we can, though. We add one so that we can make progress
573 * even if max_ios is set to 0 (see also further down). For max_ios > 0,
574 * this also allows an extra full I/O's worth of buffers: after an I/O
575 * finishes we don't want to have to wait for its buffers to be consumed
576 * before starting a new one.
577 *
578 * Be careful not to allow int16 to overflow. That is possible with the
579 * current GUC range limits, so this is an artificial limit of ~32k
580 * buffers and we'd need to adjust the types to exceed that. We also have
581 * to allow for the spare entry and the overflow space.
582 */
583 max_pinned_buffers = (max_ios + 1) * io_combine_limit;
584 max_pinned_buffers = Min(max_pinned_buffers,
585 PG_INT16_MAX - queue_overflow - 1);
586
587 /* Give the strategy a chance to limit the number of buffers we pin. */
588 strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
589 max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
590
591 /*
592 * Also limit our queue to the maximum number of pins we could ever be
593 * allowed to acquire according to the buffer manager. We may not really
594 * be able to use them all due to other pins held by this backend, but
595 * we'll check that later in read_stream_start_pending_read().
596 */
597 if (SmgrIsTemp(smgr))
598 max_possible_buffer_limit = GetLocalPinLimit();
599 else
600 max_possible_buffer_limit = GetPinLimit();
601 max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
602
603 /*
604 * The limit might be zero on a system configured with too few buffers for
605 * the number of connections. We need at least one to make progress.
606 */
607 max_pinned_buffers = Max(1, max_pinned_buffers);
608
609 /*
610 * We need one extra entry for buffers and per-buffer data, because users
611 * of per-buffer data have access to the object until the next call to
612 * read_stream_next_buffer(), so we need a gap between the head and tail
613 * of the queue so that we don't clobber it.
614 */
615 queue_size = max_pinned_buffers + 1;
616
617 /*
618 * Allocate the object, the buffers, the ios and per_buffer_data space in
619 * one big chunk. Though we have queue_size buffers, we want to be able
620 * to assume that all the buffers for a single read are contiguous (i.e.
621 * don't wrap around halfway through), so we allow temporary overflows of
622 * up to the maximum possible overflow size.
623 */
624 size = offsetof(ReadStream, buffers);
625 size += sizeof(Buffer) * (queue_size + queue_overflow);
626 size += sizeof(InProgressIO) * Max(1, max_ios);
627 size += per_buffer_data_size * queue_size;
628 size += MAXIMUM_ALIGNOF * 2;
629 stream = (ReadStream *) palloc(size);
630 memset(stream, 0, offsetof(ReadStream, buffers));
631 stream->ios = (InProgressIO *)
632 MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
633 if (per_buffer_data_size > 0)
634 stream->per_buffer_data = (void *)
635 MAXALIGN(&stream->ios[Max(1, max_ios)]);
636
637 stream->sync_mode = io_method == IOMETHOD_SYNC;
638 stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
639
640#ifdef USE_PREFETCH
641
642 /*
643 * Read-ahead advice simulating asynchronous I/O with synchronous calls.
644 * Issue advice only if AIO is not used, direct I/O isn't enabled, the
645 * caller hasn't promised sequential access (overriding our detection
646 * heuristics), and max_ios hasn't been set to zero.
647 */
648 if (stream->sync_mode &&
649 (io_direct_flags & IO_DIRECT_DATA) == 0 &&
650 (flags & READ_STREAM_SEQUENTIAL) == 0 &&
651 max_ios > 0)
652 stream->advice_enabled = true;
653#endif
654
655 /*
656 * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
657 * we still need to allocate space to combine and run one I/O. Bump it up
658 * to one, and remember to ask for synchronous I/O only.
659 */
660 if (max_ios == 0)
661 {
662 max_ios = 1;
663 stream->read_buffers_flags |= READ_BUFFERS_SYNCHRONOUSLY;
664 }
665
666 /*
667 * Capture stable values for these two GUC-derived numbers for the
668 * lifetime of this stream, so we don't have to worry about the GUCs
669 * changing underneath us beyond this point.
670 */
671 stream->max_ios = max_ios;
672 stream->io_combine_limit = io_combine_limit;
673
674 stream->per_buffer_data_size = per_buffer_data_size;
675 stream->max_pinned_buffers = max_pinned_buffers;
676 stream->queue_size = queue_size;
677 stream->callback = callback;
678 stream->callback_private_data = callback_private_data;
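679 stream->buffered_blocknum = InvalidBlockNumber;
680 stream->seq_blocknum = InvalidBlockNumber;
681 stream->seq_until_processed = InvalidBlockNumber;
682 stream->temporary = SmgrIsTemp(smgr);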
683
684 /*
685 * Skip the initial ramp-up phase if the caller says we're going to be
686 * reading the whole relation. This way we start out assuming we'll be
687 * doing full io_combine_limit sized reads.
688 */
689 if (flags & READ_STREAM_FULL)
690 stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
691 else
692 stream->distance = 1;
693
694 /*
695 * Since we always access the same relation, we can initialize parts of
696 * the ReadBuffersOperation objects and leave them that way, to avoid
697 * wasting CPU cycles writing to them for each read.
698 */
699 for (int i = 0; i < max_ios; ++i)
700 {
701 stream->ios[i].op.rel = rel;
702 stream->ios[i].op.smgr = smgr;
703 stream->ios[i].op.persistence = persistence;
704 stream->ios[i].op.forknum = forknum;
705 stream->ios[i].op.strategy = strategy;
706 }
707
708 return stream;
709}
int io_method
Definition: aio.c:77
@ IOMETHOD_SYNC
Definition: aio.h:34
int Buffer
Definition: buf.h:23
int effective_io_concurrency
Definition: bufmgr.c:155
int io_combine_limit
Definition: bufmgr.c:170
uint32 GetPinLimit(void)
Definition: bufmgr.c:2510
#define READ_BUFFERS_SYNCHRONOUSLY
Definition: bufmgr.h:118
#define Min(x, y)
Definition: c.h:975
#define MAXALIGN(LEN)
Definition: c.h:782
#define Max(x, y)
Definition: c.h:969
int16_t int16
Definition: c.h:497
uint32_t uint32
Definition: c.h:502
#define PG_INT16_MAX
Definition: c.h:557
#define OidIsValid(objectId)
Definition: c.h:746
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:104
bool IsCatalogRelationOid(Oid relid)
Definition: catalog.c:121
int io_direct_flags
Definition: fd.c:168
#define IO_DIRECT_DATA
Definition: fd.h:54
int GetAccessStrategyPinLimit(BufferAccessStrategy strategy)
Definition: freelist.c:689
Oid MyDatabaseId
Definition: globals.c:95
int i
Definition: isn.c:77
uint32 GetLocalPinLimit(void)
Definition: localbuf.c:306
void * palloc(Size size)
Definition: mcxt.c:1939
unsigned int Oid
Definition: postgres_ext.h:30
struct InProgressIO InProgressIO
#define READ_STREAM_MAINTENANCE
Definition: read_stream.h:28
#define READ_STREAM_USE_BATCHING
Definition: read_stream.h:64
#define READ_STREAM_FULL
Definition: read_stream.h:43
#define READ_STREAM_SEQUENTIAL
Definition: read_stream.h:36
#define SmgrIsTemp(smgr)
Definition: smgr.h:74
int get_tablespace_io_concurrency(Oid spcid)
Definition: spccache.c:215
int get_tablespace_maintenance_io_concurrency(Oid spcid)
Definition: spccache.c:229
ReadBuffersOperation op
Definition: read_stream.c:86
ForkNumber forknum
Definition: bufmgr.h:127
BufferAccessStrategy strategy
Definition: bufmgr.h:128
struct SMgrRelationData * smgr
Definition: bufmgr.h:125
int16 io_combine_limit
Definition: read_stream.c:95
int16 distance
Definition: read_stream.c:101
BlockNumber seq_until_processed
Definition: read_stream.c:124
int16 max_ios
Definition: read_stream.c:94
BlockNumber seq_blocknum
Definition: read_stream.c:123
bool temporary
Definition: read_stream.c:107
bool batch_mode
Definition: read_stream.c:105
bool advice_enabled
Definition: read_stream.c:106
int16 max_pinned_buffers
Definition: read_stream.c:98
InProgressIO * ios
Definition: read_stream.c:135
int read_buffers_flags
Definition: read_stream.c:103
BlockNumber buffered_blocknum
Definition: read_stream.c:113
int16 queue_size
Definition: read_stream.c:97
bool sync_mode
Definition: read_stream.c:104
ReadStreamBlockNumberCB callback
Definition: read_stream.c:119
void * callback_private_data
Definition: read_stream.c:120
Buffer buffers[FLEXIBLE_ARRAY_MEMBER]
Definition: read_stream.c:144
RelFileLocator locator
RelFileNumber relNumber
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:38
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46

References ReadStream::advice_enabled, ReadStream::batch_mode, ReadStream::buffered_blocknum, ReadStream::buffers, ReadStream::callback, callback(), ReadStream::callback_private_data, ReadStream::distance, effective_io_concurrency, ReadBuffersOperation::forknum, get_tablespace_io_concurrency(), get_tablespace_maintenance_io_concurrency(), GetAccessStrategyPinLimit(), GetLocalPinLimit(), GetPinLimit(), i, InvalidBlockNumber, ReadStream::io_combine_limit, io_combine_limit, IO_DIRECT_DATA, io_direct_flags, io_method, IOMETHOD_SYNC, ReadStream::ios, IsCatalogRelation(), IsCatalogRelationOid(), RelFileLocatorBackend::locator, Max, ReadStream::max_ios, ReadStream::max_pinned_buffers, MAXALIGN, Min, MyDatabaseId, OidIsValid, InProgressIO::op, palloc(), ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadBuffersOperation::persistence, PG_INT16_MAX, ReadStream::queue_size, ReadStream::read_buffers_flags, READ_BUFFERS_SYNCHRONOUSLY, READ_STREAM_FULL, READ_STREAM_MAINTENANCE, READ_STREAM_SEQUENTIAL, READ_STREAM_USE_BATCHING, ReadBuffersOperation::rel, RelFileLocator::relNumber, ReadStream::seq_blocknum, ReadStream::seq_until_processed, ReadBuffersOperation::smgr, SMgrRelationData::smgr_rlocator, SmgrIsTemp, RelFileLocator::spcOid, ReadBuffersOperation::strategy, ReadStream::sync_mode, and ReadStream::temporary.

Referenced by read_stream_begin_relation(), and read_stream_begin_smgr_relation().

◆ read_stream_begin_relation()

ReadStream * read_stream_begin_relation ( int  flags,
BufferAccessStrategy  strategy,
Relation  rel,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 716 of file read_stream.c.

723{
724 return read_stream_begin_impl(flags,
725 strategy,
726 rel,
727 RelationGetSmgr(rel),
728 rel->rd_rel->relpersistence,
729 forknum,
730 callback,
731 callback_private_data,
732 per_buffer_data_size);
733}
static ReadStream * read_stream_begin_impl(int flags, BufferAccessStrategy strategy, Relation rel, SMgrRelation smgr, char persistence, ForkNumber forknum, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size)
Definition: read_stream.c:517
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:578
Form_pg_class rd_rel
Definition: rel.h:111

References callback(), RelationData::rd_rel, read_stream_begin_impl(), and RelationGetSmgr().

Referenced by acquire_sample_rows(), autoprewarm_database_main(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), gistvacuumscan(), heap_beginscan(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), spgvacuumscan(), and verify_heapam().

◆ read_stream_begin_smgr_relation()

ReadStream * read_stream_begin_smgr_relation ( int  flags,
BufferAccessStrategy  strategy,
SMgrRelation  smgr,
char  smgr_persistence,
ForkNumber  forknum,
ReadStreamBlockNumberCB  callback,
void *  callback_private_data,
size_t  per_buffer_data_size 
)

Definition at line 740 of file read_stream.c.

748{
749 return read_stream_begin_impl(flags,
750 strategy,
751 NULL,
752 smgr,
753 smgr_persistence,
754 forknum,
755 callback,
756 callback_private_data,
757 per_buffer_data_size);
758}

References callback(), and read_stream_begin_impl().

Referenced by RelationCopyStorageUsingBuffer().

◆ read_stream_end()

void read_stream_end ( ReadStream *  stream)

◆ read_stream_get_block()

static BlockNumber read_stream_get_block ( ReadStream *  stream,
void *  per_buffer_data 
)
inline static

Definition at line 179 of file read_stream.c.

180{
181 BlockNumber blocknum;
182
183 blocknum = stream->buffered_blocknum;
184 if (blocknum != InvalidBlockNumber)
185 stream->buffered_blocknum = InvalidBlockNumber;
186 else
187 {
188 /*
189 * Tell Valgrind that the per-buffer data is undefined. That replaces
190 * the "noaccess" state that was set when the consumer moved past this
191 * entry last time around the queue, and should also catch callbacks
192 * that fail to initialize data that the buffer consumer later
193 * accesses. On the first go around, it is undefined already.
194 */
195 VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
196 stream->per_buffer_data_size);
197 blocknum = stream->callback(stream,
198 stream->callback_private_data,
199 per_buffer_data);
200 }
201
202 return blocknum;
203}
uint32 BlockNumber
Definition: block.h:31
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28

References ReadStream::buffered_blocknum, ReadStream::callback, ReadStream::callback_private_data, InvalidBlockNumber, ReadStream::per_buffer_data_size, and VALGRIND_MAKE_MEM_UNDEFINED.

Referenced by read_stream_look_ahead(), read_stream_next_block(), and read_stream_next_buffer().

◆ read_stream_look_ahead()

static void read_stream_look_ahead ( ReadStream *  stream)
static

Definition at line 408 of file read_stream.c.

409{
410 /*
411 * Allow amortizing the cost of submitting IO over multiple IOs. This
412 * requires that we don't do any operations that could lead to a deadlock
413 * with staged-but-unsubmitted IO. The callback needs to opt-in to being
414 * careful.
415 */
416 if (stream->batch_mode)
417 pgaio_enter_batchmode();
418
419 while (stream->ios_in_progress < stream->max_ios &&
420 stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
421 {
422 BlockNumber blocknum;
423 int16 buffer_index;
424 void *per_buffer_data;
425
426 if (stream->pending_read_nblocks == stream->io_combine_limit)
427 {
428 read_stream_start_pending_read(stream);
429 continue;
430 }
431
432 /*
433 * See which block the callback wants next in the stream. We need to
434 * compute the index of the Nth block of the pending read including
435 * wrap-around, but we don't want to use the expensive % operator.
436 */
437 buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
438 if (buffer_index >= stream->queue_size)
439 buffer_index -= stream->queue_size;
440 Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
441 per_buffer_data = get_per_buffer_data(stream, buffer_index);
442 blocknum = read_stream_get_block(stream, per_buffer_data);
443 if (blocknum == InvalidBlockNumber)
444 {
445 /* End of stream. */
446 stream->distance = 0;
447 break;
448 }
449
450 /* Can we merge it with the pending read? */
451 if (stream->pending_read_nblocks > 0 &&
452 stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
453 {
454 stream->pending_read_nblocks++;
455 continue;
456 }
457
458 /* We have to start the pending read before we can build another. */
459 while (stream->pending_read_nblocks > 0)
460 {
461 if (!read_stream_start_pending_read(stream) ||
462 stream->ios_in_progress == stream->max_ios)
463 {
464 /* We've hit the buffer or I/O limit. Rewind and stop here. */
465 read_stream_unget_block(stream, blocknum);
466 if (stream->batch_mode)
467 pgaio_exit_batchmode();
468 return;
469 }
470 }
471
472 /* This is the start of a new pending read. */
473 stream->pending_read_blocknum = blocknum;
474 stream->pending_read_nblocks = 1;
475 }
476
477 /*
478 * We don't start the pending read just because we've hit the distance
479 * limit, preferring to give it another chance to grow to full
480 * io_combine_limit size once more buffers have been consumed. However,
481 * if we've already reached io_combine_limit, or we've reached the
482 * distance limit and there isn't anything pinned yet, or the callback has
483 * signaled end-of-stream, we start the read immediately. Note that the
484 * pending read can exceed the distance goal, if the latter was reduced
485 * after hitting the per-backend buffer limit.
486 */
487 if (stream->pending_read_nblocks > 0 &&
488 (stream->pending_read_nblocks == stream->io_combine_limit ||
489 (stream->pending_read_nblocks >= stream->distance &&
490 stream->pinned_buffers == 0) ||
491 stream->distance == 0) &&
492 stream->ios_in_progress < stream->max_ios)
493 read_stream_start_pending_read(stream);
494
495 /*
496 * There should always be something pinned when we leave this function,
497 * whether started by this call or not, unless we've hit the end of the
498 * stream. In the worst case we can always make progress one buffer at a
499 * time.
500 */
501 Assert(stream->pinned_buffers > 0 || stream->distance == 0);
502
503 if (stream->batch_mode)
504 pgaio_exit_batchmode();
505}
void pgaio_enter_batchmode(void)
Definition: aio.c:978
void pgaio_exit_batchmode(void)
Definition: aio.c:989
Assert(PointerIsAligned(start, uint64))
static void * get_per_buffer_data(ReadStream *stream, int16 buffer_index)
Definition: read_stream.c:151
static bool read_stream_start_pending_read(ReadStream *stream)
Definition: read_stream.c:230
static BlockNumber read_stream_get_block(ReadStream *stream, void *per_buffer_data)
Definition: read_stream.c:179
static void read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
Definition: read_stream.c:211
int16 ios_in_progress
Definition: read_stream.c:96
int16 pinned_buffers
Definition: read_stream.c:100
BlockNumber pending_read_blocknum
Definition: read_stream.c:127
int16 next_buffer_index
Definition: read_stream.c:143
int16 pending_read_nblocks
Definition: read_stream.c:128

References Assert(), ReadStream::batch_mode, ReadStream::distance, get_per_buffer_data(), InvalidBlockNumber, ReadStream::io_combine_limit, ReadStream::ios_in_progress, ReadStream::max_ios, ReadStream::next_buffer_index, ReadStream::pending_read_blocknum, ReadStream::pending_read_nblocks, pgaio_enter_batchmode(), pgaio_exit_batchmode(), ReadStream::pinned_buffers, ReadStream::queue_size, read_stream_get_block(), read_stream_start_pending_read(), and read_stream_unget_block().

Referenced by read_stream_next_buffer().

◆ read_stream_next_block()

BlockNumber read_stream_next_block ( ReadStream *  stream,
BufferAccessStrategy *  strategy 
)

Definition at line 997 of file read_stream.c.

998{
999 *strategy = stream->ios[0].op.strategy;
1000 return read_stream_get_block(stream, NULL);
1001}

References ReadStream::ios, InProgressIO::op, read_stream_get_block(), and ReadBuffersOperation::strategy.

◆ read_stream_next_buffer()

Buffer read_stream_next_buffer ( ReadStream *  stream,
void **  per_buffer_data 
)

Definition at line 770 of file read_stream.c.

771{
772 Buffer buffer;
773 int16 oldest_buffer_index;
774
775#ifndef READ_STREAM_DISABLE_FAST_PATH
776
777 /*
778 * A fast path for all-cached scans. This is the same as the usual
779 * algorithm, but it is specialized for no I/O and no per-buffer data, so
780 * we can skip the queue management code, stay in the same buffer slot and
781 * use singular StartReadBuffer().
782 */
783 if (likely(stream->fast_path))
784 {
785 BlockNumber next_blocknum;
786
787 /* Fast path assumptions. */
788 Assert(stream->ios_in_progress == 0);
789 Assert(stream->forwarded_buffers == 0);
790 Assert(stream->pinned_buffers == 1);
791 Assert(stream->distance == 1);
792 Assert(stream->pending_read_nblocks == 0);
793 Assert(stream->per_buffer_data_size == 0);
794 Assert(stream->initialized_buffers > stream->oldest_buffer_index);
795
796 /* We're going to return the buffer we pinned last time. */
797 oldest_buffer_index = stream->oldest_buffer_index;
798 Assert((oldest_buffer_index + 1) % stream->queue_size ==
799 stream->next_buffer_index);
800 buffer = stream->buffers[oldest_buffer_index];
801 Assert(buffer != InvalidBuffer);
802
803 /* Choose the next block to pin. */
804 next_blocknum = read_stream_get_block(stream, NULL);
805
806 if (likely(next_blocknum != InvalidBlockNumber))
807 {
808 int flags = stream->read_buffers_flags;
809
810 if (stream->advice_enabled)
811 flags |= READ_BUFFERS_ISSUE_ADVICE;
812
813 /*
814 * Pin a buffer for the next call. Same buffer entry, and
815 * arbitrary I/O entry (they're all free). We don't have to
816 * adjust pinned_buffers because we're transferring one to caller
817 * but pinning one more.
818 *
819 * In the fast path we don't need to check the pin limit. We're
820 * always allowed at least one pin so that progress can be made,
821 * and that's all we need here. Although two pins are momentarily
822 * held at the same time, the model used here is that the stream
823 * holds only one, and the other now belongs to the caller.
824 */
825 if (likely(!StartReadBuffer(&stream->ios[0].op,
826 &stream->buffers[oldest_buffer_index],
827 next_blocknum,
828 flags)))
829 {
830 /* Fast return. */
831 return buffer;
832 }
833
834 /* Next call must wait for I/O for the newly pinned buffer. */
835 stream->oldest_io_index = 0;
836 stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
837 stream->ios_in_progress = 1;
838 stream->ios[0].buffer_index = oldest_buffer_index;
839 stream->seq_blocknum = next_blocknum + 1;
840 }
841 else
842 {
843 /* No more blocks, end of stream. */
844 stream->distance = 0;
845 stream->oldest_buffer_index = stream->next_buffer_index;
846 stream->pinned_buffers = 0;
847 stream->buffers[oldest_buffer_index] = InvalidBuffer;
848 }
849
850 stream->fast_path = false;
851 return buffer;
852 }
853#endif
854
855 if (unlikely(stream->pinned_buffers == 0))
856 {
857 Assert(stream->oldest_buffer_index == stream->next_buffer_index);
858
859 /* End of stream reached? */
860 if (stream->distance == 0)
861 return InvalidBuffer;
862
863 /*
864 * The usual order of operations is that we look ahead at the bottom
865 * of this function after potentially finishing an I/O and making
866 * space for more, but if we're just starting up we'll need to crank
867 * the handle to get started.
868 */
869 read_stream_look_ahead(stream);
870
871 /* End of stream reached? */
872 if (stream->pinned_buffers == 0)
873 {
874 Assert(stream->distance == 0);
875 return InvalidBuffer;
876 }
877 }
878
879 /* Grab the oldest pinned buffer and associated per-buffer data. */
880 Assert(stream->pinned_buffers > 0);
881 oldest_buffer_index = stream->oldest_buffer_index;
882 Assert(oldest_buffer_index >= 0 &&
883 oldest_buffer_index < stream->queue_size);
884 buffer = stream->buffers[oldest_buffer_index];
885 if (per_buffer_data)
886 *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
887
888 Assert(BufferIsValid(buffer));
889
890 /* Do we have to wait for an associated I/O first? */
891 if (stream->ios_in_progress > 0 &&
892 stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
893 {
894 int16 io_index = stream->oldest_io_index;
895 int32 distance; /* wider temporary value, clamped below */
896
897 /* Sanity check that we still agree on the buffers. */
898 Assert(stream->ios[io_index].op.buffers ==
899 &stream->buffers[oldest_buffer_index]);
900
901 WaitReadBuffers(&stream->ios[io_index].op);
902
903 Assert(stream->ios_in_progress > 0);
904 stream->ios_in_progress--;
905 if (++stream->oldest_io_index == stream->max_ios)
906 stream->oldest_io_index = 0;
907
908 /* Look-ahead distance ramps up rapidly after we do I/O. */
909 distance = stream->distance * 2;
910 distance = Min(distance, stream->max_pinned_buffers);
911 stream->distance = distance;
912
913 /*
914 * If we've reached the first block of a sequential region we're
915 * issuing advice for, cancel that until the next jump. The kernel
916 * will see the sequential preadv() pattern starting here.
917 */
918 if (stream->advice_enabled &&
919 stream->ios[io_index].op.blocknum == stream->seq_until_processed)
920 stream->seq_until_processed = InvalidBlockNumber;
921 }
922
923 /*
924 * We must zap this queue entry, or else it would appear as a forwarded
925 * buffer. If it's potentially in the overflow zone (ie from a
926 * multi-block I/O that wrapped around the queue), also zap the copy.
927 */
928 stream->buffers[oldest_buffer_index] = InvalidBuffer;
929 if (oldest_buffer_index < stream->io_combine_limit - 1)
930 stream->buffers[stream->queue_size + oldest_buffer_index] =
931 InvalidBuffer;
932
933#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
934
935 /*
936 * The caller will get access to the per-buffer data, until the next call.
937 * We wipe the one before, which is never occupied because queue_size
938 * allowed one extra element. This will hopefully trip up client code
939 * that is holding a dangling pointer to it.
940 */
941 if (stream->per_buffer_data)
942 {
943 void *per_buffer_data;
944
945 per_buffer_data = get_per_buffer_data(stream,
946 oldest_buffer_index == 0 ?
947 stream->queue_size - 1 :
948 oldest_buffer_index - 1);
949
950#if defined(CLOBBER_FREED_MEMORY)
951 /* This also tells Valgrind the memory is "noaccess". */
952 wipe_mem(per_buffer_data, stream->per_buffer_data_size);
953#elif defined(USE_VALGRIND)
954 /* Tell it ourselves. */
955 VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
956 stream->per_buffer_data_size);
957#endif
958 }
959#endif
960
961 /* Pin transferred to caller. */
962 Assert(stream->pinned_buffers > 0);
963 stream->pinned_buffers--;
964
965 /* Advance oldest buffer, with wrap-around. */
966 stream->oldest_buffer_index++;
967 if (stream->oldest_buffer_index == stream->queue_size)
968 stream->oldest_buffer_index = 0;
969
970 /* Prepare for the next call. */
971 read_stream_look_ahead(stream);
972
973#ifndef READ_STREAM_DISABLE_FAST_PATH
974 /* See if we can take the fast path for all-cached scans next time. */
975 if (stream->ios_in_progress == 0 &&
976 stream->forwarded_buffers == 0 &&
977 stream->pinned_buffers == 1 &&
978 stream->distance == 1 &&
979 stream->pending_read_nblocks == 0 &&
980 stream->per_buffer_data_size == 0)
981 {
982 stream->fast_path = true;
983 }
984#endif
985
986 return buffer;
987}
#define InvalidBuffer
Definition: buf.h:25
void WaitReadBuffers(ReadBuffersOperation *operation)
Definition: bufmgr.c:1637
bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, int flags)
Definition: bufmgr.c:1513
#define READ_BUFFERS_ISSUE_ADVICE
Definition: bufmgr.h:114
static bool BufferIsValid(Buffer bufnum)
Definition: bufmgr.h:368
#define likely(x)
Definition: c.h:346
int32_t int32
Definition: c.h:498
#define unlikely(x)
Definition: c.h:347
#define VALGRIND_MAKE_MEM_NOACCESS(addr, size)
Definition: memdebug.h:27
static void read_stream_look_ahead(ReadStream *stream)
Definition: read_stream.c:408
int16 buffer_index
Definition: read_stream.c:85
Buffer * buffers
Definition: bufmgr.h:135
BlockNumber blocknum
Definition: bufmgr.h:136
int16 oldest_buffer_index
Definition: read_stream.c:142
int16 oldest_io_index
Definition: read_stream.c:136
int16 initialized_buffers
Definition: read_stream.c:102
int16 forwarded_buffers
Definition: read_stream.c:99
int16 next_io_index
Definition: read_stream.c:137
bool fast_path
Definition: read_stream.c:139

References ReadStream::advice_enabled, Assert(), ReadBuffersOperation::blocknum, InProgressIO::buffer_index, BufferIsValid(), ReadStream::buffers, ReadBuffersOperation::buffers, ReadStream::distance, ReadStream::fast_path, ReadStream::forwarded_buffers, get_per_buffer_data(), ReadStream::initialized_buffers, InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, likely, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, ReadStream::oldest_io_index, InProgressIO::op, ReadStream::pending_read_nblocks, ReadStream::per_buffer_data, ReadStream::per_buffer_data_size, ReadStream::pinned_buffers, ReadStream::queue_size, ReadStream::read_buffers_flags, READ_BUFFERS_ISSUE_ADVICE, read_stream_get_block(), read_stream_look_ahead(), ReadStream::seq_blocknum, ReadStream::seq_until_processed, StartReadBuffer(), unlikely, VALGRIND_MAKE_MEM_NOACCESS, and WaitReadBuffers().

Referenced by autoprewarm_database_main(), BitmapHeapScanNextBlock(), btvacuumscan(), collect_corrupt_items(), collect_visibility_data(), gistvacuumscan(), heap_fetch_next_buffer(), heapam_scan_analyze_next_block(), lazy_scan_heap(), lazy_vacuum_heap_rel(), pg_prewarm(), read_stream_reset(), RelationCopyStorageUsingBuffer(), spgvacuumscan(), and verify_heapam().

◆ read_stream_reset()

void read_stream_reset ( ReadStream * stream)

Definition at line 1010 of file read_stream.c.

1011{
1012 int16 index;
1013 Buffer buffer;
1014
1015 /* Stop looking ahead. */
1016 stream->distance = 0;
1017
1018 /* Forget buffered block number and fast path state. */
1019 stream->buffered_blocknum = InvalidBlockNumber;
1020 stream->fast_path = false;
1021
1022 /* Unpin anything that wasn't consumed. */
1023 while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
1024 ReleaseBuffer(buffer);
1025
1026 /* Unpin any unused forwarded buffers. */
1027 index = stream->next_buffer_index;
1028 while (index < stream->initialized_buffers &&
1029 (buffer = stream->buffers[index]) != InvalidBuffer)
1030 {
1031 Assert(stream->forwarded_buffers > 0);
1032 stream->forwarded_buffers--;
1033 ReleaseBuffer(buffer);
1034
1035 stream->buffers[index] = InvalidBuffer;
1036 if (index < stream->io_combine_limit - 1) /* overflow copies exist only below the stream's own limit */
1037 stream->buffers[stream->queue_size + index] = InvalidBuffer;
1038
1039 if (++index == stream->queue_size)
1040 index = 0;
1041 }
1042
1043 Assert(stream->forwarded_buffers == 0);
1044 Assert(stream->pinned_buffers == 0);
1045 Assert(stream->ios_in_progress == 0);
1046
1047 /* Start off assuming data is cached. */
1048 stream->distance = 1;
1049}
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:5373
Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Definition: read_stream.c:770
Definition: type.h:96

References Assert(), ReadStream::buffered_blocknum, ReadStream::buffers, ReadStream::distance, ReadStream::fast_path, ReadStream::forwarded_buffers, InvalidBlockNumber, InvalidBuffer, io_combine_limit, ReadStream::ios_in_progress, ReadStream::next_buffer_index, ReadStream::pinned_buffers, ReadStream::queue_size, read_stream_next_buffer(), and ReleaseBuffer().

Referenced by btvacuumscan(), gistvacuumscan(), heap_fetch_next_buffer(), heap_rescan(), read_stream_end(), and spgvacuumscan().

◆ read_stream_start_pending_read()

static bool read_stream_start_pending_read ( ReadStream * stream)
static

Definition at line 230 of file read_stream.c.

231{
232 bool need_wait;
233 int requested_nblocks;
234 int nblocks;
235 int flags;
236 int forwarded;
237 int16 io_index;
238 int16 overflow;
239 int16 buffer_index;
240 int buffer_limit;
241
242 /* This should only be called with a pending read. */
243 Assert(stream->pending_read_nblocks > 0);
244 Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
245
246 /* We had better not exceed the per-stream buffer limit with this read. */
247 Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
248 stream->max_pinned_buffers);
249
250 /* We had better not be overwriting an existing pinned buffer. */
251 if (stream->pinned_buffers > 0)
252 Assert(stream->next_buffer_index != stream->oldest_buffer_index);
253 else
254 Assert(stream->next_buffer_index == stream->oldest_buffer_index);
255
256 /* Do we need to issue read-ahead advice? */
257 flags = stream->read_buffers_flags;
258 if (stream->advice_enabled)
259 {
260 if (stream->pending_read_blocknum == stream->seq_blocknum)
261 {
262 /*
263 * Sequential: Issue advice until the preadv() calls have caught
264 * up with the first advice issued for this sequential region, and
265 * then stay out of the way of the kernel's own read-ahead.
266 */
267 if (stream->seq_until_processed != InvalidBlockNumber)
268 flags |= READ_BUFFERS_ISSUE_ADVICE;
269 }
270 else
271 {
272 /*
273 * Random jump: Note the starting location of a new potential
274 * sequential region and start issuing advice. Skip it this time
275 * if the preadv() follows immediately, eg first block in stream.
276 */
277 stream->seq_until_processed = stream->pending_read_blocknum;
278 if (stream->pinned_buffers > 0)
279 flags |= READ_BUFFERS_ISSUE_ADVICE;
280 }
281 }
282
283 /*
284 * How many more buffers is this backend allowed?
285 *
286 * Forwarded buffers are already pinned and map to the leading blocks of
287 * the pending read (the remaining portion of an earlier short read that
288 * we're about to continue). They are not counted in pinned_buffers, but
289 * they are counted as pins already held by this backend according to the
290 * buffer manager, so they must be added to the limit it grants us.
291 */
292 if (stream->temporary)
293 buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
294 else
295 buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
296 Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
297
298 buffer_limit += stream->forwarded_buffers;
299 buffer_limit = Min(buffer_limit, PG_INT16_MAX);
300
301 if (buffer_limit == 0 && stream->pinned_buffers == 0)
302 buffer_limit = 1; /* guarantee progress */
303
304 /* Does the per-backend limit affect this read? */
305 nblocks = stream->pending_read_nblocks;
306 if (buffer_limit < nblocks)
307 {
308 int16 new_distance;
309
310 /* Shrink distance: no more look-ahead until buffers are released. */
311 new_distance = stream->pinned_buffers + buffer_limit;
312 if (stream->distance > new_distance)
313 stream->distance = new_distance;
314
315 /* Unless we have nothing to give the consumer, stop here. */
316 if (stream->pinned_buffers > 0)
317 return false;
318
319 /* A short read is required to make progress. */
320 nblocks = buffer_limit;
321 }
322
323 /*
324 * We say how many blocks we want to read, but it may be smaller on return
325 * if the buffer manager decides to shorten the read. Initialize buffers
326 * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
327 * and keep the original nblocks number so we can check for forwarded
328 * buffers as output, below.
329 */
330 buffer_index = stream->next_buffer_index;
331 io_index = stream->next_io_index;
332 while (stream->initialized_buffers < buffer_index + nblocks)
333 stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
334 requested_nblocks = nblocks;
335 need_wait = StartReadBuffers(&stream->ios[io_index].op,
336 &stream->buffers[buffer_index],
337 stream->pending_read_blocknum,
338 &nblocks,
339 flags);
340 stream->pinned_buffers += nblocks;
341
342 /* Remember whether we need to wait before returning this buffer. */
343 if (!need_wait)
344 {
345 /* Look-ahead distance decays, no I/O necessary. */
346 if (stream->distance > 1)
347 stream->distance--;
348 }
349 else
350 {
351 /*
352 * Remember to call WaitReadBuffers() before returning head buffer.
353 * Look-ahead distance will be adjusted after waiting.
354 */
355 stream->ios[io_index].buffer_index = buffer_index;
356 if (++stream->next_io_index == stream->max_ios)
357 stream->next_io_index = 0;
358 Assert(stream->ios_in_progress < stream->max_ios);
359 stream->ios_in_progress++;
360 stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
361 }
362
363 /*
364 * How many pins were acquired but forwarded to the next call? These need
365 * to be passed to the next StartReadBuffers() call by leaving them
366 * exactly where they are in the queue, or released if the stream ends
367 * early. We need the number for accounting purposes, since they are not
368 * counted in stream->pinned_buffers but we already hold them.
369 */
370 forwarded = 0;
371 while (nblocks + forwarded < requested_nblocks &&
372 stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
373 forwarded++;
374 stream->forwarded_buffers = forwarded;
375
376 /*
377 * We gave a contiguous range of buffer space to StartReadBuffers(), but
378 * we want it to wrap around at queue_size. Copy overflowing buffers to
379 * the front of the array where they'll be consumed, but also leave a copy
380 * in the overflow zone which the I/O operation has a pointer to (it needs
381 * a contiguous array). Both copies will be cleared when the buffers are
382 * handed to the consumer.
383 */
384 overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
385 if (overflow > 0)
386 {
387 Assert(overflow < stream->queue_size); /* can't overlap */
388 memcpy(&stream->buffers[0],
389 &stream->buffers[stream->queue_size],
390 sizeof(stream->buffers[0]) * overflow);
391 }
392
393 /* Compute location of start of next read, without using % operator. */
394 buffer_index += nblocks;
395 if (buffer_index >= stream->queue_size)
396 buffer_index -= stream->queue_size;
397 Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
398 stream->next_buffer_index = buffer_index;
399
400 /* Adjust the pending read to cover the remaining portion, if any. */
401 stream->pending_read_blocknum += nblocks;
402 stream->pending_read_nblocks -= nblocks;
403
404 return true;
405}
bool StartReadBuffers(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, int flags)
Definition: bufmgr.c:1494
uint32 GetAdditionalPinLimit(void)
Definition: bufmgr.c:2522
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
uint32 GetAdditionalLocalPinLimit(void)
Definition: localbuf.c:314

References ReadStream::advice_enabled, Assert(), InProgressIO::buffer_index, ReadStream::buffers, ReadStream::distance, ReadStream::forwarded_buffers, GetAdditionalLocalPinLimit(), GetAdditionalPinLimit(), if(), ReadStream::initialized_buffers, InvalidBlockNumber, InvalidBuffer, ReadStream::io_combine_limit, ReadStream::ios, ReadStream::ios_in_progress, ReadStream::max_ios, ReadStream::max_pinned_buffers, Min, ReadStream::next_buffer_index, ReadStream::next_io_index, ReadStream::oldest_buffer_index, InProgressIO::op, ReadStream::pending_read_blocknum, ReadStream::pending_read_nblocks, PG_INT16_MAX, ReadStream::pinned_buffers, ReadStream::queue_size, ReadStream::read_buffers_flags, READ_BUFFERS_ISSUE_ADVICE, ReadStream::seq_blocknum, ReadStream::seq_until_processed, StartReadBuffers(), and ReadStream::temporary.

Referenced by read_stream_look_ahead().

◆ read_stream_unget_block()

static void read_stream_unget_block ( ReadStream * stream,
BlockNumber  blocknum 
)
inline static

Definition at line 211 of file read_stream.c.

212{
213 /* We shouldn't ever unget more than one block. */
214 Assert(stream->buffered_blocknum == InvalidBlockNumber);
215 Assert(blocknum != InvalidBlockNumber);
216 stream->buffered_blocknum = blocknum;
217}

References Assert(), ReadStream::buffered_blocknum, and InvalidBlockNumber.

Referenced by read_stream_look_ahead().